如何接入第三方MQ(二)原创
金蝶云社区-sharkv
sharkv
14人赞赏了该文章 2,939次浏览 未经作者许可,禁止转载编辑于2022年07月19日 23:08:13
summary-icon摘要由AI智能服务提供

本文接续前篇,探讨了在引入第三方MQ连接信息后遇到的实际问题,如序列化与反序列化异常、上下文缺失等。针对序列化问题,提出了自定义消费者模块和引入相关SDK两种方案,但后者因应用解耦考虑被否定。对于上下文问题,介绍了通过OpenApi调用苍穹API和为消费者服务构造上下文两种解决方案,并详细阐述了通过AppStarter启动消费者线程及构造RequestContext的方法。最后,提出了改进消费者模块的可扩展性和维护性的方法,如使用配置文件和统一处理类。文章强调了在集成云消息时优先使用现有组件,并鼓励技术人员保持探索精神。

前言:

上篇文章主要介绍了如何通过一系列配置引入第三方MQ的连接信息,并通过平台的MQ服务进行消息发送及消费的过程。但是真实研究一下还是会发现一些“麻烦”的。本文将续接上篇文章的实现,指出其中的问题,并探讨和展示相关的解决方案。

上一篇链接:https://club.kdcloud.com/article/223109264221295360


1.序列化与反序列化问题:

1.1问题产生:

在上篇文章中,可以发现我们的测试场景是比较简单的,生产者和消费者其实都是苍穹自己,并没有涉及到异构系统的MQ消息集成。带着这个伏笔,我们来看下,如果通过rabbitMQ管理界面发布一条消息,结果并没有进到我们消费者类中的断点,进一步发现是消息反序列化出现了异常,结果如下图:



 

1.png

2.png

1.2问题分析

跟踪以上代码可发现平台封装的消费者统一处理类会先将body转为kd.bos.mq.support.Message,其中用到了这个方法

kd.bos.mq.support.HessianMessageSerde.decode(byte[]),这是用于对象序列化与反序列化的,这就意味着body必须得经过相关的encode,才能正常的按照decode来反序列化,否则会解析失败。


随着这个思路,我们可以定位到平台的生产者消息发送代码块,果然是用了同样的HessianMessageSerde进行序列化的。

3.png


1.3解决方案

1.3.1.自定义消费者模块

我们可以尝试脱离平台,自己写一个通用的消费者,进行订阅,这样我们可以正常的拿到body里面的内容进行处理,如图所示:

 

4.png

5.png

1.3.2.引入相关sdk(×)

既然平台的消费者处理类是通过HessianMessageSerded将body统一处理为kd.bos.mq.support.Message,那我们是不是可以在生产者发送消息的时候就将body包装成kd.bos.mq.support.Message,再利用HessianMessageSerded来序列化成byte数组呢?只需要把相关的jar包提供给生产者方就好了,理论上是可以的。

但是,我们思考一下MQ的其中一个作用——应用解耦,就是说生产者不用关心消费者何时消费,如何消费,只管往队列里面丢消息就行了。

l  1. 如果现在生产者又引入了消费者相关的sdk那就产生生产者相关的代码改造,是侵入式的做法。

l  2. 另外,一个队列可能有多个消费者进行消费,如果多个消费者都有自己的反序列化规则,那生产者该用遵循哪个序列化协议?

基于以上考虑,这种方案PASS

2.上下文问题

2.1问题产生:

前面我们讲到通过自定义消费者模块的方式,可以获取到队列中的消息进行消费,但我们具体的业务逻辑又会涉及到什么呢——苍穹实体的增删改查

         但由于我们的自定义消费者模块脱离了苍穹的上下文运行环境,因此我们是无法执行ORM等相关服务的调用的。如下图,由于一些基本的框架服务没有启动如日志框架,导致调用失败。

6.png


2.2解决方案

2.2.1.OpenApi

在自定义消费者模块,我们可以调用苍穹开放出来的API进行实体的增删改查。需要思考几点:

1.接口安全

我们知道,调用苍穹的OpenApi需要先做接口授权拿到accessToken,而accessToken的获取则需要tenantId、accountId、appid、appSecret、user、userType等信息,这些信息从哪里获取呢?

1).直接写死在启动类中

2).如果第三方应用已经和苍穹做了登录集成,那么可以在生产者消息发送时,将会话信息携带在body中

2.性能问题

         显然,通过http接口调用的方式比直接在苍穹上下文环境中做增删改查慢一些,不过消息集成本身就是异步的,一般也不强调消费者的时效性,单个消息的处理时长不要太长就行,剩下的问题可以通过水平扩展消费者来提升处理效率。最重要的还是通过消息集成的方式,我们解耦了生产者和消费者两端,生产者完全不用依赖于消费者的处理进度做前端的展示,这是最重要的。

 

openApi学习参考链接:

https://club.kdcloud.com/article/218694224386822400

https://club.kdcloud.com/article/218021385161819904      

2.3.2为消费者服务构造上下文

通过AppStarter启动自定义消费者线程

我们可以不可以在苍穹框架服务初始化完成之后,再启动自定义消费者模块呢?通过社区我们可以搜到关于appstarter的帖子https://club.kdcloud.com/article/115402171927003392,根据介绍:这是在应用启动时,完成应用的特定初始化过程的一种机制。

接下来,我们依葫芦画瓢,根据配置文档进行开发就可以了。示例代码如下:

7.png


你以为就完事了?当然没这么简单,还是报错:


8.png

 

根据报错堆栈,我们再调试跟踪下,发现确实ORM的底层依赖了一些上下文参数RequestContext如图中所示的数据中心id,当然这还只是看得见的,还有一些隐藏的参数犹未可知。因此我们需要为当前消费者线程构造上下文。


9.png

如何构造RequestContext?

我们可以从平台的生产者代码出发:kd.bos.mq.rabbit. RabbitPublisher,经过调试,可以看到其中有一段关键代码,在将body包装为Message的过程种,会同时为Message创建RequestContext上下文(包括但不限于租户、数据中心、用户、组织、会话、语言环境等信息)


10.png

         我们可以用相似的方式构造上下文,现在代码变成这样:


11.png

执行结果:查询成功!


12.png

更实用的版本

可以看到,我们上面写的消费者代码是写死的队列名,以及我们指定队列名对应的消费者类均是代码里面写死的,不利于日后扩展及维护。因此,我们可以参考、复用平台已有机制进行搭建。

 

1.增加了MyUsageConfig:用于读取mq的队列信息,包括队列名称、消费者类;仿照平台的UsageConfig类写的。这里我们改写了原有的配置文件参数名,转而读取custom.mqConfigFiles.config,配置规则与原本的mq配置文件一样。

为什么不用平台的参数呢?

与平台共用会导致消费者冲突,根据平台的机制同样会把我们的消费者注册上去,但是由于前面提到的序列化问题又无法正常消费,所以不用。

2.遍历配置文件:为每个队列初始化消费者,并设置启动。

3.增加消费者统一处理类:上面的代码中我们是直接用了内部类作为消费者,不利于扩展,现在我们是要根据配置文件来注册消费者,那么上下文的构造、消息体的解析这些工作不希望重复执行,因此我们单独创建一个统一处理类来完成这些工作。

 image.png

 

附件(拖到最下面有附件

Java类:

 image.png


 

队列信息配置文件:


image.png

 

Debugserver配置:

         System.setProperty("custom.mqConfigFiles.config","erkai.consummqconfig.xml");


总结

通过以上的一些分析和探讨,希望可以有效的帮助大家了解苍穹消息集成中需要注意的一些问题点和关键设计;其实上面的一些示例代码不过是demo级别,真正的生产级别的还需要大力改造和优化。通过代码调试、阅读源码,就可以发现其实集成云的消息集成也是类似这种机制去完成的,因此建议各位同学,在没有特殊情况下,优先使用我们已有的组件、产品,不要重复造轮子,因为你造的不一定比得过平台。


另外想带给大家的可能不仅仅是消息集成本身,有较大的篇幅其实是在记录一些分析过程和推导思路,作为技术人员,我们应该要有探索精神,无论成功与否都先去尝试,每一次的探索都可以沉淀,每一次的思考,都可能反哺到你的下一次的goodidea!


赞 14