通过Flink实现个推海量消息推送数据的实时统计

 

作者:个推数据研发工程师 糖炒栗子

 

个推的 消息推送统计模块原先是一个离线统计消息下发的系统,具体来说,个推每天会根据APP开发者的需求下发 消息推送,我们需要根据不同的维度进行数据统计,生成消息报表,从而能够更好的了解每天的推送情况。个推每天的消息推送下发数量巨大,可以达到数百亿级别,之前一直做的是离线统计,但是这样一来,当天的数据只能第二天才能够看到,随着业务能力的不断延伸,我们需要能够实时进行数据报表的统计,因此,我们选择了Flink作为数据处理引擎。

 

一.消息计算平台架构

 

在消息报表系统的初期,我们采用的是离线计算的方式,主要采用spark作为计算引擎,原始数据存放在HDFS中,聚合数据存放在Solr、Hbase和Mysql中:

 

查询的时候,先根据筛选条件,查询的维度主要有三个:

 

1)appId

2)下发时间

3)taskGroupName

 

根据不同维度可以查询到taskId的列表,然后根据task查询hbase获取相应的结果,获取下发、展示和点击相应的指标数据。在我们考虑将其改造为实时统计时,会存在着一系列的难点:

 

1. 原始数据体量巨大,每天的消息推送数据量达到几百亿规模,需要支持高吞吐量

2. 需要支持实时的查询

3. 需要对多份数据进行关联

4. 需要保证数据的完整性和数据的准确性

 

二. Why Flink

 

1. Flink是什么

首先,来看下什么是Flink,Flink 是一个针对流数据和批数据的分布式处理引擎。它主要是由 Java 代码实现。目前主要还是依靠开源社区的贡献而发展。对 Flink 而言,其所要处理的主要场景就是流数据。Flink 的前身是柏林理工大学一个研究性项目, 在 2014 被 Apache 孵化器所接受,然后迅速地成为了 ASF(Apache Software Foundation)的顶级项目之一。

 

2. 方案对比

为了实现个推消息推送报表的实时统计,我们之前考虑使用spark streaming作为我们的实时计算引擎,但是在考虑了spark streaming、storm和flink的一些差异点后,还是决定使用Flink作为计算引擎:

 

 

可以看出来,针对上面的业务痛点,Flink能够满足需要:

1. Flink以管道推送数据的方式,可以让Flink实现高吞吐量

2. Flink是真正意义上的流式处理,延时更低,能够满足我们消息推送数据报表统计的实时性要求

3. Flink可以依靠强大的窗口功能,实现数据的增量聚合;同时,可以在窗口内进行数据的join操作

4. 我们的消息报表涉及到金额结算,因此对于不允许存在误差,Flink依赖自身的exact once机制,保证了我们数据不会重复消费和漏消费。

 

3. Flink的重要特性

下面我们来具体说说Flink中一些重要的特性,以及实现它的原理。

 

1)低延时、高吞吐:

Flink速度之所以这么快地处理消息推送,主要是在于它的流处理模型,Flink 采用 Dataflow 模型,和 Lambda 模式不同。Dataflow 是纯粹的节点组成的一个图,图中的节点可以执行批计算,也可以是流计算,也可以是机器学习算法,流数据在节点之间流动,被节点上的处理函数实时 apply 处理,节点之间是用 netty 连接起来,两个 netty 之间 keepalive,网络 buffer 是自然反压的关键。经过逻辑优化和物理优化,Dataflow 的逻辑关系和运行时的物理拓扑相差不大。这是纯粹的流式设计,时延和吞吐理论上是最优的。简单来说,当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理。

 

2)Checkpoint

Flink是通过分布式快照来实现checkpoint,能够支持Exactly-Once语义。分布式快照是基于Chandy和Lamport在1985年设计的一种算法,用于生成分布式系统当前状态的一致性快照,不会丢失信息且不会记录重复项。Flink使用的是Chandy Lamport算法的一个变种,定期生成正在运行的流拓扑的状态快照,并将这些快照存储到持久存储中(例如,存储到HDFS或内存中文件系统)。检查点的存储频率是可配置的。

 

3)backpressure

back pressure出现的原因是为了应对短期消息推送数据尖峰。旧版本Spark Streaming的back pressure通过限制最大消费速度实现,对于基于Receiver 形式,我们可以通过配置 spark.streaming.receiver.maxRate 参数来限制每个 receiver 每秒最大可以接收的记录的数据;对于 Direct Approach 的数据接收,我们可以通过配置 spark.streaming.kafka.maxRatePerPartition 参数来限制每次作业中每个 Kafka 分区最多读取的记录条数。但这样是非常不方便的,在实际上线前,还需要对集群进行压测,来决定参数的大小。 

Flink运行时的构造部件是operators以及streams。每一个operator消费一个中间/过渡状态的流,对它们进行转换,然后生产一个新的流。描述这种机制最好的类比是:Flink使用有效的分布式阻塞队列来作为有界的缓冲区。如同Java里通用的阻塞队列跟处理线程进行连接一样,一旦队列达到容量上限,一个相对较慢的接受者将拖慢发送者。

 

四.消息报表的实时计算

优化之后,架构升级成如下:

可以看出,我们做了以下几点优化:

1.Flink替换了之前的spark,进行消息报表的实时计算

2.ES替换了之前的Solr

 

对于Flink进行实时计算,我们的关注点主要有以下4个方面:

1.ExactlyOnce保证了数据只会被消费一次

2.状态管理的能力

3.强大的时间窗口

4.流批一体

 

为了实现我们实时消息推送统计报表的需求,主要依靠Flink的增量聚合功能。首先,我们设置了Event Time作为时间窗口的类型,保证了只会计算当天的数据,同时,我们每隔一分钟增量统计当日的消息报表,因此分配1分钟的时间窗口,然后我们使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉数据,减少 state 的存储压力。之后,我们将增量聚合后的数据写入到ES和Hbase中。

 

流程如下所示:

 

 

同时,在查询的时候,我们通过taskID、日期等维度进行查询,先从ES中获取taskID的集合,之后通过taskID查询hbase,得出统计结果。

 

五.总结

通过使用Flink,目前我们实现了消息报表的实时统计,能够实时查看 消息推送下发数、展示、点击等数据指标,同时,借助FLink强大的状态管理功能,消息推送服务的稳定性也得到了一定的保障。未来,我们也会将Flink引入到其他的业务线中,以满足一些实时性要求高的业务场景需求。

 

 

 

  • 在线咨询
  • 技术咨询
  • 业务咨询
  • 电话咨询