当前位置: 首页 > news >正文

wordpress购物车如何做优化排名

wordpress购物车,如何做优化排名,seo网站优化代码,做简易动画的网站文章目录 一、消息消费实现二、消息消费过程1、消息拉取2、消息消费1)提交消费请求2)消费消息 一、消息消费实现 消息消费有2种实现,分别为:并发消费实现(ConsumeMessageConcurrentlyService)和顺序消费实现…

文章目录

  • 一、消息消费实现
  • 二、消息消费过程
    • 1、消息拉取
    • 2、消息消费
      • 1)提交消费请求
      • 2)消费消息

一、消息消费实现

  消息消费有2种实现,分别为:并发消费实现(ConsumeMessageConcurrentlyService)和顺序消费实现(ConsumeMessageOrderlyService)。本次以并发消费实现为切入进行探讨消息的消费流程。
在这里插入图片描述

二、消息消费过程

1、消息拉取

  1)在消息服务PullMessageService中完成将消息从远程服务器拉取到本地,具体实现由DefaultMQPushConsumerImpl#pullMessage方法完成

//org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessageprivate void pullMessage(final PullRequest pullRequest) {//从MQClientInstance中获取内部实现类MQConsumerInnerfinal MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {//强转换成PUSH消息消费服务,然后消费消息DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}

  2)DefaultMQPushConsumerImpl#pullMessage方法中定义了回调实现,在成功拉取消息后,先将消息放到processQueue中,然后再提交消费请求(DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest)异步完成消息消费。

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage 回调部分代码PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {// ......//从服务器拉取到消息后回调 PullCallBack 回调方法后,先将消息放入到 ProccessQueue中,boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());// 然后把消息提交到消费线程池中执行,// 也就是调用 ConsumeMessageService#submitConsumeRequest 开始进入到消息消费的事件中来DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);// ......}}}// ......};

2、消息消费

1)提交消费请求

  pullMessage方法中回调提交消息消费(submitConsumeRequest),进入消息并发消费实现(ConsumeMessageConcurrentlyService),其实现代码如下:

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest方法@Overridepublic void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {//异步线程池中执行this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {//提交异常,延迟5S再提交this.submitConsumeRequestLater(consumeRequest);}} else {//超过最大数量,分批for (int total = 0; total < msgs.size(); ) {List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {if (total < msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total < msgs.size(); total++) {msgThis.add(msgs.get(total));}this.submitConsumeRequestLater(consumeRequest);}}}}

  submitConsumeRequest方法中,

  • 先获取单次消费消息最大条数(consumeBatchSize,默认1条)
  • 如果本次提交消息条数小于等于单次消费消息最大条数,则直接创建ConsumeRequest并提交到线程池(consumeExecutor)中执行
  • 如果超过单次消费消息最大条数,则按consumeBatchSize分割分批提交

2)消费消息

  ConsumeMessageConcurrentlyService中创建消息消费请求线程ConsumeRequest,然后提交到线程池。

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run@Overridepublic void run() {//在进行消息重新负载时如果该消息队列被分配给消费组内其他消费者,drop设置为true,阻止消费者消费不属于自己的消息队列if (this.processQueue.isDropped()) {log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}//类名.this:一般用于内部类需要使用其外部类的实例对象时候使用 ClassName.this 代表其外部类对象,直接写this则代表内部类本身对象MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;//恢复重试消息主题名// RocketMQ将消息存入 commitlog 文件时,如果发现消息的延时级别 delayTimeLevel 大于0会//首先将重试主题存人在消息的属性中,然后设置主题名称为 SCHEDULE TOPIC ,以便时间到后重新参与消息消费defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());ConsumeMessageContext consumeMessageContext = null;//执行钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}//内部消息监听器消费消息status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue), e);hasException = true;}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}//后置钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);//同步消息消费状态和offsetif (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}

  ConsumeRequest线程中,执行步骤如下

  • 判断消费的队列是否dropped,如果为true,则停止直接终止该消费请求
  • 恢复重试消息的topic和namespace
  • 如果存在钩子函数,则执行前置钩子函数 ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext)
  • 调用消息监听器消费消息listener.consumeMessage(io.openmessaging.rocketmq.consumer.PushConsumerImpl.MessageListenerImpl)
  • 如果存在后置钩子,则执行后置钩子函数
  • 消息消费结果处理
http://www.shuangfujiaoyu.com/news/37462.html

相关文章:

  • 备案中网站打不开国外b站视频推广网站
  • 网站设计 企业 济南网站建设有哪些公司
  • 网站获取qq号码 原理百度app下载最新版
  • 河北廊坊建设银行网站宁波网络推广运营公司电话
  • 广告策划书案例完整版sem与seo的区别
  • 长沙网站建设公司哪家专业爱站网关键词挖掘
  • 怎么用虚拟主机做网站步骤佛山百度关键词seo外包
  • iis 网站显示建设中百度指数分是什么
  • 做特殊任务的网站什么是网络营销战略
  • 鄞州区网站建设报价18款禁用网站app直播
  • 网站建设 响应式 北京新东方在线教育平台官网
  • 怎么做电商网站推广图们网络推广
  • 简单网页制作模板免费seo规范培训
  • 实例网站制作教程大地seo视频
  • 做网站+广告费+步骤软文生成器
  • 招商网站大全免费seo外链增加
  • axure中继器做网站电商网站设计
  • b2c网站推广方案快速整站排名seo教程
  • 湖南省长沙建设工程造价站网站网上销售平台
  • 免费做网站推广上海培训机构
  • 网站都可以做哪些主题手机百度推广怎么打广告
  • 做网站 分工苏州疫情最新通知
  • 巨耀网站建设公司推广软件有哪些
  • 网站推广的优点sem优化托管公司
  • 阳江市网站建设青岛seo杭州厂商
  • wordpress 301 redirection广州seo推广
  • 做3d图的网站有哪些软件网址导航下载到桌面
  • led网站免费模板seo推广优化官网
  • 网站怎么收费网络服务器是指什么
  • 网站设计这个专业怎么样百度推广后台登陆