沧州热线
沧州热线 > 商讯 > 正文

RocketMQ 在平安银行的实践应用

导读: 

随着互联网金融业务和相关技术的不断发展,传统金融行业为满足业务快速发展需求,正在积极引入各类开源技术,以快速抢占市场。那么,以金融和科技作为双驱动的平安银行在开源技术的引入方面是如何评估,运用到哪些业务场景,以及面对复杂的网络环境,是如何去部署的呢?

随着互联网金融业务和相关技术的不断发展,传统金融行业为满足业务快速发展需求,正在积极引入各类开源技术,以快速抢占市场。那么,以金融和科技作为双驱动的平安银行在开源技术的引入方面是如何评估,运用到哪些业务场景,以及面对复杂的网络环境,是如何去部署的呢?

本文将以 Apache RocketMQ 为例,和您一起了解平安银行在开源技术选型方面的思考和实践。

RocketMQ 在平安银行的应用场景;

复杂网络环境下的部署实践;

多隔离区场景下的部署情况;

多 IDC 场景下的部署情况;

改造实践和遇到的小插曲;

RocketMQ 在平安银行的应用场景

目前,平安银行通过 RocketMQ 解决了数据预加、信息通知和状态变化方面的业务需求,接下来,我们通过 App 登录、资产总览和工资理财 3 个应用场景来展开讲下。

App 登录:

当用户打开平安银行 App 的时候,系统会根据用户的登录 ID 去加载相应的用户数据,比如银行卡、信用卡和理财产品等,以及一些系统通知。这个场景下,我们用到了 RocketMQ 的异步处理能力,即预加载需要使用的数据,提升用户体验。

资产总览:

进入平安银行 App 资产总览的页面,页面显示账户余额、各类理财产品(黄金、基金和股票等),以及贷款等方面的信息。平安银行对接了各类基金、黄金和股票等来自不同金融主体、不同系统的数据,具有种类多、异构系统多和变化快的特点。我们的目标就是让资产总览数据尽可能准确,不同种类的资产变动的时候发出通知,资产系统收到通知后,根据变化的数据来计算出用户当前的资产总览。

工资理财:

工资理财是指每月工资下发到银行卡后,系统可以实现自动买入用户设置好的理财产品,例如买一些定投类的理财产品。这里信息的传递流程是:

银行卡里的余额出现变动,系统把变动信息发送到消息引擎

Consumer 端进行消费,通知用户账户已经出现变化;

系统判断变动是否来源于代发工资;

如果是,系统会再发送一条消息;

理财的 Consumer 进行消费;

判断现在是不是应该买入工资理财类的产品;

如果是,自动买入用户设定好的理财产品;

自动买入之后,余额再次变动,系统会再一次通知,这一次通知,判断的就是一些其他的逻辑了。

那么,在这些场景中,我们对消息引擎会有哪些要求呢?

A、高可用、高可靠和高性能,这是金融行业引入开源技术的基本要求;

B、堆积能力,代发工资的用户很多,一个公司的员工会在某个时间点集中发放;

C、顺序能力,即账户变动在先,发出通知在后;

D、事务性能力,如果没有事务性,有可能会出现账户变动了,但没有触发购买理财产品的情况;

E、重试和延迟消息功能,比如工资发放的时候,可能是在晚上,这时候系统会自动把购买理财的动作放在第二天白天去操作,再发出通知;

F、消息回溯能力,就是出现问题的时候,我们可以把这个消息进行回溯,重新进行消息的处理,提供完整的消息轨迹;

在技术选型的过程中,RocketMQ 符合我们在这些典型使用场景中对消息产品的需求,在引入的过程中,平安银行也对社区做了相应的贡献。

复杂网络环境下的部署实践

多测试子环境下的服务调用场景


平安银行有多套测试子环境,用来对不同的feature进行测试,即图中的 FAT、FAT001?FAT002?FAT003等。传统银行系统由大型机时代向更面向互联网用户的零售时代转型过程中,不可避免微服务化,传统较集中式系统会被划分为较多的微服务,正常的情况如下图,服务 A 调用服务 B,服务 B 调用服务 C,服务 C 调用服务 D。

随着业务的需求,新的 feature,我们需要对服务 A 和 B 进行改动。相比在FAT环境里去部署测试,更合适的方式是另起一套 FAT 环境,这里我们命名为 FAT001,把服务A和B部署上去,A 调用 B,B会调用原来 FAT 环境里的 C 和 D。

此时,另一个新的需求,需要对服务 A 和 C 进行改动。如果直接发布到FAT 或 FAT001 肯定是不对的,会影响正在进行的测试,此时,我们会再起一套测试环境,命名为FAT002,发布服务 A 和 C。由于 FAT002 里没有服务 B,所以服务A要调用服务 B 就需要去 FAT 环境(FAT 定义为较稳定的测试子环境)。服务 B 调用服务 C 的时候,就不应该调用本环境的 C了,而是调动 FAT002 的 C 才能实现测试功能。

再假设,系统同时还会有另外一个 feature 在测试 C 和 D,此时的调用逻辑也是一样的,系统调用服务 A 和 B 的时候是在 FAT,调用服务 C 和 D 的时候会到 FAT003 的环境。

以上的服务调用场景是可以通过微服务框架解决的,进行全链路测试,但在生产环境中,用户的真实请求比测试环境中会更复杂一些。

真实的用户请求过程

我们看一下真实的用户请求。

APP发起一个请求请求,进入网关,需要知道请求哪一个测试环境。通常的做法是:测试人员需要在APP上选好子环境,例如选择 FAT001,我们可以直接把请求 FAT001 的网关(每个环境网关单独部署),或者在requestheader上添加标识,让网关去区分它来源于哪一个环境(网关统一部署)。假如网关判断请求来源于 FAT002,那就会把分发给 FAT002环境进行处理。

消息层面,如何处理这类用户请求

以上是服务调用的请求路径,比较好理解,但到了消息队列上,问题会变得复杂些,假如一个 feature 只是更改了消费者,那如何把这条消息传递到改的消费者应用上去进行测试,而不被其它环境的消费者消费掉,这是我们需要去考虑的问题。

来看下具体的情况,集群部署了 Broke A 和 Broke B,TopicA 分别部署在这两个Broker上。 此时,Producer Group A 向 Topic A 中写数据,Consumer Group A去消费,这种情况下是不会有任何问题。

但如果新增一套 FAT001 的环境,要求 FAT001 发布的消息,只能由 FAT001 来消费,FAT 不能消费,这种情况下我们该怎么解决?

在消息上面加一些路由、或是加一些Tag、Filter、消息的Property?

这些都不能解决我们的问题。??

每个子环境部署一套 RocketMQ?

一方面成本太高,另一方面如果这个feture测试完成了,需要把相关的

应用再切回 FAT 进行处理,实现太过复杂。??

我们想一下,多个 feature 同时进行测试,DB 会部署一套还是多套?

首先一个 feature 不会更改所在的应用,一般来说 DB 是部署一套的,在数据库里面添加字段,来识别数据是来源于哪一个子环境,如果多套部署,不更改的应用取不到新部署的 DB 数据,无法进行全链路测试,所以同样的,我们也没有在每个子环境都部署一套 RocketMQ,而是部署统一部署,通过 RPC 路由把请求路由到正确的生产者集,改造消息路由算法把消息路由到正确的消费者进行处理。

真实的用户请求过程

生产者变更

在上图中生产者变更的场景下,默认的场景 FAT发送,FAT 消费 ,没有问题的,假设 FAT001 的生产者发布了,需要 FAT001 发送到MQ集群,FAT 是可以直接消费。

生产者、消费者同时变更

在上图生产者和消费者同时变更的场景下,如果消费者在 FAT001也部署了应用,需要FAT消费者不能消费由FAT001产生的消息,而是由 FAT001的消费者去消费。我们的处理方式是在逻辑上把Topic A下面的Queue进行分组,相当于加了逻辑分组,如果消费者在 FAT001 有部署,我们会把 Queue 的分组扩大,在其默认设置的情况下再翻一倍,新增的 Queue 就是给到 FAT001 进行消费的。

只有消费者变更

再来看看只有消费者变更的场景,如上图。

假设有个feature只需要更改消费者,部署在 FAT001。也是可以通过逻辑分组的方式,实现生产者根据请求来源来发送消息到队列 FAT001 逻辑分组内的 Queue,从而只让部署在 FAT001 的消费者消费。

通过以上 3 个场景,我们了解到添加逻辑分组的必要性,实现过程并不复杂。主要做了以下调整:?

这个逻辑分组什么时候建立?

新建 Topic 的时候,全部建好?还是 Consumer 上线/下线的时候动态的去进行调整?

我们选择了动态创建的方式,这个过程中,我们添加了 Meta Server 进行元数据管理,进行动态创建:

添加 Meta Service,管理的元数据包括 Producer、Consumer、Topic、Queue 和 Subenv等信息:

调整 Producer,取Request Head 里面请求来源(FAT、FAT001、FAT002...),如果 Topic 对应的存在分组,选择分组的 Queue,否则发到默认分组呢的Queue;

调整 Consumer,上线时判断应用部署的分组(FAT、FAT001、FAT002...),如果Topic不存在对应的分组,则新建;存在,则 rebalalce (新Consumer节点上线),下线时,判断该分组是否还存在 其它Consumer实例,若不存在,删除分组,否则 rebalalce(Consumer某一节点下线);

多隔离区场景下的部署实践

由于对安全上的重视,金融行业的网络环境相比其他行业会更复杂。整个隔离区分为以下几个部分:

DMZ 区:

外网可以直接访问,用于放置网关;

Web 区:

面向的是用户手机,或者网页上可以看到的功能应用;

核心区:

包含核心的调用逻辑功能,和交易、订单相关的核心应用,例如 DB 和存储;

外联区:

用于访问外网,有时候也会部署一些 Poxy 代理,因为内网不能直接访问外网,需要通过代理去访问外网;

专用区域:

对接基金、三方存管等外部系统。在金融行业,如果某个系统是闭环的,那必须要去做隔离;

管理区:

是指对整个区域的应用要进行集中管理,且数据流动是单向的,只能取数据,不能通过管理区把某区域的数据推送到另一区域。

此外,从安全的角度出发,所有的区域在生产环境下都通过防火墙进行隔离,这就给我们部署 RocketMQ 带来了很大的实施难度。如果只部署一套,面对这么多的防火墙,生产者消费者到集群的的流量跨墙,会给网络带来极大的不稳定,遇到瓶颈,防火墙几乎无法在线平滑扩容;如果每个子环境都部署一套,又带来运维复杂度,而且还是存在数据同步和跨墙消费的问题。

最终,我们采用的是折中的办法,即统一部署加分隔离区部署,这样做的益处是:

防火墙是开大策略,保证安全的前提下,符合监管要求;

针对跨隔离区消费的问题,我们采用复制的方式,模拟消费者重新写入目标集群;

多IDC场景下的部署实践

同城多IDC,可以认为近似局域网,比较好处理,但异地多IDC多活场景,目前我们还没有特别好的解方案,多活不可避免面临数据分片、数据合并和数据冲突的解决等问题。

如果 Topic 下数据有多活需求,我们暂时通过复制方式来处理。但这类手工模拟消费者消费数据写入新集群的复制方式,会存在一些问题,即复制到另一个集群之后 offset 会改变,处理逻辑就会有偏差。我们通过 pull 的方式自定义的去根据 offset 去进行消费。当故障转移到新的集群需要去消费的时候,需要获取到新集群里面正确的offset 值。此时,这个值和之前集群里的已经不一样了,需要根据时间得到新集群里正确的offset 值,再去进行消费。在没有更好的解决方案前,治理就显得很关键了。

不过,我们注意到,在 RocketMQ 最新发布的版本里,提供了 DLedger 的特性,DLedger 定位的是一个工业级的 Java Library,可以友好地嵌入各类 Java 系统中,满足其高可用、高可靠、强一致的需求。我们会尽快对这一特性进行集成和测试。


改造实践和遇到的小插曲

我们在对 RocketMQ 的使用过程中,添加了以下功能或特性:

A. 为生产者提供消息的堆积能力。

B. 将所有配置管理部署到配置中心,并做云端化处理,以满足动态修改的需求。

C. 在 4.3 版本还未提供事务处理能力前,我们在本地数据库里去建一张消息表,数据库更改数据状态的时候,会同步将数据写入消息表。若发送失败,则进行补偿。并在客户端里进行封装。

D. 实现统一的消息者幂等处理。

E. 添加身份认证和消息认证(注:RocketMQ 4.3 版本中已经实现身份认证功能)

当然,也遇到了一些小插曲,基本都是使用上的小问题,可能大家也会碰到:

A. 一个应用使用多个RocketMQ集群时,未加载到正确的配置。在Client 端,如果没有对 instancename 进行配置,一个应用连多个集群会失败。

B. 在大数据的场景下,内存溢出。订阅的 Topic 越多,每个 Queue 在本地缓存的 message 也会越多,默认每个 Queue 1000条,可能会把内存打爆,可根据实际情况调整。

C. 删除数据时 IO 抖动,默认每天凌晨4点删除数据,量上来后出现 IO 抖动,配置了消息删除策略,默认逗号分隔开,多配几个时间删除就可以了。

D. Broker上日志报延迟消息找不到数据文件。在主备切换的演练过程中,出现了延迟消息在 Broker 上处理异常的情况。当主切换到备端时,延迟消息在 Broker 上保存的文件被自动删除,再切回主,由于延时消息的元数据感觉在,会查询文件进行处理,此时已找不到文件。

E. 挂 NAS 的时候,IP 获取了 NAS 的私网地址,并被提交给了服务端。

以上就是我们在部署过程中遇到的一些小插曲,基本都是可以很快定位原因,解决的。

总的来看,RocketMQ 对平安银行的消息系统建设的帮助是非常大的,尤其是满足了数据持久化、顺序消费和回溯的需求,此外,在消息的查询方面,也比我们之前使用的消息引擎好很多。最后再分享一点自己对中间件的一点感悟:中间件使用重在治理,规范不先行,开发两行泪。

推荐阅读:华为手机快捷键