明:Broker上的FilterServer列表,消息过滤服务器列表,后续介绍Consumer时会介绍,consumer拉取数据是通过filterServer拉取,consumer向Broker注册。
数据结构:HashMap结构,key是Broker地址,value是记录了filterServer地址的List集合。
路由注册
路由注册是通过Broker和NameServer之间的心跳功能来实现的。主要分为两步:
Step1: Broker启动时向集群中所有NameServer发送心跳语句,每隔30秒(默认30s,时间间隔在10秒到60秒之间)再发一次。 Step2: NameServer收到心跳包更新topicQueueTable,brokerAddrTable,brokerLiveTable,clusterAddrTable,filterServerTable。
Broker发送心跳包
发送心跳包的核心逻辑是在Broker启动逻辑里,代码入口是org.apache.rocketmq.broker.BrokerController#start,本篇文章重点关注的是发送心跳包的逻辑实现,只列出发送心跳包的核心代码
创建了一个线程池注册Broker,程序启动10秒后执行,每隔30秒(默认30s,时间间隔在10秒到60秒之间,BrokerConfig.getRegisterNameServerPeriod()的默认值是30秒)执行一次。
代码语言:javascript复制this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);


