这篇文章给大家分享的是有关RocketMQ中broker消息存储之如何实现拉取消息的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
创新互联专注于同江网站建设服务及定制,我们拥有丰富的企业做网站经验。 热诚为您提供同江营销型网站建设,同江网站制作、同江网页设计、同江网站官网定制、小程序制作服务,打造同江网络公司原创品牌,更为您提供同江网站排名全网营销落地服务。
在consumer拉取消息时,broker首先会根据待拉取的topic+queueId得到对应的ConsumeQueue,再根据消费offset从ConsumeQueue相应的偏移位置中获取该消息在commitlog里真实的offset/msgsize/tagscode信息,最后再从commitlog查出消息体。
消息拉取在broker存储层的调用入口为DefaultMessageStore.getMessage方法。核心逻辑如下:
// DefaultMessageStore.java public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final MessageFilter messageFilter) { // ... // 1. 定位ConsumeQueue ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { minOffset = consumeQueue.getMinOffsetInQueue(); maxOffset = consumeQueue.getMaxOffsetInQueue(); if (maxOffset == 0) { status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } else if (offset < minOffset) { status = GetMessageStatus.OFFSET_TOO_SMALL; nextBeginOffset = nextOffsetCorrection(offset, minOffset); } else if (offset == maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_ONE; nextBeginOffset = nextOffsetCorrection(offset, offset); } else if (offset > maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_BADLY; if (0 == minOffset) { nextBeginOffset = nextOffsetCorrection(offset, minOffset); } else { nextBeginOffset = nextOffsetCorrection(offset, maxOffset); } } else { // 2. 从ConsumeQueue中读取消费偏移offset处的内容 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); if (bufferConsumeQueue != null) { try { status = GetMessageStatus.NO_MATCHED_MESSAGE; long nextPhyFileStartOffset = Long.MIN_VALUE; long maxPhyOffsetPulling = 0; int i = 0; final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); // 单个请求最大拉取数据量 final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded(); ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); // commitlog offset 8bytes int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); // msg size 4bytes long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); // tags hashcode 8bytes // ... // 3. 通过tagscode快速过滤 if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } continue; } // 4. 从commitlog获取消息体 SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); if (null == selectResult) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.MESSAGE_WAS_REMOVING; } nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); continue; } // 5. 通过消息体过滤 if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } // release... selectResult.release(); continue; } this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); // 6.添加到返回结果 getResult.addMessage(selectResult); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; } // ... } finally { bufferConsumeQueue.release(); } } else { // ... } } } else { status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } // ... return getResult; }
ConsumeQueue中存储的是固定长度(每个消息20字节)的内容,因此访问比较简单:
// ConsumeQueue.java public SelectMappedBufferResult getIndexBuffer(final long startIndex) { int mappedFileSize = this.mappedFileSize; long offset = startIndex * CQ_STORE_UNIT_SIZE; // 消费者offset * 固定20字节长度 if (offset >= this.getMinLogicOffset()) { // 定位到所属的MappedFile MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset); if (mappedFile != null) { SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); // 从MappedFile中读取实际的数据 return result; } } return null; }
通过ConsumeQueue获取消息在commitlog中的偏移量以及消息大小之后,获取消息体的方法如下
// CommitLog.java public SelectMappedBufferResult getMessage(final long offset, final int size) { int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); // 定位消息所在的MappedFile MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0); if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); return mappedFile.selectMappedBuffer(pos, size); // 从MappedFile中获取消息体 } return null; }
消息拉取整体流程如下
感谢各位的阅读!关于“RocketMQ中broker消息存储之如何实现拉取消息”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!