基于流计算的大规模数据处理方法及装置与流程

专利2024-12-06  50



1.本发明属于大数据技术领域,具体提出一种基于流计算的大规模数据处理方法及装置。


背景技术:

2.在处理互联网规模的数据及其类似业务的时候,为了提高数据的实时处理能力,提高数据处理效率、通常采用流计算的方式对处理进行处理。数据或者日志采集的过程中,需要一个消息队列作为数据的中转、分发组件,并实现数据的暂存功能。
3.kafka即是应对大数据处理的分布式消息队列,拥有低延迟、高吞吐、高可靠的特点。配合消息队列,需要使用分布式流计算框架对数据进行处理。现有技术在使用kafka作为消息队列时,通常方式是将单条数据通过json序列化之后发送给broker,计算框架就能很容易的处理这种数据格式。但这种处理方式有以下几个问题:
4.1.接入kafka的协议相对复杂,需要集成客户端类库、应用对应版本的api,对多种编程语言实现的数据写入程序不友好。
5.2.json格式实现简单,但缺乏丰富的类型支持,同时序列化和解序列化的效率不高。
6.3.单条数据写入消息的时候,消息体积一般都很小,影响队列的吞吐量。


技术实现要素:

7.为了解决上述问题,本发明公开一种基于流计算的大规模数据处理方法及装置,由数据提供方连接httpserver并发送数据,httpserver处理之后将数据写入kafka消息队列,flink处理引擎订阅消息并执行计算任务。
8.本发明的技术内容包括:
9.一种基于流计算的大规模数据处理方法,其步骤包括:
10.建立数据提供方与数据处理方之间的http服务,所述http服务将http协议消息体中的数据转换为二进制数据包;
11.将二进制数据包发送到kafka消息队列;
12.在flink计算框架的构造消费者程序的流程中创建一个反序列化器,以构建新flink计算框架;
13.基于所述新flink计算框架,对所述kafka消息队列进行数据处理,以得到数据处理结果。
14.进一步地,所述http协议消息体中的数据格式包括:csv类格式和/或json类格式。
15.进一步地,所述http服务将http协议消息体中的数据转换为二进制数据包,包括:
16.在数据处理之前,将所述数据的所有基本类型字段的描述信息预加载至schemaregistry组件;
17.数据处理时,所述http服务从schemaregistry组件根据消息队列topic获得
avroschema之后,按照字段顺序和字段的类型,对http协议消息体中的数据进行解析,得到每个字段的值;
18.按照avro的序列化方式,将所述每个基本字段的值转换为二进制数据,以得到所述二进制数据包。
19.进一步地,,所述基本类型包括:boolean类型、int类型、long类型、float类型、double类型、string类型和null类型;
20.进一步地,所述按照字段顺序和字段的类型,对http协议消息体中的数据进行解析,得到每个基本字段的值,包括:
21.针对boolean类型的基本字段,通过该基本字段为false或true,获取所述基本字段的值;
22.针对int类型或long类型的基本字段,通过遵循变长的、zig-zag的编码模式,获取所述基本字段的值;
23.针对float类型的基本字段,通过java的floattointbytes转换为4字节的编码,并以小端字节序输出的方式,获取所述基本字段的值;
24.针对double类型的基本字段,通过java的doubletolongbytes转换为8个字节的编码,并以小端字节序输出的方式,获取所述基本字段的值;
25.针对string类型的基本字段,通过long类型的编码描述字节数后,跟随utf-8编码的字符数据,以获取所述基本字段的值;
26.针对null类型的基本字段,不输出所述基本字段的值。
27.进一步地,所述将二进制数据包发送到kafka消息队列,包括:
28.采用多行数据写入一个kafka消息的方式,将二进制数据包发送到kafka消息队列。
29.进一步地,所述反序列化器的类型属性包括:deserializationschema。
30.进一步地,所述基于所述新flink计算框架,对所述kafka消息队列进行数据处理,以得到数据处理结果,包括:
31.基于所述kafka消息队列,构造消费者程序;
32.所述消费者程序从schemaregistry中获取消息队列中消息对应的schema;
33.基于所述schema,对所述kafka消息队列中的消息逐个字段解析数据;
34.将解析后的数据作为数据处理结果。
35.一种存储介质,所述存储介质中存储有计算机程序,其中,所述计算机程序被设置为处理器运行时执行上述任一所述方法。
36.一种电子装置,其特征在于,包括存储器和处理器,所述存储器中存储有计算机程序,所述处理器被设置为运行所述计算机程序以执行上述任一所述方法。
37.本发明提出的方法具有以下优点及效果:
38.1.降低流数据处理系统的数据接入难度。
39.2.avro有丰富的类型支持,而且是二进制协议,序列化后的消息占用空间更小,节省带宽。
40.3.多行数据合并发送的方式能够提高整个数据处理系统的吞吐量。
附图说明
41.图1本发明的整体结构图。
42.图2flink处理消息队列的流程图。
具体实施方式
43.下面将结合附图,对本发明实施方式中的技术方案进行清楚、完整地描述,显然,所描述的实施方式仅仅是本发明特定实施方式,而不是全部的实施方式。基于本发明中的实施方式,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其他实施方式,都属于本发明保护的范围。
44.本发明的整体结构如图1所示,由数据提供方连接httpserver并发送数据,httpserver处理之后将数据写入kafka消息队列,flink处理引擎订阅消息并执行计算任务,包括以下步骤:
45.步骤一:使用http协议建立服务,基于http协议的消息体获取数据包。
46.http是常用的网络协议,常用编程语言都有对应的实现。
47.一示例中,httpserver在接收到的csv、json等数据格式的数据之后,对该数据进行解析并进行avro序列化,形成一个一个二进制数据包。
48.通常来说,csv、json类的数据由多个基本类型字段组成,在解析过程中,字段的基本信息是由avroschema的方式描述的,消息队列topic和avroschema由数据提供方定义,在数据加载前预先写入schemaregistry,由schemaregistry统一管理。httpserver程序从schemaregistry组件根据消息队列topic获得avroschema之后,按照字段顺序和字段的类型,对收到的csv或json数据内容进行解析,得到每个字段的值之后,按照avro的序列化方式将值转换为二进制数据。一般来说,csv或json类型的数据,包含的字段都是基本类型的字段,支持boolean类型、int类型、long类型、float类型、double类型、string类型和null类型等,boolean类型的二进制模式为一个字节的0或者1,0代表false,1代表true;int和long类型都遵循变长的、zig-zag的编码模式;float类型由java的floattointbytes转换为4字节的编码,以小端字节序输出;double类型由java的doubletolongbytes转换为8个字节的编码,同样以小端字节序输出;string类型由一个long类型的编码描述字节数,后跟随utf-8编码的字符数据;null类型不输出字节。由多个基本类型的字段组成一条数据,在avro的类型系统中称为record(记录)。多行数据合并序列化时,选择复合类型中array类型,先写入一个long类型的二进制数据,代表数组的元素个数,后续再写入元素。
49.步骤二:将数据包封装为消息,发送到kafka消息队列。
50.一示例中,在数据包发送到kafka消息队列时,采用多行数据写入一个消息的方式。合并多行数据之后kafka消息队列,消息体变大。kafka节点的网络流量大小与消息体大小正相关。相关测试结果如表1所示:
51.消息体大小每秒消息数速率100byte821,55778.3mb/s1k532,183519.7mb/s10k109,9221073.4mb/s
52.表1
53.进一步地,配合数据压缩功能,合并数据还可以能够明显提高节点的资源利用率。
54.步骤三:修改flink处理数据的流程,适配消息封装方式。
55.flink是当下流行的计算框架,在提供数据处理接口、计算资源管理等功能的同时,支持有状态的流计算,通过检查点、保存点保证容错性。
56.在flink处理数据的过程中,需要经过source(数据源)、算子、sink等各个部分。source部分包含kafka节点的消费者程序。修改flink处理数据的流程是在构造消费者程序的流程中创建一个反序列化器(deserializer)来对消息进行解析,反序列化器有一个类型为deserializationschema的属性,deserializationschema定义了如何解析kafka消息体中的二进制数据,通过这个接口,可以实现一个avro批量解析数据的功能。
57.修改flink处理数据之后,如图2所示,消费者程序从schemaregistry中获取消息队列中消息对应的schema,开始解析流程。基于之前数据的序列化方式,按照获取到的schema,逐个字段解析数据。如果一行记录解析完成之后,数据包还有剩余字节,那么解码器会继续工作,直到整个数据包读完。解析后的数据以迭代器的方式返回,则从source获取的时候即为单行数据。
58.以上实施例仅用以说明本发明的技术方案而非对其进行限制,本领域的普通技术人员可以对本发明的技术方案进行修改或者等同替换,而不脱离本发明的精神和范围,本发明的保护范围应以权利要求书所述为准。

技术特征:
1.一种基于流计算的大规模数据处理方法,其步骤包括:建立数据提供方与数据处理方之间的http服务,所述http服务将http协议消息体中的数据转换为二进制数据包;将二进制数据包发送到kafka消息队列;在flink计算框架的构造消费者程序的流程中创建一个反序列化器,以构建新flink计算框架;基于所述新flink计算框架,对所述kafka消息队列进行数据处理,以得到数据处理结果。2.如权利要求1所述的方法,其特征在于,所述http协议消息体中的数据格式包括:csv类格式和/或json类格式。3.如权利要求1所述的方法,其特征在于,所述http服务将http协议消息体中的数据转换为二进制数据包,包括:在数据处理之前,将所述数据的所有基本类型字段的描述信息预加载至schemaregistry组件;数据处理时,所述http服务从schemaregistry组件根据消息队列topic获得avroschema之后,按照字段顺序和字段的类型,对http协议消息体中的数据进行解析,得到每个字段的值;按照avro的序列化方式,将所述每个基本字段的值转换为二进制数据,以得到所述二进制数据包。4.如权利要求3所述的方法,其特征在于,所述基本类型包括:boolean类型、int类型、long类型、float类型、double类型、string类型和null类型。5.如权利要求4所述的方法,其特征在于,所述按照字段顺序和字段的类型,对http协议消息体中的数据进行解析,得到每个基本字段的值,包括:针对boolean类型的基本字段,通过该基本字段为false或true,获取所述基本字段的值;针对int类型或long类型的基本字段,通过遵循变长的、zig-zag的编码模式,获取所述基本字段的值;针对float类型的基本字段,通过java的floattointbytes转换为4字节的编码,并以小端字节序输出的方式,获取所述基本字段的值;针对double类型的基本字段,通过java的doubletolongbytes转换为8个字节的编码,并以小端字节序输出的方式,获取所述基本字段的值;针对string类型的基本字段,通过long类型的编码描述字节数后,跟随utf-8编码的字符数据,以获取所述基本字段的值;针对null类型的基本字段,不输出所述基本字段的值。6.如权利要求1所述的方法,其特征在于,针对包含多种基本类型的复合类型中数组,所述按照avro的序列化方式进行转换,还包括:在数据内容前加入一个long类型的数值,用以描述记录的行数。7.如权利要求1所述的方法,其特征在于,所述将二进制数据包发送到kafka消息队列,包括:
采用多行数据写入一个kafka消息的方式,将二进制数据包发送到kafka消息队列。8.如权利要求1所述的方法,其特征在于,所述反序列化器的类型属性包括:deserializationschema。9.如权利要求1所述的方法,其特征在于,所述基于所述新flink计算框架,对所述kafka消息队列进行数据处理,以得到数据处理结果,包括:基于所述kafka消息队列,构造消费者程序;所述消费者程序从schemaregistry中获取消息队列中消息对应的schema;基于所述schema,对所述kafka消息队列中的消息逐个字段解析数据;将解析后的数据作为数据处理结果。10.一种电子装置,其特征在于,包括存储器和处理器,所述存储器中存储有计算机程序,所述处理器被设置为运行所述计算机程序以执行如权利要求1-9中任一所述方法。

技术总结
本发明提供了一种基于流计算的大规模数据处理方法及装置,所述方法包括:建立数据提供方与数据处理方之间的HTTP服务,所述HTTP服务将HTTP协议消息体中的数据转换为二进制数据包;将二进制数据包发送到Kafka消息队列;在Flink计算框架的构造消费者程序的流程中创建一个反序列化器,以构建新Flink计算框架;基于所述新Flink计算框架,对所述Kafka消息队列进行数据处理,以得到数据处理结果。本发明降低流数据处理系统的数据接入难度,提高整个数据处理系统的吞吐量。处理系统的吞吐量。处理系统的吞吐量。


技术研发人员:窦禹 任彦 郑礼雄 薛晨 易立 王一宇 杨昕雨 张博文 李晓雪
受保护的技术使用者:国家计算机网络与信息安全管理中心
技术研发日:2022.06.17
技术公布日:2022/11/1
转载请注明原文地址: https://tieba.8miu.com/read-10736.html

最新回复(0)