1.本发明涉及数据处理的技术领域,尤其涉及数据统一实时采集装置及采集方法。
背景技术:2.随着信息化建设和无纸化办公的大力推进,需要对各种来源的海量数据进行统一的管理和利用。现在面临的最大难题是,累积的数据资产数量大,且以各种形式散落在各种设备或系统中,导致数据资产处理量大,离散数据的标准不统一。
3.传统的方式通过单点的方式去访问上述数据资产,可以局部或临时的利用这些数据,但存在重复的成本投入,投入产出效率低。对离散数据资产的访问,需要通过多种协议/ 格式,如:http(s)、(s)ftp、jdbc、syslog、各种文件格式以及各数据库或厂商私有协议等,每一种协议/格式都有各自的技术栈,给技术和管理提出了更高的要求和难度。
4.数据资产并非静态,各设备或系统还在源源不断的生产新的数据,传统的数据采集方式通常通过定期查询再读取的方式,一般会有几小时至一天的延迟,无法满足上层业务或应用对实时性的要求;高校业务中存在大量日志或流水数据,在处理这些大数据的采集或同步时,传统单机或固定节点集群无法承载这么大的数据量,处理效率低导致硬件资源的无谓消耗。
技术实现要素:5.本发明目的是提供一种数据统一实时采集方法及采集装置,解决离散异构数据的集中采集,规范化采集应用的开发与实施,提升采集应用的可复用性,满足上层业务或应用对实时性的要求;解决海量日志或流水数据的并行采集,充分发挥现有硬件资源的效能,并支持断点续传。
6.一种数据统一实时采集装置,包括:
7.数据格式转换装置:建立用于接收源端数据的数据传输管道,在数据传输管道中定义标准的数据传输管道数据格式,各异构数据格式向数据传输管道数据格式进行标准转换或映射;
8.数据采集交互装置:经过标准转换或映射后的数据传输管道数据建立统一模型结构,进行数据交互;所述数据交互包括:处理器应用注册、编排、部署以及卸载管理;
9.数据写入装置:对源端数据的实时变化检测读取后,将经过数据采集交互装置交互后的数据写入数据传输管道;
10.数据处理装置:写入数据传输管道的数据进行实时流动,后续环节各处理器实时对数据进行处理。
11.优选的是,本发明的数据采集交互装置作为所有采集事务的逻辑划分,由一组处理应用进行有序编排组合而成,数据采集交互装置的模型结构如下:
12.每个处理器应用有且仅有一个采集器;每个处理器应用配置相匹配的转换器,支持多个转换器进行先后串行处理;每个处理器应用至少有一个输出器,支持将当前任务数
据写入多个目标数据存储;每个处理器应用按照先采集、然后转换、再写入的顺序进行编排。
13.优选的是,本发明的采集器、转换器、输出器满足以下条件:有且仅有一个输入端口,有且仅有一个输出端口;采集器的输出端与数据传输管道的输入端连接;转换器的输入、输出端均与数据传输管道连接;输出器的输入端与数据传输管道连接。
14.优选的是,本发明的数据传输管道作为数据流动的中枢,在物理上与各类处理器应用建立连接,采用如下模型结构:
15.1.1数据传输管道由上游入口流向下游出口,所有数据遵循先进先出的队列规则沿同一方向流动;
16.1.2数据传输管道上设置若干个用于连接处理器应用的连接端口,每个连接端口对数据传输管道中数据的访问形式为读取或写入中的一种;
17.1.3数据在数据传输管道内会被物理切分为多个分区,这些数据分区按一定的规则分布于集群存储中;
18.1.4数据传输管道缓冲或存储内部流动的数据,同时也负责记录和存储各处理器应用对每条数据的处理位置。
19.优选的是,本发明标准的数据格式包括统一的数据类型和记录结构;
20.所述数据类型包括但不限于:字符串、整数、小数、布尔、二进制、对象、结构、数组;
21.所述数据记录结构包括:记录头、记录键、记录值,记录键、记录值分别包含字段结构和字段值。
22.基于本发明的数据统一实时采集装置的采集方法,包括如下步骤:
23.采集器启动,通过预先准备的访问账号与数据源建立连接;首次启动时,采集器会读取源库中各表的当前数据,做一次全量的数据读取,并将读取到的数据写入数据传输管道,并记录源端处理的当前位置;
24.采集器进入增量采集阶段,根据上一次记录的处理位置,按顺序读取,直到数据的最新位置;
25.采集器进入监听数据库变化阶段,由于采集器是基于订阅机制,源端的任何变化都会实时通知到采集器应用;
26.采集器将获得的原始数据,经由内部转换为数据传输管道标准的数据格式,并通过采集器的输出端口写入管道;采集器在将数据写入数据传输管道时,根据当前所需采集的规模,为当前采集任务设定管道内数据分区数量的初始值,作为后续自动负荷均衡的参考;数据写入数据传输管道时,将由数据传输管道负责记录采集器当前处理数据的状态和位置;
27.采集器保持监听状态并持续处理新数据,直到人工退出或遇错退出;
28.采集器退出重启时,由于上述过程中实时记录了采集器最后处理数据的位置,采集器将从最后的处理位置继续工作。
29.优选的是,本发明的采集方法,将数据写入数据传输管道时,根据处理应用写入指定的初始分区数,对每条数据进行分区计算,并将分区与集群节点进行关联,过程如下:
30.获取当前数据记录的记录键的字段值,对字段值进行散列计算,获得一个数字散列值;
31.根据上述散列值对分区数进行取模计算,获得该数据记录所应存入的分区号;
32.数据传输管道根据上述分区计算结果,将每条数据写入对应的集群分区;
33.数据传输管道记录处理器应用的当前状态和位置,以便在处理器应用退出重启后,从最新的位置继续处理;
34.重复上述过程。
35.优选的是,本发明的采集方法,采集器将获得的原始数据,经由内部转换为数据传输管道标准的数据格式,并通过采集器的输出端口写入管道;具体过程如下:
36.对数据传输管道中的数据进行实时变换,包括对数据结构和数据值的变换;
37.处理程序启动,连接数据传输管道,并根据设定的订阅条件获得数据流,数据传输管道中数据流动时将实时进入转换器进行处理;
38.转换器读取到数据传输管道中的数据流时,对数据流中的每一条数据进行处理;当前转换器设定为对msg字段进行特征格式识别,并将识别结果写会msg字段;数据记录内msg字段原始值;特征识别采用字符串正则匹配;更新msg字段,替换原始值;完成一次转换过程;
39.转换器将处理完的数据记录,以新数据流的形式写回到管道,以便后续处理程序继续处理。
40.优选的是,本发明的采集方法,还包括将数据传输管道中的数据实时输出到外部目标系统,具体过程如下:
41.输出器处理程序启动,连接数据传输管道,并根据设定的订阅条件获得数据流,数据传输管道中数据流动时将实时进入输出器进行处理;
42.输出器读取到数据传输管道中的数据流时,对数据流中的每一条数据进行处理;读取数据记录的记录键,并将记录键中的各字段作为目标表的主键列;读取数据记录的记录值,并将记录值中的各字段作为目标表的数据列;以上目标表的主键列、数据列在数据库的列类型,由数据记录中字段结构的类型进行映射转换;根据数据记录的记录键值,在目标表进行查询检索,如果存在则生成update的dml,否则生成insert的dml;
43.输出器建立与目标关系型数据库的连接,并在目标数据库中执行上述步骤产生的dml,将数据流中的数据在目标数据库中进行持久存储;
44.重复上述步骤。
45.优选的是,本发明的采集方法,还包括针对任务实例中的处理器应用实例数和应用内任务数进行动态调整,过程如下:
46.处理器应用实例数的自动调整过程,该过程依靠底层云原生容器化运行环境的能力:
47.数据传输管道实时检测数据处理任务内各处理器应用的当前处理位置,识别需要伸缩的应用实例;
48.当某个处理器应用的当前处理位置值低于其他处理器应用,且在当前系统资源足够的情况下,管道请求容器运行时,启动新的处理器应用实例;
49.当某个处理器应用的当前处理位置值高于其他处理器应用,当前系统资源使用率处于持续高位,数据传输管道请求容器运行的情况下,对具备多实例的处理器应用进行停止,直到保留一个处理器应用实例,处理器应用退出时自动退出所在数据订阅逻辑分组,以
便让数据传输管道重新均衡各处理器应用实例的负荷。
50.本发明采用上述技术方案,与现有技术相比具有如下优点:
51.采用规范化的集中采集以及可复用的采集应用,极大的提升了开发与实施的效率;结合源端的实时采集能力,对新数据的同步更新延迟由原来的小时级别以上缩短至秒级别;在同等硬件资源的情况下,海量数据的吞吐率可提升10倍以上。
附图说明
52.图1是本发明采集装置的结构示意图。
53.图2是本发明采集方法的流程示意图。
具体实施方式
54.如图1所示,一种数据统一实时采集装置,包括:
55.数据格式转换装置:建立用于接收源端数据的数据传输管道,在数据传输管道中定义标准的数据传输管道数据格式,各异构数据格式向数据传输管道数据格式进行标准转换或映射;
56.数据采集交互装置:经过标准转换或映射后的数据传输管道数据建立统一模型结构,进行数据交互;所述数据交互包括:处理器应用注册、编排、部署以及卸载管理;
57.数据写入装置:对源端数据的实时变化检测读取后,将经过数据采集交互装置交互后的数据写入数据传输管道;
58.数据处理装置:写入数据传输管道的数据进行实时流动,后续环节各处理器实时对数据进行处理。
59.优选的是,本发明的数据采集交互装置作为所有采集事务的逻辑划分,由一组处理器应用进行有序编排组合而成,数据采集交互装置的模型结构如下:
60.每个处理器应用有且仅有一个采集器;每个处理器应用配置相匹配的转换器,支持多个转换器进行先后串行处理;每个处理器应用至少有一个输出器,支持将当前任务数据写入多个目标数据存储;每个处理器应用按照先采集、然后转换、再写入的顺序进行编排。
61.优选的是,本发明的采集器、转换器、输出器满足以下条件:有且仅有一个输入端口,有且仅有一个输出端口;采集器的输出端与数据传输管道的输入端连接;转换器的输入、输出端均与数据传输管道连接;输出器的输入端与数据传输管道连接。
62.优选的是,本发明的数据传输管道作为数据流动的中枢,在物理上与各类处理器应用建立连接,采用如下模型结构:
63.1.1数据传输管道由上游入口流向下游出口,所有数据遵循先进先出的队列规则,沿同一方向流动;
64.1.2数据传输管道上设置若干个用于连接处理器应用的连接端口,每个连接端口对数据传输管道中数据的访问形式为读取或写入中的一种;
65.1.3数据在数据传输管道内会被物理切分为多个分区,这些数据分区按一定的规则分布于集群存储中;
66.1.4数据传输管道缓冲或存储内部流动的数据,同时也负责记录和存储各处理器
应用对每条数据的处理位置。
67.优选的是,本发明标准的数据格式包括统一的数据类型和记录结构;
68.所述数据类型包括但不限于:字符串、整数、小数、布尔、二进制、对象、结构、数组;
69.所述数据记录结构包括:记录头、记录键、记录值,记录键、记录值分别包含字段结构和字段值。
70.基于本发明的数据统一实时采集装置的采集方法,包括如下步骤:
71.采集器启动,通过预先准备的访问账号与数据源建立连接;首次启动时,采集器会读取源库中各表的当前数据,做一次全量的数据读取,并将读取到的数据写入数据传输管道,并记录源端处理的当前位置;具体的:通过预先准备的账号以slave的身份与mysql 服务器建立连接;首次启动时,采集器会读取源库中各表的当前数据,做一次全量的数据读取,并将读取到的数据写入数据传输管道,并记录binlog的当前位置;
72.采集器进入增量采集阶段,根据上一次记录的处理位置,按顺序读取,直到数据的最新位置;具体的:根据上一次记录的binlog位置,顺序读取binlog,直到binlog最新位置;
73.采集器进入监听数据库变化阶段,由于采集器是基于订阅机制,源端的任何变化都会实时通知到采集器应用;具体的:采集器是slave节点身份,源端mysql主库的任何变化都会实时通知到采集器应用;
74.采集器将获得的原始数据,经由内部转换为数据传输管道标准的数据格式,并通过采集器的输出端口写入管道;采集器在将数据写入数据传输管道时,根据当前所需采集的规模,为当前采集任务设定管道内数据分区数量的初始值,作为后续自动负荷均衡的参考;数据写入数据传输管道时,将由数据传输管道负责记录采集器当前处理数据的状态和位置;
75.采集器保持监听状态并持续处理新数据,直到人工退出或遇错退出;
76.采集器退出重启时,由于上述过程中实时记录了采集器最后处理数据的位置,采集器将从最后的处理位置继续工作。
77.优选的是,本发明数据写入数据传输管道时,根据处理应用写入指定的初始分区数,对每条数据进行分区计算,并将分区与集群节点进行关联,过程如下:
78.获取当前数据记录结构的键,对键进行散列计算,获得一个数字散列值;
79.根据上述散列值对分区数进行取模计算,获得该数据记录所应存入的分区号;
80.数据传输管道根据上述分区计算结果,将每条数据写入对应的集群分区;
81.数据传输管道记录处理应用的当前状态和位置,以便在处理应用退出重启后,从最新的位置继续处理;
82.重复上述过程。
83.本发明实现了统一的数据传输管道,在数据传输管道中定义标准的数据格式,各异构数据格式向管道数据格式进行标准转换或映射;
84.本发明在数据传输管道两端分别挂接上游采集器和下游输出器,在管道中间挂接数据转换器,这些采集器、转换器、输出器(统称为处理器应用)采用一致的模型结构,可进行统一开发和扩展,实现与数据传输管道之间进行数据交互;
85.本发明实现了数据传输管道中各数据流实例的生命周期管理,包括数据流实例的处理器应用注册、编排、部署以及卸载管理;
86.本发明实现基于变化监测抓取策略的实时读取器,将源端数据进行实时变化检测读取并写入数据传输管道,数据进入数据传输管道后进行实时流动,后续环节各处理器可实时对数据进行处理;
87.本发明实现了处理器应用基于容器运行时,依托底层云原生架构设施,可按所需处理数据量自动调节应用的运行实例数;
88.实现数据传输管道中流动数据的分布式、多分区分片存储,每个分区可独立并行处理,数据传输管道负责记录各处理器当前处理数据的位置,以便在处理器中断重启后从上次记录的位置继续处理后续数据。
89.1.应用、管道及任务模型
90.处理器应用作为基本的数据处理单元,完成一个采集任务所需处理应用分三种类型,分别是采集器、转换器、输出器,这些应用都遵循统一的模型结构:
91.(1)每个处理器应用有且仅有一个输入端口;
92.(2)每个处理器应用有且仅有一个输出端口;
93.(3)每个处理器应用独立自主,不依赖任何其他应用;
94.(4)每个处理器应用至少有一个端口与数据传输管道进行连接,采集器的输出端与管道连接,转换器的输入、输出端都与管道连接,输出器的输入端与数据传输管道连接;
95.数据传输管道作为数据流动的中枢,在物理上与各类处理器应用建立连接,采用如下模型结构:
96.(1)数据传输管道内数据由上游入口流向下游出口,所有数据遵循先进先出的队列规则,沿同一方向进行流动;
97.(2)数据传输管道可以有任意个用于连接处理器应用的连接端口,各连接端口对管道中数据的访问形式分为两种:读取和写入,每个连接端口仅支持一种形式的操作;
98.(3)数据在数据传输管道内会被物理切分为多个细流,称之为分区,这些数据分区可按一定的规则分布于集群存储中;
99.(4)数据传输管道除了负责缓冲/存储管道内流动的数据,同时也负责记录和存储各连接应用对每条数据的处理位置;
100.(5)数据传输管道内数据采用统一格式,数据以记录为单位,一条记录由记录头、记录键以及记录值组成,记录键和记录值又分别包含字段结构和字段值;
101.统一数据格式包括统一数据类型和记录结构,描述如下:
102.(1)数据类型:
[0103][0104]
(2)数据记录结构:
[0105]
数据记录结构如下表所示:
[0106][0107]
其中,记录键和记录值中的字段结构样例如下:
[0108][0109][0110]
记录键和记录值中的字段值样例如下:
[0111][0112]
采集任务作为所有采集事务的一个逻辑划分,由一组处理器应用进行有序编排组合而成,模型结构如下:
[0113]
(1)每个采集任务有且仅有一个采集器;
[0114]
(2)每个采集任务可选配置所需的转换器,支持多个转换器进行先后串行处理;
[0115]
(3)每个采集任务至少有一个输出器,支持将当前任务数据写入多个目标数据存储;
[0116]
(4)每个采集任务固定按照先采集、然后转换、再写入的顺序进行编排;
[0117]
上述模型完整构成了整个数据采集的结构,该结构能够规范化采集应用的开发与实施,提升采集应用的可复用性。装置的拥有者可基于该模型,经历多个项目的实施,积累不同场景下的应用实例和采集任务实施模式,在后续的项目实施过程中,采用相同的应用实例,套用类似的采集任务模板,可有效降低开发和实施的成本。
[0118]
2.装置工作过程
[0119]
采集器可根据不同的数据源进行分别实现,接下来将以mysql数据采集为实例,描述装置各部件的工作过程:
[0120]
2.1采集器工作过程
[0121]
mysql binlog是用来记录数据库表结构变更以及表数据修改的二进制日志,它只会记录表的变更操作,这些变更操作包含了表结构的变化和数据的变化,常用于mysql主从(master/slave)复制的实现。基于mysql的这个特性实现mysql采集器(以下简称采集器),其工作过程描述如下:
[0122]
2.1.1采集器应用启动,通过预先准备的账号以slave的身份与mysql服务器建立连接;
[0123]
2.1.2首次启动时,采集器会读取源库中各表的当前数据,做一次全量的数据读取,并将读取到的数据写入数据传输管道,并记录binlog的当前位置;
[0124]
2.1.3接着,采集器进入增量采集阶段,根据上一次记录的binlog位置,顺序读取 binlog,直到binlog最新位置;
[0125]
2.1.4接着,采集器进入监听数据库变化阶段,由于采集器是slave节点身份,源端 mysql主库的任何变化都会实时通知到采集器应用;
[0126]
2.1.5采集器在步骤2.1.2、2.1.3、2.1.4中获得的原始数据,经由内部转换为数据传输管道统一数据格式,并通过采集器的输出端口写入数据传输管道。采集器在将数据写入数据传输管道时,可根据当前所需采集的规模,为当前采集任务设定数据传输管道内数据分区数量的初始值,作为后续自动负荷均衡的参考;数据写入数据传输管道时,将由数据
传输管道负责记录采集器当前处理数据的状态和位置;
[0127]
2.1.6至此,采集器保持步骤2.1.4中的监听状态并按步骤2.1.5持续处理新数据,直到人工退出或遇错退出;
[0128]
2.1.7采集器退出重启时,由于上述过程中实时记录了采集器最后处理数据的位置,采集器将从最后的处理位置按步骤2.1.3、2.1.4、2.1.5继续工作。
[0129]
2.2数据传输管道工作过程
[0130]
由方案附图及模型描述,数据传输管道与各处理器应用之间只有固定的读取和写入两种,因此数据传输管道内其实不区分具体的处理应用类型(即读取器、转换器、输出器),任何应用与数据传输管道进行数据交换仅需区分处理读/写操作。
[0131]
采集器和转换器都具备写入操作能力,其过程描述如下:
[0132]
(1).数据传输管道根据处理器应用写入数据时指定的初始分区数,对每条数据进行分区计算,并将分区与集群节点进行关联,过程如下:
[0133]
(1.1)获取当前数据记录结构的键,对键进行散列(hash)计算,获得一个数字散列值,例如:hash(记录n.键)=1234;
[0134]
(1.2)根据上述散列值对分区数进行取模计算,获得该数据记录所应存入的分区号,例如:hash值=1234,分区数=3,则分区号=1234%3=1。
[0135]
(2).数据传输管道上述根据分区结算结果,将每条数据(如:记录n)写入对应的集群分区(如:分区1)。
[0136]
(3).数据传输管道记录处理应用的当前状态和位置,以便在处理应用退出重启后,从最新的位置继续处理。
[0137]
(4).重复步骤(1)-(3)。
[0138]
转换器和输出器都具备读取操作能力,其过程描述如下:
[0139]
(1).转换器和输出器处理程序启动时,通过设定订阅条件告知数据传输管道需要读取哪些数据;
[0140]
(2).数据传输管道为上述应用进行逻辑分组,并在该逻辑分组下,根据数据传输管道中数据分区数目,为转换器和输出器实例和任务进行分区数据的均衡分配;
[0141]
分组内数据传输管道数据每次分区均衡过程,计算公式为:每个任务处理的分区数量=数据的总分区数/处理器应用实例数/每个应用实例内并行任务数;
[0142]
比如:管道中某个数据总共有18个分区,输出器运行实例3个,每个运行实例开启3个任务并行处理,则每个任务分配到的数据分区数为:18/3/3=2(个)。
[0143]
(3).转换器和输出器应用实例数和任务数可在运行期进行调整,调整后,管道会按以上计算公式进行数据分区的重新均衡分配,以达到计算负载均衡的目的;
[0144]
(4).管道向转换器和输出器输出数据的同时,会以逻辑分组为单位,记录数据读取的状态和位置,以便在处理程序退出重启后,从最新的位置继续处理;
[0145]
(5).重复步骤(1)-(4)。
[0146]
2.3.转换器工作过程
[0147]
转换器的作用是对数据传输管道中的数据进行实时变换,包括对数据结构和数据值的变换。下面以数据值的变换为例描述其工作过程:
[0148]
2.3.1转换器处理程序启动,连接管道,并根据设定的订阅条件获得数据流,数据
传输管道中数据流动时将实时进入转换器进行处理;
[0149]
2.3.2转换器读取到数据传输管道中的数据流时,对数据流中的每一条数据进行处理,举例说明如下:
[0150]
2.3.2.1当前转换器设定为对msg字段进行特征格式识别,并将识别结果写入msg字段;
[0151]
2.3.2.2数据记录内msg字段原始值:client(dc85.decd.65cf)notify:attachtoap(tm2q_2d_7f_ap5528-b);
[0152]
2.3.2.3特征识别采用字符串正则匹配,如:^client\s*\((?《clientmac》\s+)\).*ap\s*\((?《ap》\s+)\).*$;
[0153]
识别结果为:clientmac=dc85.decd.65cf,ap=tm2q_2d_7f_ap5528-b,此时更新msg字段,替换原始值;
[0154]
2.3.2.4完成一次转换过程;
[0155]
2.3.3转换器将处理完的数据记录,以新数据流的形式写回到管道,以便后续处理程序继续处理;
[0156]
2.3.4重复步骤2.3.2-2.3.3。
[0157]
2.4.输出器工作过程
[0158]
输出器的作用是将数据传输管道中的数据实时输出到外部目标系统,如:关系型数据库。下面以jdbc(javadatabaseconnectivity)输出器为例描述其工作过程:
[0159]
2.4.1输出器处理程序启动,连接管道,并根据设定的订阅条件获得数据流,数据传输管道中数据流动时将实时进入输出器进行处理;
[0160]
2.4.2输出器读取到数据传输管道中的数据流时,对数据流中的每一条数据进行处理,举例说明如下:
[0161]
2.4.2.1读取数据记录的记录键,并将记录键中的各字段作为目标表的主键列;
[0162]
2.4.2.2读取数据记录的记录值,并将记录值中的各字段作为目标表的数据列;
[0163]
2.4.2.3以上目标表的主键列、数据列在数据库的列类型,由数据记录中字段结构的类型进行映射转换,如:数据记录的string类型字段,转换为目标表的varchar类型字段;
[0164]
2.4.2.4根据数据记录的记录键字段值,在目标表进行查询检索,如果存在则生成update的dml(sql语句的一种,用来操作数据表中的数据),否则生成insert的dml;
[0165]
2.4.3输出器建立与目标关系型数据库的连接,并在目标数据库中执行步骤2产生的dmlsql,将数据流中的数据在目标数据库中进行持久存储;
[0166]
2.4.4.重复步骤2.4.2-2.4.3。
[0167]
由以上过程可以看出,采集器工作在实时变化监测抓取模式,数据传输管道上对数据流的实时读取和写入,转换器实时读取数据流,对数据进行转换后实时写回管道,输出器根据设定的订阅条件获得数据流,并将结果输出到目标数据湖中,整个采集任务的各环节都各自工作在实时处理模式,确保了整个采集过程的实时性要求。
[0168]
2.5.自动伸缩过程
[0169]
以上由数据传输管道工作过程可知,数据传输管道在整个数据流过程中实时记录了各处理应用的当前处理位置(以下以offset表示),该位置用一个增量值来表示,如:offset=100,offset数值越大表示已处理的数据越多,反之,offset数值越小则表示已处
理的数据越少,同时,在一个任务实例范围内,各处理器应用的实例参与整个数据的处理过程,通过比较每个应用当前的offset值,可得知哪个处理环节(应用)超前,哪个处理环节(应用)落后。一般情况下,为了提高整个数据流处理任务的处理效率,可以通过对落后环节(应用)的算力进行扩容(伸展);特殊情况下,为了控制整个数据流处理任务的能耗(满足业务实时最低要求),可以通过对超前环节(应用)的算力进行限制(收缩)。
[0170]
本装置的自动伸缩过程包含两层控制,即针对任务实例中的处理器应用实例数和应用内任务数进行动态调整来完成:
[0171]
2.5.1处理应用实例数的自动调整过程,该过程依靠底层云原生容器化运行环境的能力:
[0172]
2.5.1.1数据传输管道实时检测数据处理任务内各处理器应用当前offset值,识别需要伸缩的应用实例;
[0173]
2.5.1.2当某个处理器应用的offset值低于其他处理器应用,并且在当前系统资源足够的情况下,数据传输管道请求容器运行时,启动新应用实例,特殊的对于转换器和输出器应用在订阅数据流时加入之前的逻辑分组,以便让数据传输管道重新均衡每个处理器应用实例的负荷;
[0174]
2.5.1.3当某个处理器应用的offset值高于其他处理器应用,且当前系统资源使用率处于持续高位,数据传输管道请求容器运行时,对具备多实例的应用进行停止,直到至少保留一个应用实例,应用退出时自动退出所在数据订阅逻辑分组,以便让数据传输管道重新均衡各处理应用实例的负荷;
[0175]
2.5.2应用内任务数的自动调整过程,该过程依靠处理应用自身技术架构特性(支持运行期任务数调整):
[0176]
2.5.2.1数据传输管道实时检测数据处理任务内各应用当前offset值,识别需要伸缩的应用实例;
[0177]
2.5.2.2当某个处理器应用的offset值低于其他处理器应用,当前系统资源足够,且当前应用的资源请求尚未达到最大值的情况下,数据传输管道调度该处理器应用启动新任务实例,特殊的对于转换器和输出器应用任务在订阅数据流时加入之前的逻辑分组,以便让数据传输管道重新均衡每个处理器应用任务实例的负荷;
[0178]
2.5.2.3当某个处理器应用的offset值高于其他处理器应用,且当前系统资源使用率处于持续高位的情况下,数据传输管道调度该处理器应用进行变更,对具备多任务实例的处理器应用进行停止任务运行,直到至少保留一个处理器应用任务实例,处理器应用实例停止时自动退出所在数据订阅逻辑分组,以便让数据传输管道重新均衡各处理器应用任务实例的负荷。
[0179]
注:以上两层伸缩控制策略基本一致,都是根据当前处理器应用的offset值作为调整的依据之一,区别是:处理器应用实例数方式是通过容器运行时控制应用实例数,而处理器应用内任务数方式是通过数据传输管道调度应用实例控制处理器应用内并行运行的任务数,不管何种方式,目的都是对当前系统资源、所需处理的数据量以及实时要求之间做一个合理的平衡。
技术特征:1.一种数据统一实时采集装置,其特征在于包括:数据格式转换装置:建立用于接收源端数据的数据传输管道,在数据传输管道中定义标准的数据传输管道数据格式,各异构数据格式向数据传输管道数据格式进行标准转换或映射;数据采集交互装置:经过标准转换或映射后的数据传输管道数据建立统一模型结构,进行数据交互;所述数据交互包括:处理器应用注册、编排、部署以及卸载管理;数据写入装置:对源端数据的实时变化检测读取后,将经过数据采集交互装置交互后的数据写入数据传输管道;数据处理装置:写入数据传输管道的数据进行实时流动,各处理器应用实时对数据进行处理。2.根据权利要求1所述的数据统一实时采集装置,其特征在于上述数据采集交互装置作为所有采集事务的逻辑划分,由一组处理器应用进行有序编排组合而成,数据采集交互装置的模型结构如下:每个处理器应用有且仅有一个采集器;每个处理器应用配置相匹配的转换器,支持多个转换器进行先后串行处理;每个处理器应用至少有一个输出器,支持将当前任务数据写入多个目标数据存储;每个处理器应用按照先采集、然后转换、再写入的顺序进行编排。3.根据权利要求2所述的数据统一实时采集装置,其特征在于上述采集器、转换器、输出器满足以下条件:有且仅有一个输入端口;有且仅有一个输出端口;采集器的输出端与数据传输管道的输入端连接;转换器的输入、输出端均与数据传输管道连接;输出器的输入端与数据传输管道连接。4.根据权利要求1所述的数据统一实时采集装置,其特征在于上述数据传输管道作为数据流动的中枢,在物理上与各类处理器应用建立连接,采用如下模型结构:1.1数据传输管道由上游入口流向下游出口,所有数据遵循先进先出的队列规则,沿同一方向流动;1.2数据传输管道上设置若干个用于连接处理器应用的连接端口,每个连接端口对数据传输管道中数据的访问形式为读取或写入中的一种;1.3数据在数据传输管道内会被物理切分为多个分区,这些数据分区按一定的规则分布于集群存储中;1.4数据传输管道缓冲或存储内部流动的数据,同时也负责记录和存储各处理器应用对每条数据的处理位置。5.根据权利要求1所述的数据统一实时采集装置,其特征在于上述标准的数据格式包括统一的数据类型和记录结构;所述数据类型包括但不限于:字符串、整数、小数、布尔、二进制、对象、结构、数组;所述数据记录结构包括:记录头、记录键、记录值,记录键、记录值分别包含字段结构和字段值。6.基于权利要求1-5任一项所述的数据统一实时采集装置的采集方法,其特征在于包括如下步骤:采集器启动,通过预先准备的访问账号与数据源建立连接;首次启动时,采集器会读取源库中各表的当前数据,做一次全量的数据读取,并将读取到的数据写入数据传输管道,并
记录源端处理的当前位置;采集器进入增量采集阶段,根据上一次记录的处理位置,按顺序读取,直到数据的最新位置;采集器进入监听数据库变化阶段,由于采集器是基于订阅机制,源端的任何变化都会实时通知到采集器应用;采集器将获得的原始数据,经由内部转换为数据传输管道标准的数据格式,并通过采集器的输出端口写入管道;采集器在将数据写入数据传输管道时,根据当前所需采集的规模,为当前采集任务设定管道内数据分区数量的初始值,作为后续自动负荷均衡的参考;数据写入数据传输管道时,将由数据传输管道负责记录采集器当前处理数据的状态和位置;采集器保持监听状态并持续处理新数据,直到人工退出或遇错退出;采集器退出重启时,由于上述过程中实时记录了采集器最后处理数据的位置,采集器将从最后的处理位置继续工作。7.根据权利要求6所述的采集方法,其特征在于上述数据写入数据传输管道时,根据处理应用写入指定的初始分区数,对每条数据进行分区计算,并将分区与集群节点进行关联,过程如下:获取当前数据记录的记录键的字段值,对字段值进行散列计算,获得一个数字散列值;根据上述散列值对分区数进行取模计算,获得该数据记录所应存入的分区号;数据传输管道根据上述分区计算结果,将每条数据写入对应的集群分区;数据传输管道记录处理器应用的当前状态和位置,以便在处理器应用退出重启后,从最新的位置继续处理;重复上述过程。8.根据权利要求6所述的采集方法,其特征在于采集器将获得的原始数据,经由内部转换为数据传输管道标准的数据格式,并通过采集器的输出端口写入管道;具体过程如下:对数据传输管道中的数据进行实时变换,包括对数据结构和数据值的变换;处理程序启动,连接数据传输管道,并根据设定的订阅条件获得数据流,数据传输管道中数据流动时将实时进入转换器进行处理;转换器读取到数据传输管道中的数据流时,对数据流中的每一条数据进行处理;当前转换器设定为对msg字段进行特征格式识别,并将识别结果写会msg字段;数据记录内msg字段原始值;特征识别采用字符串正则匹配;更新msg字段,替换原始值;完成一次转换过程;转换器将处理完的数据记录,以新数据流的形式写回到管道,以便后续处理程序继续处理。9.根据权利要求6所述的采集方法,其特征在于还包括将数据传输管道中的数据实时输出到外部目标系统,具体过程如下:输出器处理程序启动,连接数据传输管道,并根据设定的订阅条件获得数据流,数据传输管道中数据流动时将实时进入输出器进行处理;输出器读取到数据传输管道中的数据流时,对数据流中的每一条数据进行处理;读取数据记录的记录键,并将记录键中的各字段作为目标表的主键列;读取数据记录的记录值,并将记录值中的各字段作为目标表的数据列;以上目标表的主键列、数据列在数据库的列类型,由数据记录中字段结构的类型进行映射转换;根据数据记录的记录键值,在目标表进
行查询检索,如果存在则生成update的dml,否则生成insert的dml;输出器建立与目标关系型数据库的连接,并在目标数据库中执行上述步骤产生的dml,将数据流中的数据在目标数据库中进行持久存储;重复上述步骤。10.根据权利要求6所述的采集方法,其特征在于还包括针对任务实例中的处理器应用实例数和应用内任务数进行动态调整,过程如下:处理器应用实例数的自动调整过程,该过程依靠底层云原生容器化运行环境的能力:数据传输管道实时检测数据处理任务内各处理器应用的当前处理位置,识别需要伸缩的应用实例;当某个处理器应用的当前处理位置值低于其他处理器应用,且在当前系统资源足够的情况下,管道请求容器运行时,启动新的处理器应用实例;当某个处理器应用的当前处理位置值高于其他处理器应用,当前系统资源使用率处于持续高位,数据传输管道请求容器运行的情况下,对具备多实例的处理器应用进行停止,直到保留一个处理器应用实例,处理器应用退出时自动退出所在数据订阅逻辑分组,以便让数据传输管道重新均衡各处理器应用实例的负荷。
技术总结一种数据统一实时采集装置及采集方法,建立数据传输管道,在数据传输管道中定义标准的数据格式,各异构数据格式向数据传输管道数据格式进行标准转换或映射;数据传输管道采集的数据建立统一模型结构,进行数据交互,完成数据采集;所述数据交互包括:处理器应用注册、编排、部署以及卸载管理;对源端数据的实时变化检测读取后,将数据采集交互装置建立的数据交互后写入数据传输管道;写入数据传输管道的数据进行实时流动,后续环节各处理器实时对数据进行处理。解决离散异构数据的集中采集,提升采集应用的可复用性,满足上层业务或应用对实时性的要求;解决海量日志或流水数据的并行采集,充分发挥现有硬件资源的效能,并支持断点续传。续传。续传。
技术研发人员:孙震坚 季春东 张大庆 王珂
受保护的技术使用者:南京迪塔维数据技术有限公司
技术研发日:2022.05.20
技术公布日:2022/11/1