LearnerZooKeeperServer是所有Follower和Observer的父类,在LearnerZooKeeperServer里有2个重要的属性:
//提交请求处理器
protected CommitProcessor commitProcessor;
//同步处理器
protected SyncRequestProcessor syncProcessor;
创新互联-专业网站定制、快速模板网站建设、高性价比忻城网站开发、企业建站全套包干低至880元,成熟完善的模板库,直接使用。一站式忻城网站制作公司更省心,省钱,快速模板网站建设找我们,业务覆盖忻城地区。费用合理售后完善,10年实体公司更值得信赖。
FollowerZooKeeperServer和ObserverZooKeeperServer都继承了LearnerZooKeeperServer服务器。
//待同步的请求
ConcurrentLinkedQueue pendingSyncs;
//待处理的事务请求
LinkedBlockingQueue pendingTxns = new LinkedBlockingQueue();
构建请求处理链,FollowerZooKeeperServer的请求处理链是:
FollowerRequestProcessor -> CommitProcessor ->FinalRequestProcessor
@Override
protected void setupRequestProcessors() {
//最后的处理器
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
//第二个处理器
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
//第一个请求处理器FollowerRequestProcessor
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this,
new SendAckRequestProcessor((Learner)getFollower()));
syncProcessor.start();
}
该函数将请求进行记录(放入到对应的队列中),等待处理。
public void logRequest(TxnHeader hdr, Record txn) {
Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
//zxid不等于0,说明此服务器已经处理过请求
if ((request.zxid & 0xffffffffL) != 0) {
// 将该请求放入pendingTxns中,等待事务处理
pendingTxns.add(request);
}
// 使用SyncRequestProcessor处理请求(其会将请求放在队列中,异步进行处理)
syncProcessor.proce***equest(request);
}
函数会提交zxid对应的请求(pendingTxns的队首元素),其首先会判断队首请求对应的zxid是否为传入的zxid,然后再进行移除和提交(放在committedRequests队列中)。
public void commit(long zxid) {
// 没有还在等待处理的事务
if (pendingTxns.size() == 0) {
LOG.warn("Committing " + Long.toHexString(zxid)
+ " without seeing txn");
return;
}
// 队首元素的zxid
long firstElementZxid = pendingTxns.element().zxid;
// 如果队首元素的zxid不等于需要提交的zxid,则退出程序
if (firstElementZxid != zxid) {
LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
+ " but next pending txn 0x"
+ Long.toHexString(firstElementZxid));
System.exit(12);
}
// 从待处理事务请求队列中移除队首请求
Request request = pendingTxns.remove();
// 提交该请求
commitProcessor.commit(request);
}
// 同步处理器是否可用,系统参数控制
private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
// 待同步请求队列
ConcurrentLinkedQueue pendingSyncs =
new ConcurrentLinkedQueue();
构建请求处理链,ObserverZooKeeperServer的请求处理链是:ObserverRequestProcessor->CommitProcessor->FinalRequestProcessor,可能会存在SyncRequestProcessor。
@Override
protected void setupRequestProcessors() {
// We might consider changing the processor behaviour of
// Observers to, for example, remove the disk sync requirements.
// Currently, they behave almost exactly the same as followers.
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
((ObserverRequestProcessor) firstProcessor).start();
/*
* Observer should write to disk, so that the it won't request
* too old txn from the leader which may lead to getting an entire
* snapshot.
*
* However, this may degrade performance as it has to write to disk
* and do periodic snapshot which may double the memory requirements
*/
//是否使用同步处理器,看系统参数配置,会影响性能
if (syncRequestProcessorEnabled) {
syncProcessor = new SyncRequestProcessor(this, null);
syncProcessor.start();
}
}
同步处理器可用,则使用同步处理器进行处理(放入同步处理器的queuedRequests队列中),然后提交请求(放入提交请求处理器的committedRequests队列中)
public void commitRequest(Request request) {
if (syncRequestProcessorEnabled) {
// Write to txnlog and take periodic snapshot
//写事务日志,并定期快照
syncProcessor.proce***equest(request);
}
commitProcessor.commit(request);
}