方罗交接
亚马逊交接
广州同步店铺框架搭建
QPS-Starter 集成文档
刊登规则引擎技术方案
MQ异常补偿方案
负责项目总目录
广州侧同步商品平台
本文档使用 MrDoc 发布
-
+
首页
MQ异常补偿方案
## 一.前言 此次补偿消息方案的场景是Java发送消息到RabbitMQ是成功的,但是PHP端**未消费**到来自RabbitMQ消息场景。因为为了解决该问题,Java端在发送消息时会记录消息生命周期,这样以便于做重新推送补偿机制。同时Java端会开设新接口,用来响应PHP端接收到的消息id。这样就可以把PHP端接收到的消息生命周期状态置为已接收。形成闭环 项目git: ## 二.数据字典 ### ES(DBA推荐) mdc_message_roll 索引别名 ```es "mappings": { "properties": { "messageId": { "type": "long" }, "devPartnerId": { "type": "keyword" }, "appId": { "type": "keyword" }, "platformId": { "type": "keyword" }, "shopId": { "type": "long" }, "topic": { "type": "keyword" }, "msgType": { "type": "keyword" }, "actionType": { "type": "keyword" }, "indexName": { "type": "keyword" }, "mqType": { "type": "keyword" }, "messageData": { "type": "text", "analyzer": "ik_max_word" }, "state":{ "type": "integer" }, "resultDesc":{ "type": "text", "analyzer": "ik_max_word" }, "updateTime": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" } } } ``` mdc_message_callback_roll 索引别名 ```es "mappings": { "properties": { "messageId": { "type": "long" }, "retry": { "type": "integer" }, "req_data": { "type": "text", "analyzer": "ik_max_word" }, "update_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" } } } ``` ## 三.流程图  ### 1. PushMessage视角 在push-message项目里面,把携带有**rabbit-kandeng标志**但是又没有**compensateFlag标识**的消息存放到ES。最后推给receive-message服务 **rabbit-kandeng标志**: 现在只存储刊登业务消息到ES **compensateFlag标识**: 如果携带这个标识就代表是补偿服务重推消息,就不再记录进ES,以免形成死环 ### 2. PHP视角 PHP侧收到来自rabbit-mq消息以后就可以请求异常补偿服务的回执状态接口 code : 200 代表请求正常 code : 9999 代表消息不存在,现阶段针对这个错误码是忽略即可 code : other 具体原因具体分析 ### 3. 补偿服务视角 在异常补偿服务(**compensate-service**)项目里面,新开设一个接收PHP消息状态回写接口。当接收到请求时(不管是单个或批量)先把请求体落到ES(失败的话就放入Redis做定时重试), 确保我们后续在处理状态时有之前的请求数据体来做重试兜底或做请求数据比对。最后根据请求体内容更新对应消息即可。 ## 四.实施方案细节 ### 1. 实施方案图  ### 2. 实施细节 1)项目创建时就会根据配置文件配置的平台信息来创建==对应的周期性任务==做消息重推 2)收到PHP回执的消息,进行ES搜索是否消息存在, 如果存在就存放进内存Queue里做好==定时批量==更新文档回执状态。如果收到PHP回执的消息在ES里面搜索出来的平台id==在之前配置文件里面不存在,会重新发送事件进行新周期性任务创建== 3) 每个平台自身的周期性任务会默认按照每2分钟进行一次数据检查, 如果有发现需要重推的消息就会携带上**compensateFlag标识**进行重新推送到push-message 4) 针对于接口请求对象也会有一个周期性任务来从redis里面获取到重新推给执行业务类, 避免用户请求丢失 ### 3. PHP调用接口 ```java /** * 接收PHP回调请求对象vo * * @Author: Thread * @Date: 2022/8/31 10:38 */ @Data @ApiModel(value = "PHP回调请求对象", description = "PHP回调请求对象") public class ReceiveDataReqVo implements Serializable { /** * messageId */ @NotNull(message = "请填写消息Id") @ApiModelProperty(value = "消息Id", required = true) private Long messageId; /** * 对应Topic */ @NotBlank(message = "请填写消息topic") @ApiModelProperty(value = "消息topic", required = true) private String topic; /** * 重试 1、非重试;2、重试 */ @ApiModelProperty(value = "重试 false 非重试; true 重试") private boolean retry; } /* * 接收PHP回调消息 * * @param receiveDataReqVo 请求对象 * @return 结果 */ @ApiOperation("接收PHP回调消息") @PostMapping("/receive") public RestResultVo<?> receive(@Validated @RequestBody ReceiveDataReqVo receiveDataReqVo) { return receiveCallbackService.receive(receiveDataReqVo); } ``` ### 4. 平台创建重试推送任务 ```java /** * 类<code>RsaKeyConfig</code>说明:启动开发人员放入内存 * * @Author fangluo * @Email fangluo@mabangerp.com * @Since 2022/6/27 */ @Component @RequiredArgsConstructor public class InitScheduleTask implements ApplicationRunner { /** * 事件发布器 */ private final ApplicationEventPublisher applicationEventPublisher; /** * 自定义配置 */ private final CustomProperties customProperties; @Override public void run(ApplicationArguments args) { // 循环构造周期任务 if (CollectionUtils.isNotEmpty(customProperties.getPlatformIds().values())) { // 循环发送事件创建周期性任务 customProperties.getPlatformIds().values().forEach(platFormId -> applicationEventPublisher.publishEvent(new CreateTimerTaskEvent(platFormId))); } } } @Slf4j @RequiredArgsConstructor @Component public class CompensateMessageCoreListener implements ApplicationListener<CreateTimerTaskEvent> { /** * 创建TimerTask (PS: 也可以使用EventListener, 只不过现在这样继承ApplicationListener分方式更容易让人理解) * * @param createTimerTaskEvent 创建timerTask事件 */ @Override public void onApplicationEvent(CreateTimerTaskEvent createTimerTaskEvent) { // 如果没有创建就put if (!isCreated(createTimerTaskEvent.getSource().toString())) { putTimerTaskByMsgType(createTimerTaskEvent.getSource().toString()); } } /** * 存放TimerTask * * @param platFormId platFormId */ private void putTimerTaskByMsgType(String platFormId) { // 定义频率 long per; // 获取对应queueName 运行频率 Object msgTypeRunForPer = redisTemplate.opsForValue().get(RedisKeyEnum.REDIS_SCHEDULED_TIME.getType() + platFormId); if (null == msgTypeRunForPer) { // 默认运行一次 per = customProperties.getDefaultPer(); } else { per = Long.parseLong(msgTypeRunForPer.toString()); } // 生成周期性任务 ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(CommonConstants.INT_ONE, new DefaultThreadFactory("platFormId: " + platFormId + "Execute-PushMessage")); scheduledThreadPoolExecutor.scheduleWithFixedDelay(new CompensateMessageTask(platFormId, redissonClient, rocketMqProducer, elasticsearchBaseService, customProperties), CommonConstants.INT_ZERO, per, TimeUnit.SECONDS); // 把定时任务存放到map scheduledTaskMap.put(platFormId, scheduledThreadPoolExecutor); } } ``` ## 五.缺陷解决 ### 1. 网络抖动 广州PHP调用香港补偿服务接口, 但因通过ping域名的方式发现网络延时波动很大。因此由之前一个消息id变更为每次发送一批消息给到补偿服务。以此方式来减少跨地区之间的网络IO次数 ### 2. 消息遗失 因程序内是先把数据接收到存入到JVM内存。但是在存入JVM内存时是通过入Queue的方式进行存储。因此在代码里面贸然clear会造成数据丢失。因为一个周期性线程在从队列里面获取数据的同时会有另外的线程把新数据丢入队列。因此这采用了**后删法**解决 ```java if (CollectionUtils.isNotEmpty(callbackEoLinkedBlockingQueue)) { //new 新的bq进行操作,避免此时offer导致数据不一致,最后某些数据未被操作 LinkedBlockingQueue<MdcMessageCallbackEo> operateBq = new LinkedBlockingQueue<>(callbackEoLinkedBlockingQueue); // 为什么用forEach 不用saveBatch. 因为现在PHP调用请求是批量请求, 不会是单个消息id请求. 所以这里单个单个存入, 避免 URI [/_bulk?timeout=1m], status line [HTTP/1.1 413 Request Entity Too Large] 异常 operateBq.forEach(bq -> elasticsearchBaseService.saveByWrapper(new IndexChainWrapper<>(MdcMessageCallbackEo.class), bq)); callbackEoLinkedBlockingQueue.removeAll(operateBq); log.info("Execute-AsyncCallbackMessage执行完毕"); } ``` ### 3. 返回数据 因为ES里面每一个文档都存有消息体,所以每一个文档都是比较大的。因此为了接口能快速返回,在搜索ES时通过**指定SourceFilter**减少返回字段。从而达到响应数据大小阶梯式降低 ### 4. ES崩溃 在大批量消息id调用ES的情况时, 会让ES崩溃。原因有几点: 1. 测试环境ES 运行内存较小, 而且单节点1G 2. 大批量消息id(上千)不间断查询ES, 并且每个文档都很大 3. 用的是Filter查询, 而非普通Query查询 (**这一步最重要!**) 经过一系列排查原因以后,让DBA把测试环境ES运行内存从1G提升到2G。 同时大批量消息id从上千变更为500。最后使用Query查询让ES内部不再做结果集缓存(之前ES崩溃的根本原因就是因为不停的使用filter查询来缓存结果集, 导致ES JVM的GC耗时过程低于了请求结果缓存大小。从而导致ES崩溃) ## 六.注意的点 ### 1. 重启 如果在重启过程中内存有一系列更改状态的消息, 不用惊慌! 这些消息即使丢了也没事。周期性任务会把这些消息==重新给PHP, PHP那边会重新做接口回执== ### 2. 真实索引名 在项目当中基本上都是通过ES别名来做的查询。但其实在PushMessage中真正存放document的时候还是会通过别名去获取到当前最新物理写索引名。这样就会在更新的时候就会根据真实存放索引名来进行分组批量更新 ### 3. 数据影响范围 在现阶段, 所有文档消息数据都只是刊登业务数据。等该项目渡过最开始平稳期以后才会考虑把其它业务线与PHP交互的消息进行存储重推
thread
2023年8月23日 14:18
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
分享
链接
类型
密码
更新密码