一种汽车舆情快速处理方法与流程

专利2023-06-05  102



1.本发明涉及舆情数据处理技术领域,尤其涉及一种汽车舆情快速处理方法。


背景技术:

2.etl,是英文 extract-transform-load 的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。etl一词较常用在数据仓库,但其对象并不限于数据仓库。
3.当前的etl处理的技术主要分为定时批量etl处理和实时分布式计算框架,定时批量etl处理技术比较成熟,大部分bi项目的数据处理都会涉及到定时etl处理。实时分布式计算框架是后发展起来的etl技术,其相比定时etl方案,其效率有数量级上的飞跃。当前比较主流的实时分布式计算技术框架有spark streaming、flink等。
4.舆情数据的特点是每一条数据都需要进行甄别,打上汽车行业相关标签,并且需要确定数据的情感倾向,而且舆情数据质量参差不齐,需要进行大量异常处理。在etl处理过程中,并不需要在处理阶段进行大量计算和汇总。针对这样的数据特点,比较适合做海量数据计算和汇总的实时分布式计算技术框架不太适用,所以当前一般用定时批量etl处理技术进行处理。
5.如图2所示,现有技术方案的流程是,从网络获取到的文章数据,放入数据库中,调度平台会定时调用ai程序,ai程序读取数据库中指定时间段的原始文章数据集合,对数据打标签和判断情感,把处理结果存回数据库,调度平台再调用etl工具kettle,kettle批量对上一步的结果处理异常数据和规范格式,把清洗结果和历史原始数据存回数据库供前台调用。
6.当前方案处理舆情数据流程,包括以下步骤:s1,首先做基础数据的收集和整理:s1.1,数据收集阶段,数据类型包括但不限于新闻文章、论坛贴吧、视频视频等本品历史数据。
7.s1.2,调度平台定时调用爬虫程序,从网络爬取公开互联网数据。
8.s1.3,数据存储,爬取到的文章标题、文章内容、发布时间、爬取时间、文章连接、所属渠道等,存储在数据库的抓取文章表中。
9.s2,收集来的数据,定时调用ai程序进行处理:s2.1,调度平台定时调度ai程序,批量读取抓取文章表中的新文章。
10.s2.1,对每条数据去除噪声,规范符号。
11.s2.2,对每条数据进行英文与标点格式化。
12.s2.3,对每条数据打上分类标签。
13.s2.4,对每条数据打上热词标签。
14.s2.5,对每条数据打上业务类型标签。
15.s2.6,对每条数据打上情感标签。
16.s2.7,把此批量数据处理的结果存入数据库ai处理结果表中,包括文章与标签关联关系表,文章情感信息,热词关联关系表等。
17.s3,调度程序定时调度kettle etl处理程序,对抓取文章表中的新文章进行处理:s3.1,根据ai处理结果标识,批量获取新文章。
18.s3.2,筛选出包含指定分类标识的文章,包含这些标识的文章需要保留。
19.s3.3,筛选出这些文章的基本信息,如rowkey、标题、链接、发布时间、抓取时间等,存入文章基本信息表里。
20.s3.4,筛选出这些文章的文章rowkey和内容,保存到es中。
21.s3.5,不管是否包含标识,把所有新文章存入历史文章表中,供以后查找原始历史记录。
22.s4,调度平台定时调度etl,删除抓取文章表中已经处理过的文章。
23.s5,调度平台定时调用java程序,java程序扫描新抓取的文章列表,按规则判断要预警的文章,向钉钉群发送预警。预警规则:s5.1,当天发布的文章。
24.s5.2,情感判断为负向的文章。
25.s5.3,提到某些关键词的文章。
26.s5.4,符合特定规则的文章。
27.现有技术主要问题是针对网络舆情的响应时间比较久,由于是定时任务,从抓取到数据开始,经过ai处理-》etl处理-》预警的一套流程时间,由于数据量的原因,ai打标就需要20-25分钟,整个执行过程超过30分钟,为了保证资源不被重复占用导致资源耗尽,所以调度定在1小时执行一次。
28.之前也研究过使用实时数据处理方案,但是由于舆情数据的特点,目前主流的实时数据处理方案如spark streaming、flink,处理这类数据时很难针对每条数据做不同的判断处理,或者说这些计算框架本身只适合做分布式计算,不适合做这样的业务处理。所以当前汽车舆情数据的处理方案主要以定时批量处理为主。


技术实现要素:

29.本发明的目的在于解决目前主流的实时数据处理方案如spark streaming、flink,处理这类数据时很难针对每条数据做不同的判断处理,或者说这些计算框架本身只适合做分布式计算,不适合做这样的业务处理的不足,提供了一种汽车舆情快速处理方法。
30.本发明的目的是通过以下技术方案来实现的:一种汽车舆情快速处理方法,包括以下具体步骤:通过爬虫程序爬取网络舆情文章和基础信息,发送到分布式流平台(kafka,以下简称kafka);启动与kafka分区相同数量的数据抽取转换加载(etl,以下简称etl)处理程序,etl处理程序实时并发读取kafka的每一条数据;通过对数据进行预处理,并对文章进行分类处理;在大规模并行处理(mpp)数据库产品starrocks(原名dorisdb,以下简称starrocks)中新建和kafka中对应的定时加载模块,定时从kafka中加载数据并进行推送。
31.所述基础信息包括文章标题、文章内容、发布时间、爬取时间、文章连接和文章获取渠道。
32.所述对数据进行预处理具体步骤为:加载json格式数据;去掉文章标题中的空格和符号,形成纯净字符;格式化爬取时间;处理异常数据(比如日期格式错误、不可识别的文字符号、缺少发布日期、阅读量等必要字段);判断是否是当天文章。
33.所述对文章进行分类处理具体步骤为:判断是否是当天文章:若不是当天文章,而是以往已删除的文章,调用删除文章处理函数;若不是当天文章,而是以往未爬取到的新文章,调用新文章处理函数;若不是当天文章,而是以往已经入库过的文章,调用老文章处理函数;若是当天文章,并且是已删除的当天文章,调用删除文章处理函数;若是当天文章,并且是已经入库过的当天文章,调用老文章处理函数;若是当天新文章,则调用ai服务的情感分析服务,返回文章情感倾向,根据返回结果进行判断。
34.所述根据返回结果进行判断具体方法为:如果文章情感为负向,调用预警处理函数;如果文字情感为其他,调用新文章处理函数。
35.所述删除文章处理函数逻辑具体为:保存文章基础信息和全文到搜索引擎产品elasticsearch(以下简称es)删除表。
36.所述新文章处理函数逻辑具体为:调用ai情感服务获取文章情感;调用ai标签服务获取文章标签;保存文章基础信息、关联信息和文章声量信息到kafka;保存文章全文信息到es;保存文章声量信息到内存数据结构存储系统redis(以下简称redis)中。
37.所述老文章处理函数逻辑具体为:读取redis中的点击量信息计算增量;保存声量增量信息到kafka。
38.所述预警处理函数逻辑具体为:生成预警信息;从缓存获取文章需要预警的钉钉群信息;对每个需要预警的钉钉群,发送预警消息。
39.本发明的有益效果:本发明通过实时技术架构,大大节省数据处理时间,可以将舆情的预警时间从1小时缩短到5秒内,将数据展现时间从1小时缩短到1分钟。
附图说明
40.为了更清楚地说明本发明实施例或现有技术中的技术方案,下面将对实施例或现有技术描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本发明的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图示出的结构获得其他的附图。
41.图1是本发明的实时处理技术流程框图;图2是现有技术流程示意图。
具体实施方式
42.应当理解,此处所描述的具体实施例仅用以解释本发明,并不用于限定本发明。
43.下面将结合本发明实施例中的附图,对本发明实施例中的技术方案进行清楚、完
整地描述,显然,所描述的实施例仅是本发明的一部分实施例,而不是全部的实施例。基于本发明中的实施例,本领域普通技术人员在没有作出创造性劳动前提下所获得的所有其他实施例,都属于本发明保护的范围。
44.现有技术主要问题是针对网络舆情的响应时间比较久,由于是定时任务,从抓取到数据开始,经过ai处理-》etl处理-》预警的一套流程时间,由于数据量的原因, ai打标就需要20-25分钟,整个执行过程超过30分钟,为了保证资源不被重复占用导致资源耗尽,所以调度定在1小时执行一次。
45.之前也研究过使用实时数据处理方案,但是由于舆情数据的特点,目前主流的实时数据处理方案如spark streaming、flink,处理这类数据时很难针对每条数据做不同的判断处理,或者说这些计算框架本身只适合做分布式计算,不适合做这样的业务处理。所以当前汽车舆情数据的处理方案主要以定时批量处理为主。
46.为了提高舆情预警速度,加快数据处理速度,本方案通过结合kafka、redis、etl服务、ai服务、starrocks等组件和模块,实现了文章从抓取进入kafka,5秒内即可预警,60秒内即可入库展现的能力。
47.技术术语表:kafka:由apache软件基金会开发的一个开源流处理平台,由scala和java编写。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
48.etl:英文extract-transform-load的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。etl一词较常用在数据仓库,但其对象并不限于数据仓库。
49.json:英文javascript object notation的缩写,json是一种轻量级的数据交换格式。json采用完全独立于语言的文本格式,这些特性使json成为理想的数据交换语言。易于人阅读和编写,同时也易于机器解析和生成。
50.ai:(英语:artificial intelligence,缩写为ai)亦称智械、机器智能,指由人制造出来的机器所表现出来的智能。通常人工智能是指通过普通计算机程序来呈现人类智能的技术。
51.es:elasticsearch是一个基于lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于restful web接口。elasticsearch是用java开发的,并作为apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。
52.redis:redis是一个开源的使用ansi c语言编写、支持网络、可基于内存亦可持久化的日志型、key-value数据库,并提供多种语言的api。
53.starrocks:starrocks是一种极速全场景mpp数据库,支持多种数据模型(明细模型、聚合模型、更新模型),多种导入方式(批量和实时),可整合和接入多种现有系统(spark、flink、hive、 elasticsearch)。
54.routine load:routine load是一种同步的数据导入方式。
55.routine load 支持导入的数据类型:文本和json两种格式。
56.spark streaming:spark streaming是个批处理的流式(实时)计算框架。其基本
原理是把输入数据以某一时间间隔批量的处理,当批处理间隔缩短到秒级时,便可以用于处理实时数据流。
57.flink:apache flink是由apache软件基金会开发的开源流处理框架,其核心是用java和scala编写的分布式流数据流引擎。flink以数据并行和流水线方式执行任意流数据程序,flink的流水线运行时系统可以执行批处理和流处理程序。
58.如图1所示,具体流程如下:1.预处理文章:1.1.文章和基础信息通过爬虫进入kafka,1.2.启动与kafka分区相同数量的etl处理程序。每个etl程序配置和当前节点cpu核数相同的进程,并发读取kafka分区,1.3.etl程序实时并发读取kafka的每一条数据,以下都是对每一条数据的处理,2.对数据进行预处理:2.1.加载json数据,2.2.去掉标题中的空格和符号,形成纯净字符,2.3.格式化抓取时间,2.4.处理异常数据,2.5.判断是否是当天文章。
59.3.整体处理逻辑:3.1.调用ai服务中的关键词服务,判断此数据是否保留,3.2.如果不是当前文章,3.2.1.如果是删除文章,调用删除文章处理函数,3.2.2.如果是新文章,调用新文章处理函数,3.2.3.如果是已经入库过的文章,调用老文章处理函数,3.3.如果是当天文章,3.3.1.如果是删除文章,调用删除文章处理函数,3.3.2.如果是已经入库过的文章,调用老文章处理函数,3.4.对于都不符合上面条件的,当天新文章,调用ai服务的情感分析服务,返回文章情感倾向,根据返回结果判断,3.4.1.如果文章情感为负向,调用预警处理函数,3.4.2.如果文字情感为其他,调用新文章处理函数,4.新文章处理函数逻辑:4.1.调用ai情感服务获取文章情感,4.2.调用ai标签服务获取文章标签,4.3.保存文章基础信息、关联信息到kafka,4.4.保存文章全文信息到es,4.5.保存文章声量信息到redis,4.6.保存文章声量信息到kafka,5.老文章处理函数逻辑:5.1.读取redis中的点击量信息计算增量,
5.2.保存声量增量信息到kafka,6.删除文章处理逻辑:6.1.保存文章基础信息和全文到es删除表,7.预警文章处理逻辑:7.1.生成预警信息,7.2.从缓存获取文章需要预警的钉钉群信息,7.3.对每个需要预警的钉钉群,发送预警消息,8.数据加载:8.1.在starrocks中新建和kafka中对应的常驻的导入任务(routine load),定时从kafka中加载数据,加载批次为20-30秒一次。routine load 是一种例行导入方式,dorisdb通过这种方式支持从kafka持续不断的导入数据,并且支持通过sql控制导入任务的暂停、重启、停止。
60.步骤2.4.处理异常数据的规则具体为:剔除无效文本,有些文本可能只是一个视频链接等无意义的格式文本,需要进行剔除;去除重复的文本,由于转发或者作者重复发帖导致文本有重复的现象,为了不重复计算,先做去重处理;删除过长过短文本,文本过短可用信息含量过少,无法准确判断其情感;部分文本内容过长,增加了情绪判断的难度;股吧主要是投资者的评论,大多较为短小,选择保留文本满足一定长度阈值的文本;繁体转简体,有些股评是用繁体字书写,在分词时无法与金融词库进行匹配,因此需要预先进行转简体操作;去除url、拼音等无意义的符号。
61.步骤1.1.文章和基础信息通过爬虫进入kafka中爬虫模块的爬虫与模型架构包含:爬虫服务通过scrapy-redis发布,爬虫服务监听redis,等待请求任务数据,每个任务通过azkaban调度,生成请求 url到redis中,触发爬虫服务执行。爬虫抓取数据后将结果写入到oss,同时将结果发布到kafka(topic=crawl_data);清洗服务订阅kafka (topic=crawl_data),获取爬虫数据,将数据清洗后发布kafka (topic=etl_data);模型服务订阅kafka(topic=etl_data),获取清洗后数据,将数据发布到celery,由celery调度对应的模型处理模块model_worker。model_worker处理结果发布到kafka(topic=model);应用服务订阅kafka (topic=model),将模型结果写入数据库。
62.现有技术在进行舆情监控的过程中,主要是通过定时执行etl程序批量处理某一时间段(一般是t+1)的舆情数据,把处理结果进行统计分析并向客户展现。
63.现如今随着客户对舆情信息处理时效性要求的提高,定时etl处理已经提速到1小时处理一次,但是由于数据量庞大,很难再缩短处理时间间隔。本方法通过采用的实时etl处理技术,使对汽车行业的舆情数据处理、舆情预警,从1小时缩短到了5秒以内,完全满足了客户实时舆情处理与预警的需求。
64.以上显示和描述了本发明的基本原理和主要特征和本发明的优点。本行业的技术人员应该了解,本发明不受上述实施例的限制,上述实施例和说明书中描述的只是说明本发明的原理,在不脱离本发明精神和范围的前提下,本发明还会有各种变化和改进,这些变化和改进都落入要求保护的本发明范围内。本发明要求保护的范围由所附的权利要求书及其等效物界定。

技术特征:
1.一种汽车舆情快速处理方法,其特征在于,包括以下具体步骤:通过爬虫程序爬取网络舆情文章和基础信息,发送到分布式流平台模块;启动与分布式流平台分区相同数量的数据抽取转换加载处理程序,数据抽取转换加载处理程序实时并发读取分布式流平台的每一条数据;通过对数据进行预处理,并对文章进行分类处理;在大规模并行处理数据库产品starrocks中新建和分布式流平台中对应的定时加载模块,定时从分布式流平台中加载数据并进行推送。2.根据权利要求1所述的一种汽车舆情快速处理方法,其特征在于,所述基础信息包括文章标题、文章内容、发布时间、爬取时间、文章连接和文章获取渠道。3.根据权利要求1所述的一种汽车舆情快速处理方法,其特征在于,所述对数据进行预处理具体步骤为:加载json格式数据;去掉文章标题中的空格和符号,形成纯净字符;格式化爬取时间;处理异常数据;判断是否是当天文章。4.根据权利要求1所述的一种汽车舆情快速处理方法,其特征在于,所述对文章进行分类处理具体步骤为:判断是否是当天文章:若不是当天文章,而是以往已删除的文章,调用删除文章处理函数;若不是当天文章,而是以往未爬取到的新文章,调用新文章处理函数;若不是当天文章,而是以往已经入库过的文章,调用老文章处理函数;若是当天文章,并且是已删除的当天文章,调用删除文章处理函数;若是当天文章,并且是已经入库过的当天文章,调用老文章处理函数;若是当天新文章,则调用ai服务的情感分析服务,返回文章情感倾向,根据返回结果进行判断。5.根据权利要求4所述的一种汽车舆情快速处理方法,其特征在于,所述根据返回结果进行判断具体方法为:如果文章情感为负向,调用预警处理函数;如果文字情感为其他,调用新文章处理函数。6.根据权利要求4所述的一种汽车舆情快速处理方法,其特征在于,所述删除文章处理函数逻辑具体为:保存文章基础信息和全文到搜索引擎产品elasticsearch删除表。7.根据权利要求4所述的一种汽车舆情快速处理方法,其特征在于,所述新文章处理函数逻辑具体为:调用ai情感服务获取文章情感;调用ai标签服务获取文章标签;保存文章基础信息、关联信息和文章声量信息到分布式流平台;保存文章全文信息到搜索引擎产品elasticsearch中;保存文章声量信息到内存数据结构存储系统。8.根据权利要求4所述的一种汽车舆情快速处理方法,其特征在于,所述老文章处理函数逻辑具体为:读取内存数据结构存储系统中的点击量信息计算增量;保存声量增量信息到分布式流平台。9.根据权利要求5所述的一种汽车舆情快速处理方法,其特征在于,所述预警处理函数逻辑具体为:生成预警信息;从缓存获取文章需要预警的钉钉群信息;对每个需要预警的钉钉群,发送预警消息。

技术总结
本发明公开了一种汽车舆情快速处理方法,包括以下具体步骤:通过爬虫程序爬取网络舆情文章和基础信息,发送到Kafka模块;启动与Kafka分区相同数量的ETL处理程序,ETL处理程序实时并发读取Kafka的每一条数据;通过对数据进行预处理,并对文章进行分类处理;在StarRocks中新建和Kafka中对应的定时加载模块,定时从Kafka中加载数据并进行推送。本发明通过实时技术架构,大大节省数据处理时间,可以将舆情的预警时间从1小时缩短到5秒内,将数据展现时间从1小时缩短到1分钟。据展现时间从1小时缩短到1分钟。据展现时间从1小时缩短到1分钟。


技术研发人员:王循 杜阿卫
受保护的技术使用者:启明信息技术股份有限公司
技术研发日:2022.07.14
技术公布日:2022/11/1
转载请注明原文地址: https://tieba.8miu.com/read-3068.html

最新回复(0)