路由注册是通过Broker和NameServer之间的心跳功能来实现

2021-09-26 17:25:46 浏览数 (13)

明:Broker上的FilterServer列表,消息过滤服务器列表,后续介绍Consumer时会介绍,consumer拉取数据是通过filterServer拉取,consumer向Broker注册。

数据结构:HashMap结构,key是Broker地址,value是记录了filterServer地址的List集合。

路由注册

路由注册是通过Broker和NameServer之间的心跳功能来实现的。主要分为两步:

Step1: Broker启动时向集群中所有NameServer发送心跳语句,每隔30秒(默认30s,时间间隔在10秒到60秒之间)再发一次。 Step2: NameServer收到心跳包更新topicQueueTable,brokerAddrTable,brokerLiveTable,clusterAddrTable,filterServerTable。

Broker发送心跳包

发送心跳包的核心逻辑是在Broker启动逻辑里,代码入口是org.apache.rocketmq.broker.BrokerController#start,本篇文章重点关注的是发送心跳包的逻辑实现,只列出发送心跳包的核心代码

创建了一个线程池注册Broker,程序启动10秒后执行,每隔30秒(默认30s,时间间隔在10秒到60秒之间,BrokerConfig.getRegisterNameServerPeriod()的默认值是30秒)执行一次。

代码语言:javascript复制
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
​
    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

0 人点赞