[从源码学设计]蚂蚁金服SOFARegistry 之 ChangeNotifier

2021/4/26 12:27:24

本文主要是介绍[从源码学设计]蚂蚁金服SOFARegistry 之 ChangeNotifier,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。 本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。 本文为第十五篇,分析如何执行ChangeNotifier 来通知相关模块:hi,这里有新数据变化来到了,兄弟们走起来。

[从源码学设计]蚂蚁金服SOFARegistry 之 ChangeNotifier


目录

  • [从源码学设计]蚂蚁金服SOFARegistry 之 ChangeNotifier
    • 3.1 放入消息
    • 3.2 消费消息&发送通知
    • 3.1.1 PublishDataHandler
    • 3.1.2 DataChangeEventCenter
    • 3.2.7.1 时间轮
    • 3.2.1 DataChangeHandler
    • 3.2.2 类定义
    • 3.2.3 执行引擎
    • 3.2.4 业务执行
    • 3.2.5 通知
    • 3.2.6 BackUpNotifier同步
    • 3.2.7 SessionServerNotifier通知数据变化
    • 2.1 接口定义
    • 2.2 派生类
    • 2.3 Bean
    • 1.1 概述
    • 1.2 数据变化
    • 0x00 摘要
    • 0x01 业务范畴
    • 0x02 数据结构
    • 0x03 流程
    • 0x04 总结
    • 0xFF 参考


0x00 摘要

SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。

本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。

本文为第十五篇,分析如何执行ChangeNotifier 来通知相关模块:hi,这里有新数据变化来到了,兄弟们走起来。

0x01 业务范畴

1.1 概述

当有数据发布者 publisher 上下线时,会分别触发 publishDataProcessor 或 unPublishDataHandler,Handler 会往 dataChangeEventCenter 中添加一个数据变更事件,用于异步地通知事件变更中心数据的变更。事件变更中心收到该事件之后,会往队列中加入事件。

  1. 此时 dataChangeEventCenter 会根据不同的事件类型异步地对上下线数据进行相应的处理,即把这个事件变更信息变成ChangeNotifier ,进而变成Operator,放到AbstractAcceptorStore;

  2. 与此同时 DataChangeHandler 会把这个事件变更信息通过 ChangeNotifier 对外发布,通知其他节点进行数据同步。

因为篇幅限制,前文对 ChangeNotifier 这部分只是略过,本文就详细讲解下事件变更通知ChangeNotifier。 这里会再把整理流程串起来,会涉及到前面某些文章内容。

先给出图示以便大家了解 ChangeNotifier 的作用。

        +--------------------+
        | PublishDataHandler |
        +--------+-----------+
                 |
                 |
                 |  publisher
                 |
                 v
       +---------+------------+
       |DataChangeEventCenter |
       +---------+------------+
                 |
                 |
                 | ChangeData
                 v
       +---------+------------+
       | DataChangeEventQueue |
       +---------+------------+
                 |
                 |
                 |  ChangeData
                 v
         +-------+----------+
         | DataChangeHandler|
         +-------+----------+
                 |
                 |
                 |  ChangeData
                 v
          +------+--------+              +------------+
          | ChangeNotifier|  +-------->  | datumCache |
          +------+--------+              +------------+
                 |
                 |
                 v
             +---+------+
             | notifier |
             +---+------+
                 |
                 v
     +-----------+---------------+
     |                           |
     v                           v
+----+----------------+   +------+----------+
|SessionServerNotifier|   |  BackUpNotifier |
+----+----------------+   +------+----------+
     |                           |
     |                           |
     |                           |
     |                           v
  +--v------------+       +------+----------------+
  | sessionServer |       | AbstractAcceptorStore |
  +---------------+       +-----------------------+

1.2 数据变化

数据变化有两个方向

  • 数据服务器节点变化;

  • 数据的变化,即Publisher和Scriber的变化;

ChangeNotifier就是负责把 Publisher和Scriber的变化 通知给相关模块。变更通知就是一种解耦。

0x02 数据结构

我们首先需要看看通知的数据结构。

2.1 接口定义

IDataChangeNotifier是通知的接口定义:

public interface IDataChangeNotifier {
    SetgetSuitableSource();

    /**
     *
     * @param datum
     * @param lastVersion
     */
    void notify(Datum datum, Long lastVersion);
}

2.2 派生类

IDataChangeNotifier 有四个派生类,分别对应了具体数据变化的四种可能,从名字大约可以判断出用途。

public class BackUpNotifier implements IDataChangeNotifier

public class SessionServerNotifier implements IDataChangeNotifier

public class SnapshotBackUpNotifier implements IDataChangeNotifier

public class TempPublisherNotifier implements IDataChangeNotifier

2.3 Bean

对应的Bean如下:

@Bean(name = "dataChangeNotifiers")
public ListdataChangeNotifiers() {
    Listlist = new ArrayList<>();
    list.add(sessionServerNotifier());
    list.add(tempPublisherNotifier());
    list.add(backUpNotifier());
    return list;
}

0x03 流程

我们从头理一下流程。

3.1 放入消息

当有数据发布者 publisher 上下线时,会分别触发 publishDataProcessor 或 unPublishDataHandler ,Handler 会往 dataChangeEventCenter 中添加一个数据变更事件,用于异步地通知事件变更中心数据的变更。事件变更中心收到该事件之后,会往队列中加入事件。

在DataServer这里,具体流程如下:

3.1.1 PublishDataHandler

PublishDataHandler 响应 PublishDataRequest。当有Publisher时候,就往DataChangeEventCenter放入消息。即调用下面来放入消息

dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());

具体代码如下:

public class PublishDataHandler extends AbstractServerHandler{

    @Autowired
    private ForwardService                 forwardService;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter          dataChangeEventCenter;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private DatumLeaseManager              datumLeaseManager;

    @Autowired
    private ThreadPoolExecutor             publishProcessorExecutor;

    @Override
    public Object doHandle(Channel channel, PublishDataRequest request) {
        Publisher publisher = Publisher.internPublisher(request.getPublisher());
        if (forwardService.needForward()) {
            CommonResponse response = new CommonResponse();
            response.setSuccess(false);
            response.setMessage("Request refused, Server status is not working");
            return response;
        }

        dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());

        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            String connectId = WordCache.getInstance().getWordCache(
                publisher.getSourceAddress().getAddressString());
            sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
                connectId);
            // record the renew timestamp
            datumLeaseManager.renew(connectId);
        }

        return CommonResponse.buildSucce***esponse();
    }
}

此时具体逻辑如下:

        +--------------------+
        | PublishDataHandler |
        +--------+-----------+
                 |
                 |
                 |  publisher
                 |
                 v
       +---------+------------+
       |DataChangeEventCenter |
       +---------+------------+

3.1.2 DataChangeEventCenter

DataChangeEventCenter 的核心是一个DataChangeEventQueue数组,

DataChangeEventCenter . onChange函数会首先根据Publisher的DataInfoId获取hash,根据这个hash数值来决定把  DataChangeEvent 消息放入哪个queue来处理,就是调用这个 queue的 onChange 函数。

public class DataChangeEventCenter {
    /**
     * queues of DataChangeEvent
     */
    private DataChangeEventQueue[] dataChangeEventQueues;

    @Autowired
    private DatumCache             datumCache;

    @PostConstruct
    public void init() {
        if (isInited.compareAndSet(false, true)) {
            queueCount = dataServerConfig.getQueueCount();
            dataChangeEventQueues = new DataChangeEventQueue[queueCount];
            for (int idx = 0; idx < queueCount; idx++) {
                dataChangeEventQueues[idx] = new DataChangeEventQueue(idx, dataServerConfig, this,
                    datumCache);
                dataChangeEventQueues[idx].start();
            }
        }
    }
  
    /**
     * receive changed publisher, then wrap it into the DataChangeEvent and put it into dataChangeEventQueue
     *
     * @param publisher
     * @param dataCenter
     */
    public void onChange(Publisher publisher, String dataCenter) {
        int idx = hash(publisher.getDataInfoId());
        Datum datum = new Datum(publisher, dataCenter);
        if (publisher instanceof UnPublisher) {
            datum.setContainsUnPub(true);
        }
        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB, datum));
        } else {
            dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB_TEMP, datum));
        }
    }
}

DataChangeEventQueue 的主要数据成员如下:

public class DataChangeEventQueue {
    /**
     * a block queue that stores all data change events
     */
    private final BlockingQueueeventQueue;

    private final Map<String, Map> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();

    private final DelayQueueCHANGE_QUEUE              = new DelayQueue();

    private DataChangeEventCenter                      dataChangeEventCenter;

    private DatumCache                                 datumCache;
}

其执行引擎是一个线程,其block在 BlockingQueue eventQueue 之上,当有消息时候,就取出消息,针对消息类型做不同处理。

public void start() {
    Executor executor = ExecutorFactory
            .newSingleThreadExecutor(String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), getName()));
    executor.execute(() -> {
        while (true) {
            try {
                IDataChangeEvent event = eventQueue.take();
                DataChangeScopeEnum scope = event.getScope();
                if (scope == DataChangeScopeEnum.DATUM) {
                    DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
                    //Temporary push data will be notify as soon as,and not merge to normal pub data;
                    if (dataChangeEvent.getSourceType() == DataSourceTypeEnum.PUB_TEMP) {
                        addTempChangeData(dataChangeEvent.getDatum(), dataChangeEvent.getChangeType(),
                                dataChangeEvent.getSourceType());
                    } else {
                        handleDatum(dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType(),
                                dataChangeEvent.getDatum());
                    }
                } else if (scope == DataChangeScopeEnum.CLIENT) {
                    handleClientOff((ClientChangeEvent) event);
                } else if (scope == DataChangeScopeEnum.SNAPSHOT) {
                    handleSnapshot((DatumSnapshotEvent) event);
                }
            } 
        }
    });
}

对于 Publisher 消息类型,handleDatum 函数会根据changeType是 COVER 还是 MERGE 来做不同处理。

在此步骤中,也会把 ChangeData 放入 CHANGE_QUEUE.put(changeData);

private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType,
                         Datum targetDatum) {
    lock.lock();
    try {
        //get changed datum
        ChangeData changeData = getChangeData(targetDatum.getDataCenter(),
            targetDatum.getDataInfoId(), sourceType, changeType);
        Datum cacheDatum = changeData.getDatum();
        if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
            changeData.setDatum(targetDatum);
        } else {
            MaptargetPubMap = targetDatum.getPubMap();
            MapcachePubMap = cacheDatum.getPubMap();
            for (Publisher pub : targetPubMap.values()) {
                String registerId = pub.getRegisterId();
                Publisher cachePub = cachePubMap.get(registerId);
                if (cachePub != null) {
                    // if the registerTimestamp of cachePub is greater than the registerTimestamp of pub, it means
                    // that pub is not the newest data, should be ignored
                    if (pub.getRegisterTimestamp() < cachePub.getRegisterTimestamp()) {
                        continue;
                    }
                    // if pub and cachePub both are publisher, and sourceAddress of both are equal,
                    // and version of cachePub is greater than version of pub, should be ignored
                    if (!(pub instanceof UnPublisher) && !(cachePub instanceof UnPublisher)
                        && pub.getSourceAddress().equals(cachePub.getSourceAddress())
                        && cachePub.getVersion() > pub.getVersion()) {
                        continue;
                    }
                }
                cachePubMap.put(registerId, pub);
                cacheDatum.setVersion(targetDatum.getVersion());
            }
        }
    } finally {
        lock.unlock();
    }
}

此时具体逻辑如下:

        +--------------------+
        | PublishDataHandler |
        +--------+-----------+
                 |
                 |
                 |  publisher
                 |
                 v
       +---------+------------+
       |DataChangeEventCenter |
       +---------+------------+
                 |
                 |
                 | ChangeData
                 v
       +---------+------------+
       | DataChangeEventQueue |
       +---------+------------+

3.2 消费消息&发送通知

DataChangeHandler 会针对每个DataChangeEventQueue进行消费通知。

public class DataChangeHandler {

    @Autowired
    private DataChangeEventCenter     dataChangeEventCenter;

    @Autowired
    private DatumCache                datumCache;

    @Resource
    private ListdataChangeNotifiers;

    @PostConstruct
    public void start() {
        DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
        int queueCount = queues.length;
        Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
        Executor notifyExecutor = ExecutorFactory
                .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
        for (int idx = 0; idx < queueCount; idx++) {
            final DataChangeEventQueue dataChangeEventQueue = queues[idx];
            final String name = dataChangeEventQueue.getName();
            executor.execute(() -> {
                while (true) {
                    try {
                        final ChangeData changeData = dataChangeEventQueue.take();
                        notifyExecutor.execute(new ChangeNotifier(changeData, name));
                    } 
                }
            });
        }
    }
}

3.2.1 DataChangeHandler

DataChangeHandler 会定期提取DataChangeEventCenter中的消息,然后进行处理。

3.2.2 类定义

public class DataChangeHandler {

    @Autowired
    private DataServerConfig          dataServerConfig;

    @Autowired
    private DataChangeEventCenter     dataChangeEventCenter;

    @Autowired
    private DatumCache                datumCache;

    @Resource
    private ListdataChangeNotifiers;
}

3.2.3 执行引擎

这里是一个双层线程模型。

  • executor = ExecutorFactory.newFixedThreadPool(queueCount)

  • notifyExecutor= ExecutorFactory.newFixedThreadPool(dataServerConfig.getQueueCount() * 5)

可以认为 executor 是控制线程,notifierExecutor是工作线程,工作线程是控制线程的5倍。

  • DataChangeHandler 会遍历 DataChangeEventCenter 中所有 DataChangeEventQueue,
  • 针对每一个dataChangeEventQueue调用executor的一个控制线程,
  • 在这个控制线程里面,可以从 DataChangeEventQueue 之中取出ChangeData,针对每一个ChangeData,调用notifyExecutor的一个工作线程,生成一个ChangeNotifier进行处理。
@PostConstruct
public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory
            .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
  
    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                 final ChangeData changeData = dataChangeEventQueue.take();
                 notifyExecutor.execute(new ChangeNotifier(changeData, name));
            }
        });
    }
}

3.2.4 业务执行

对于 ChangeData,会生成 ChangeNotifier 进行处理。会把这个事件变更信息通过 ChangeNotifier 对外发布,通知其他节点进行数据同步。

在 ChangeNotifier 之中,会判断changeData的类型做不同处理。

  • 如果是SnapshotData,则:
    • 生成SnapshotData;
    • 调用 datumCache.putSnapshot 做存储;
    • 调用notify做通知;
  • 如果是其他类型,则:
    • 对于pub or unPub merge,需要datum.updateVersion();
    • 如果是 PUB_TEMP,则notifyTempPub(datum, sourceType, changeType);
    • 如果是版本更新,则notify(datum, sourceType, lastVersion);

具体如下:

private class ChangeNotifier implements Runnable {

    private ChangeData changeData;
    private String     name;

    @Override
    public void run() {
        if (changeData instanceof SnapshotData) {
           ......
        } else {
            Datum datum = changeData.getDatum();

            String dataCenter = datum.getDataCenter();
            String dataInfoId = datum.getDataInfoId();
            DataSourceTypeEnum sourceType = changeData.getSourceType();
            DataChangeTypeEnum changeType = changeData.getChangeType();

            if (changeType == DataChangeTypeEnum.MERGE
                && sourceType != DataSourceTypeEnum.BACKUP
                && sourceType != DataSourceTypeEnum.SYNC) {
                //update version for pub or unPub merge to cache
                //if the version product before merge to cache,it may be cause small version override big one
                datum.updateVersion();
            }

            long version = datum.getVersion();

            try {
                if (sourceType == DataSourceTypeEnum.CLEAN) {
                    if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
                      ......
                    }

                } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                    notifyTempPub(datum, sourceType, changeType);
                } else {
                    MergeResult mergeResult = datumCache.putDatum(changeType, datum);
                    Long lastVersion = mergeResult.getLastVersion();

                    if (lastVersion != null
                        && lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
                        return;
                    }

                    //lastVersion null means first add datum
                    if (lastVersion == null || version != lastVersion) {
                        if (mergeResult.isChangeFlag()) {
                            notify(datum, sourceType, lastVersion);
                        }
                    }
                }
            } 
        }
    }
}

此时具体逻辑如下:

        +--------------------+
        | PublishDataHandler |
        +--------+-----------+
                 |
                 |
                 |  publisher
                 |
                 v
       +---------+------------+
       |DataChangeEventCenter |
       +---------+------------+
                 |
                 |
                 | ChangeData
                 v
       +---------+------------+
       | DataChangeEventQueue |
       +---------+------------+
                 |
                 |
                 |  ChangeData
                 v
         +-------+----------+
         | DataChangeHandler|
         +-------+----------+
                 |
                 |
                 |  ChangeData
                 v
          +------+--------+              +------------+
          | ChangeNotifier|  +-------->  | datumCache |
          +------+--------+              +------------+

3.2.5 通知

notify函数会遍历dataChangeNotifiers,找出可以支持本Datum对应SourceType的Notifier来执行。

具体如何支持哪些函数,是由getSuitableSource设置的。

private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {
    for (IDataChangeNotifier notifier : dataChangeNotifiers) {
        if (notifier.getSuitableSource().contains(sourceType)) {
            notifier.notify(datum, lastVersion);
        }
    }
}

对应的Bean是:

@Bean(name = "dataChangeNotifiers")
public ListdataChangeNotifiers() {
    Listlist = new ArrayList<>();
    list.add(sessionServerNotifier());
    list.add(tempPublisherNotifier());
    list.add(backUpNotifier());
    return list;
}

3.2.6 BackUpNotifier同步

就是调用 syncDataService.appendOperator 进行通知,其实就是把 Datum 变成 Operator,存到AbstractAcceptorStore。

public class BackUpNotifier implements IDataChangeNotifier {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public SetgetSuitableSource() {
        Setset = new HashSet<>();
        set.add(DataSourceTypeEnum.PUB);
        return set;
    }

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}

3.2.7 SessionServerNotifier通知数据变化

SessionServerNotifier 则要复杂很多。

public class SessionServerNotifier implements IDataChangeNotifier {

    private AsyncHashedWheelTimer          asyncHashedWheelTimer;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private Exchange                       boltExchange;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DatumCache                     datumCache;
  
    @Override
    public SetgetSuitableSource() {
        Setset = new HashSet<>();
        set.add(DataSourceTypeEnum.PUB);
        set.add(DataSourceTypeEnum.SYNC);
        set.add(DataSourceTypeEnum.SNAPSHOT);
        return set;
    }  
}
3.2.7.1 时间轮

建立了一个500毫秒的时间轮。

@PostConstruct
public void init() {
    ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
    threadFactoryBuilder.setDaemon(true);
    asyncHashedWheelTimer = new AsyncHashedWheelTimer(threadFactoryBuilder.setNameFormat(
        "Registry-SessionServerNotifier-WheelTimer").build(), 500, TimeUnit.MILLISECONDS, 1024,
        dataServerConfig.getSessionServerNotifierRetryExecutorThreadSize(),
        dataServerConfig.getSessionServerNotifierRetryExecutorQueueSize(), threadFactoryBuilder
            .setNameFormat("Registry-SessionServerNotifier-WheelExecutor-%d").build(),
        new TaskFailedCallback() {
            @Override
            public void executionRejected(Throwable e) {
                LOGGER.error("executionRejected: " + e.getMessage(), e);
            }

            @Override
            public void executionFailed(Throwable e) {
                LOGGER.error("executionFailed: " + e.getMessage(), e);
            }
        });
}

从业务角度看,当有publisher相关消息来临时候,

DataChangeHandler的notify函数会遍历dataChangeNotifiers,找出可以支持本Datum对应SourceType的Notifier来执行。

private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {
    for (IDataChangeNotifier notifier : dataChangeNotifiers) {
        if (notifier.getSuitableSource().contains(sourceType)) {
            notifier.notify(datum, lastVersion);
        }
    }
}

到了SessionServerNotifier这里的notify函数,会遍历目前缓存的所有Connection,逐一通知。

@Override
public void notify(Datum datum, Long lastVersion) {
    DataChangeRequest request = new DataChangeRequest(datum.getDataInfoId(),
        datum.getDataCenter(), datum.getVersion());
    Listconnections = sessionServerConnectionFactory.getSessionConnections();
    for (Connection connection : connections) {
        doNotify(new NotifyCallback(connection, request));
    }
}

具体通知函数:

private void doNotify(NotifyCallback notifyCallback) {
    Connection connection = notifyCallback.connection;
    DataChangeRequest request = notifyCallback.request;
    try {
        //check connection active
        if (!connection.isFine()) {
            return;
        }
        Server sessionServer = boltExchange.getServer(dataServerConfig.getPort());
      sessionServer.sendCallback(sessionServer.getChannel(connection.getRemoteAddress()),
            request, notifyCallback, dataServerConfig.getRpcTimeout());
    } catch (Exception e) {

        onFailed(notifyCallback);
    }
}

而时间轮是在调用失败的重试中使用。

就是当没有达到失败重试最大次数时,进行定时重试。

private void onFailed(NotifyCallback notifyCallback) {

    DataChangeRequest request = notifyCallback.request;
    Connection connection = notifyCallback.connection;
    notifyCallback.retryTimes++;

    //check version, if it's fall behind, stop retry
    long _currentVersion = datumCache.get(request.getDataCenter(), request.getDataInfoId()).getVersion();
    if (request.getVersion() != _currentVersion) {
        return;
    }

    if (notifyCallback.retryTimes  {
            //check version, if it's fall behind, stop retry
            long currentVersion = datumCache.get(request.getDataCenter(), request.getDataInfoId()).getVersion();
            if (request.getVersion() == currentVersion) {
                doNotify(notifyCallback);
            } 
        }, getDelayTimeForRetry(notifyCallback.retryTimes), TimeUnit.MILLISECONDS);
    } 
}

具体逻辑如下:

        +--------------------+
        | PublishDataHandler |
        +--------+-----------+
                 |
                 |
                 |  publisher
                 |
                 v
       +---------+------------+
       |DataChangeEventCenter |
       +---------+------------+
                 |
                 |
                 | ChangeData
                 v
       +---------+------------+
       | DataChangeEventQueue |
       +---------+------------+
                 |
                 |
                 |  ChangeData
                 v
         +-------+----------+
         | DataChangeHandler|
         +-------+----------+
                 |
                 |
                 |  ChangeData
                 v
          +------+--------+              +------------+
          | ChangeNotifier|  +-------->  | datumCache |
          +------+--------+              +------------+
                 |
                 |
                 v
             +---+------+
             | notifier |
             +---+------+
                 |
                 v
     +-----------+---------------+
     |                           |
     v                           v
+----+----------------+   +------+----------+
|SessionServerNotifier|   |  BackUpNotifier |
+----+----------------+   +------+----------+
     |                           |
     |                           |
     |                           |
     |                           v
  +--v------------+       +------+----------------+
  | sessionServer |       | AbstractAcceptorStore |
  +---------------+       +-----------------------+

0x04 总结

本文是把注册中的一个点“事件变更通知ChangeNotifie“进行细化展开,以 SessionServerNotifier 和 BackUpNotifier 为例,为大家进行解释ChangeNotifier的原理和使用。把包括 dataChangeEventCenter 等功能也梳理了一下,希望对大家有所帮助。

在 DataServer,数据变化有两个方向:

  • 数据服务器节点变化;

  • 数据的变化,即 Publisher 和 Scriber 的变化;

ChangeNotifier就是负责把 Publisher 和 Scriber 的变化 通知给相关模块。变更通知就是一种解耦。

0xFF 参考

[从源码学设计]蚂蚁金服SOFARegistry之服务上线

[从源码学设计]蚂蚁金服SOFARegistry 之 服务注册和操作日志



这篇关于[从源码学设计]蚂蚁金服SOFARegistry 之 ChangeNotifier的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程