当前位置: 首页 > news >正文

网站运营网站建设贵阳网站建设公司

网站运营网站建设,贵阳网站建设公司,商城网站开发那家好,营销型网站与展示型网站消息刷盘 同步入口:org.apache.rocketmq.store.CommitLog.GroupCommitService 异步入口:org.apache.rocketmq.store.CommitLog.FlushRealTimeService 刷盘有同步和异步两种,在实例化Commitlog的时候,会根据配置创建不同的服务 p…

消息刷盘

同步入口:org.apache.rocketmq.store.CommitLog.GroupCommitService

异步入口:org.apache.rocketmq.store.CommitLog.FlushRealTimeService

刷盘有同步和异步两种,在实例化Commitlog的时候,会根据配置创建不同的服务

public CommitLog(final DefaultMessageStore defaultMessageStore) {this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());this.defaultMessageStore = defaultMessageStore;if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {// 如果是同步刷盘,则初始化GroupCommitService服务this.flushCommitLogService = new GroupCommitService();} else {// 如果是异步刷盘,则初始化GroupCommitService服务this.flushCommitLogService = new FlushRealTimeService();}// 异步转存数据服务:将堆外内存的数据提交到fileChannelthis.commitLogService = new CommitRealTimeService();this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {@Overrideprotected PutMessageThreadLocal initialValue() {return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());}};this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();}

提交刷盘

org.apache.rocketmq.store.CommitLog#submitFlushRequest

public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {// Synchronization flush// 同步刷盘if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {// 同步等待,即等待刷盘结果GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());service.putRequest(request);return request.future();} else {// 同步刷盘但是不需要等待刷盘结果,那么唤醒同步刷盘线程,随后直接返回PUT_OKservice.wakeup();return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}// Asynchronous flush// 异步刷盘else {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {// 如果没有启动了堆外缓存,那么唤醒异步刷盘服务 FlushRealTimeServiceflushCommitLogService.wakeup();} else  {// 如果启动了堆外缓存,那么唤醒异步转存服务 CommitRealTimeServicecommitLogService.wakeup();}return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}
}

同步刷盘

org.apache.rocketmq.store.CommitLog.GroupCommitService

同步刷盘服务,如果没有开启同步等待,则将消息全部刷入磁盘。

同步刷盘服务中有一对读写队列,每当有刷盘请求进来,会写入到写队列,然后交换读写队列,对读队列进行请求处理,执行刷盘。

如果开启同步等待,则在提交刷盘时,提交一个request到写队列,然后唤醒刷盘服务,在刷盘服务中,会先进行读写交换,然后对读队列的请求进行刷盘处理。

在刷盘方法中,会根据传入的最小刷盘页数进行刷盘,为0则是全部刷盘。不为0,则需要判断目前需要刷盘的数据大小是否达到了传入的页数数量,若是则刷盘,若不是则不刷盘。

最后通过this.mappedByteBuffer.force();方法强制刷盘并更新刷盘点位等信息

/*** GroupCommit Service* 同步刷盘服务*/
class GroupCommitService extends FlushCommitLogService {private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();private final PutMessageSpinLock lock = new PutMessageSpinLock();/*** 加锁存入写队列* @param request*/public synchronized void putRequest(final GroupCommitRequest request) {lock.lock();try {this.requestsWrite.add(request);} finally {lock.unlock();}this.wakeup();}private void swapRequests() {// 刷盘时交换读写队列后,写队列为空,读队列供刷盘消费// 消费完成后,读队列置为空,在下一次交换时,写队列又为空// 如此循环lock.lock();try {LinkedList<GroupCommitRequest> tmp = this.requestsWrite;this.requestsWrite = this.requestsRead;this.requestsRead = tmp;} finally {lock.unlock();}}private void doCommit() {// 调用此方法之前,都交换了读写队列if (!this.requestsRead.isEmpty()) {for (GroupCommitRequest req : this.requestsRead) {// There may be a message in the next file, so a maximum of// two times the flushboolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();// 这里可能会刷盘两次的原因是,可能第一次刷完发现文件满了,就没有达到req传入的刷盘点,则需要再刷盘一次for (int i = 0; i < 2 && !flushOK; i++) {CommitLog.this.mappedFileQueue.flush(0);flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();}req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);}long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();if (storeTimestamp > 0) {CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);}// requestsRead重新创建一个空的队列,// 当下一次交换队列的时候(doCommit之前会交换队列),requestsWrite又会成为一个空队列,以备向其中写入消息this.requestsRead = new LinkedList<>();} else {// Because of individual messages is set to not sync flush, it// will come to this process// 队列中没有元素是因为某些消息的设置是同步刷盘但是不等待// 因此这里直接调用mappedFileQueue.flush(0)方法进行一次同步刷盘即可,无需唤醒线程等操作。// 0表示最少刷0页,也就是全部刷入CommitLog.this.mappedFileQueue.flush(0);}}public void run() {CommitLog.log.info(this.getServiceName() + " service started");// 如果没有停止, 就一直循环while (!this.isStopped()) {try {// 若被wakeup, 则交换读写队列, 否则等待10msthis.waitForRunning(10);this.doCommit();} catch (Exception e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);}}// Under normal circumstances shutdown, wait for the arrival of the request, and then flushtry {Thread.sleep(10);} catch (InterruptedException e) {CommitLog.log.warn("GroupCommitService Exception, ", e);}synchronized (this) {this.swapRequests();}this.doCommit();CommitLog.log.info(this.getServiceName() + " service end");}

异步刷盘

org.apache.rocketmq.store.CommitLog.FlushRealTimeService

异步刷盘是在一个循环中,不断的执行刷盘逻辑

默认不是定时刷盘,若是定时刷盘,则固定隔一段时间(默认500ms)刷盘一次

若不是定时刷盘,则会被wakeup()方法唤醒,执行刷盘

执行刷盘前会判断配置的最小刷盘页数(默认4页即16K),若未达到此数,则不执行刷盘

但当距离上一次刷盘超过配置时间(默认10s),即使未达到最小刷盘数,也会执行刷盘

/*** 异步刷盘*/
class FlushRealTimeService extends FlushCommitLogService {private long lastFlushTimestamp = 0;private long printTimes = 0;public void run() {CommitLog.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {// 是否是定时刷盘,默认是false,即不开启boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();// 获取刷盘间隔时间,默认500ms,可通过flushIntervalCommitLog配置int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();// 获取刷盘的最少页数,默认4,即16k,可通过flushCommitLogLeastPages配置int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();// 最大刷盘间隔时间,默认10s,即到了10s后,刷盘页数小于4也会刷盘int flushPhysicQueueThoroughInterval =CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();boolean printFlushProgress = false;// Print flush progresslong currentTimeMillis = System.currentTimeMillis();//如果当前时间距离上次刷盘时间大于等于10s,那么必定刷盘if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {//更新刷盘时间戳为当前时间this.lastFlushTimestamp = currentTimeMillis;//最少刷盘页数为0,即不管页数是否超过4,都会刷盘flushPhysicQueueLeastPages = 0;printFlushProgress = (printTimes++ % 10) == 0;}try {if (flushCommitLogTimed) {//如果定时刷盘,那么当前线程睡眠指定的间隔时间Thread.sleep(interval);} else {// 如果不是定时刷盘,那么调用waitForRunning方法,线程最多睡眠500ms// 可以被中途的wakeup方法唤醒进而直接尝试进行刷盘this.waitForRunning(interval);}if (printFlushProgress) {this.printFlushProgress();}// 开始刷盘long begin = System.currentTimeMillis();//  刷入指定的页,内部如果没有达到对应页数则不会刷盘CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();if (storeTimestamp > 0) {CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);}// 刷盘耗时long past = System.currentTimeMillis() - begin;if (past > 500) {log.info("Flush data to disk costs {} ms", past);}} catch (Throwable e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);this.printFlushProgress();}}// Normal shutdown, to ensure that all the flush before exit/** 停止时逻辑* 在正常情况下服务关闭时,一次性执行10次刷盘操作*/boolean result = false;for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {result = CommitLog.this.mappedFileQueue.flush(0);CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));}this.printFlushProgress();CommitLog.log.info(this.getServiceName() + " service end");}

刷盘方法

org.apache.rocketmq.store.MappedFileQueue#flush

org.apache.rocketmq.store.MappedFile#flush

public boolean flush(final int flushLeastPages) {boolean result = true;// flushedWhere: 刷盘位置// 根据commitlog全局的刷盘点找到文件MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);if (mappedFile != null) {long tmpTimeStamp = mappedFile.getStoreTimestamp();// 调用此文件的刷盘,返回此次的刷盘位置int offset = mappedFile.flush(flushLeastPages);long where = mappedFile.getFileFromOffset() + offset;// 刷盘结果result = where == this.flushedWhere;// 更新刷盘物理位置this.flushedWhere = where;if (0 == flushLeastPages) {this.storeTimestamp = tmpTimeStamp;}}return result;
}
public int flush(final int flushLeastPages) {// 判断能否刷盘, 是否达到最小刷盘页数flushLeastPagesif (this.isAbleToFlush(flushLeastPages)) {//增加对该MappedFile的引用次数if (this.hold()) {// 获取写入位置int value = getReadPosition();try {//We only append data to fileChannel or mappedByteBuffer, never both.if (writeBuffer != null || this.fileChannel.position() != 0) {// 如果使用了堆外内存,那么通过fileChannel强制刷盘,这是异步堆外内存走的逻辑this.fileChannel.force(false);} else {// 如果没有使用堆外内存,那么通过mappedByteBuffer强制刷盘,这是同步或者异步刷盘走的逻辑this.mappedByteBuffer.force();}} catch (Throwable e) {log.error("Error occurred when force data to disk.", e);}// 将刷盘位置设置为写入位置this.flushedPosition.set(value);// 减少对该MappedFile的引用次数,表示使用结束,在netty中的ByteBuf就有类似的设计,如果引用为0则内存会被回收this.release();} else {log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());this.flushedPosition.set(getReadPosition());}}// 获取最新的刷盘位置,也就是刚刚刷到的地方return this.getFlushedPosition();
}
http://www.shuangfujiaoyu.com/news/59958.html

相关文章:

  • 网站模板登录模块推推蛙品牌策划
  • 用香港服务器建网站做微商厦门网站关键词推广
  • 邢台做网站推广报价免费发布推广信息网站
  • 加若格网站做么样快手推广网站
  • 服务行业做网站关键词自助优化
  • 怎样做平台网站百度seo优化技巧
  • 河北省建设厅网站刷身份证流程西安网站建设哪家好
  • 西安公司企业网站建设商城小程序
  • 南宁专业网站建设seo免费优化软件
  • 知名电子商务网站有哪些谷歌手机网页版入口
  • 引导企业做网站seo云优化是什么意思
  • wordpress4.6获取用户名方法惠州seo推广外包
  • 毕业设计做音乐网站可以吗网络营销的优势包括
  • 郑州网站建设哪家有google海外推广
  • 网站域名使用怎么做待摊分录网站做seo教程
  • ftp网站 免费外贸推广平台有哪几个
  • 衡水专业网站建设公司百度搜索引擎竞价排名
  • 漂亮企业网站源码长沙seo搜索
  • 昆明网站建设系统上海网络推广培训学校
  • 怎么注册公司都需要什么手续百度自然搜索排名优化
  • 在线做图软件网站建设及推广优化
  • 大数据比赛网站建设重庆网站推广专家
  • 网站备案升级今天刚刚发生的新闻
  • 深圳企业网站建设制作设计公司西安疫情最新数据消息5分钟前
  • 沈阳网站建设方法网站友链外链
  • 深圳建设交易网站百度云登录
  • 网站空间查询域名注册优惠
  • 知名企业网站人才招聘情况引流推广网站平台
  • 深圳设计周展会2023成都seo技术经理
  • 家装设计网站开发彩虹云商城网站搭建