[从源码学设计]蚂蚁金服SOFARegistry网络操作之连接管理
2021/4/26 12:28:54
本文主要是介绍[从源码学设计]蚂蚁金服SOFARegistry网络操作之连接管理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。本文为第三篇,介绍SOFARegistry网络操作之连接管理。
[从源码学设计]蚂蚁金服SOFARegistry网络操作之连接管理
目录
- [从源码学设计]蚂蚁金服SOFARegistry网络操作之连接管理
- 4.1 DataServerNodeFactory
- 4.2 业务流程
- 4.1.1 DataServerNode
- 4.1.2 DataServerNodeFactory
- 4.2.1 注册
- 4.2.2 使用
- 3.1 Connection对象
- 3.2 Connection类定义
- 3.3 ConnectionFactory
- 3.4 MetaServerConnectionFactory
- 3.5 DataServerConnectionFactory
- 3.6 SessionServerConnectionFactory
- 3.7 SessionServerConnectionFactory业务流程
- 3.5.1 注册
- 3.5.2 获取
- 3.5.3 DataSyncServerConnectionHandler
- 3.6.1 问题
- 3.6.2 逻辑概念和关系
- 3.6.3 示例图
- 3.6.4 主要变量
- 3.6.5 Pair
- 3.6.6 processId
- 3.7.1 SessionServerRegisterRequest
- 3.7.2 PublishDataRequest
- 3.7.3 DatumLeaseManager
- 2.1 连接管理
- 2.2 管理内容
- 1.1 应用场景
- 0x00 摘要
- 0x01 业务领域
- 0x02 管理内容
- 0x03 Connection管理
- 0x04 节点管理
- 0x05 总结
- 0xFF 参考
0x00 摘要
SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。
本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。
本文为第三篇,介绍SOFARegistry网络操作之连接管理。
0x01 业务领域
上文我们讲解了SOFARegistry的网络封装和操作,本文继续网络相关部分。
虽然SOFABolt底层已经做了连接管理,比如有Manager,pool,但是SOFARegistry在上层结合业务也做了连接管理,很有特色,所以我们专文讲解。
1.1 应用场景
这里我们集中从DataServer角度出发讲解,此处和业务紧密结合。
让我们大致想想DataServer需要管理哪些连接或者类似概念。
- MetaServer Connection:本DataServer与Meta Server的连接,用来和Meta Server交互;
- DataServer Connection:本DataServer与其他dataServer的连接,用来数据同步;
- 扩展开来,其他Data Server节点也需要管理;
- SessionServer Connection,本DataServer与Session server的连接,这个非常复杂,这里会重点讲解;
- 在SessionServer方面,又需要区分具体每个Publisher;
这就让我们来思考几个问题:
- 究竟什么可以唯一标示一个SessionServer?
- 什么可以唯一标示一个Publisher?ip : port?或者其他?
- 业务上有没有特殊考虑的需要?
具体我们在后文会详述阿里的思路。
0x02 管理内容
2.1 连接管理
首先讲讲普遍意义的连接管理。
连接管理是网络操作中的核心。我们知道,一次 tcp 请求大致分为三个步骤:建立连接、通信、关闭连接。每次建立新连接都会经历三次握手,中间包含三次网络传输,对于高并发的系统,这是一笔不小的负担;关闭连接同样如此。为了减少每次网络调用请求的开销,对连接进行管理、复用,可以极大的提高系统的性能。
为了提高通信效率,我们需要考虑复用连接,减少 TCP 三次握手的次数,因此需要有连接管理的机制。
关于连接管理,SOFARegistry有两维度层次的连接管理,分别是 Connection 和 Node。
2.2 管理内容
普遍意义的连接管理,通常需要处理:
- 连接创建与销毁
- 心跳管理
- 空闲连接管理
- 断线重连
- 慢连接处理
- 作为一个框架,当然还需要把各种连接事件分派给用户进行定制
因为SOFABolt底层已经做了底层连接管理,所以SOFARegistry只要做顶层部分连接管理即可,就是从业务角度区分保存连接。
0x03 Connection管理
3.1 Connection对象
这里说的Connection我们特指sofa-bolt的Connection对象com.alipay.remoting.Connection
。前文提到,SOFARegistry把sofa-bolt的Connection对象直接暴露出来。
面向连接的TCP协议要求每次peer间通信前建立一条TCP连接,该连接可抽象为一个4元组(four-tuple,有时也称socket pair):socket(localIp, localPort, remoteIp, remotePort )
,这4个元素唯一地代表一条TCP连接。
在Netty中用Channel来表示一条TCP连接,在sofa-bolt使用Connection对象来抽象一个连接,一个连接在client跟server端各用一个connection对象表示。
有了Connection这个抽象之后,自然的需要提供接口来管理Connection, 这个接口就是ConnectionFactory。
那么Connection是如何跟Netty进行联动呢。我们知道在Netty中,client连接到server后,server会回调initChannel
方法,在这个方法我们会初始化各种事件handler,sofa-bolt就在这里创建Connection,并在Netty的Channel对象上打上Connection标,后续通过Channel就可以直接找到这个Connection。
3.2 Connection类定义
Connection其删减版定义如下,可以看到其主要成员就是 Netty channel 实例:
public class Connection { private Channel channel; private final ConcurrentHashMapinvokeFutureMap = new ConcurrentHashMap(4); /** Attribute key for connection */ public static final AttributeKeyCONNECTION = AttributeKey.valueOf("connection"); /** Attribute key for heartbeat count */ public static final AttributeKeyHEARTBEAT_COUNT = AttributeKey.valueOf("heartbeatCount"); /** Attribute key for heartbeat switch for each connection */ public static final AttributeKeyHEARTBEAT_SWITCH = AttributeKey.valueOf("heartbeatSwitch"); /** Attribute key for protocol */ public static final AttributeKeyPROTOCOL = AttributeKey.valueOf("protocol"); /** Attribute key for version */ public static final AttributeKeyVERSION = AttributeKey.valueOf("version"); private Url url; private final ConcurrentHashMapid2PoolKey = new ConcurrentHashMap(256); private SetpoolKeys = new ConcurrentHashSet(); private final ConcurrentHashMapattributes = new ConcurrentHashMap(); }
省去 AtributeKey 类型定义以及 Log 配置,以上是Connection中主要的成员变量。包括几个方面:
- 连接:Channel、Url
- 版本:protocolCode、version
- 调用:invokeFutureMap
- 附着:attributes
- 引用:referenceCount、id2PoolKey、poolKeys
这里提一下 protocolCode 和 version,版本信息会被携带至对端,用于连接的协商。总的来说,通过对于 Channel 的包装,Connection 提供了丰富的上下文及引用信息,是 SOFABolt 连接管理的直接对象。
3.3 ConnectionFactory
SOFARegistry建立了ConnectionFactory 连接工厂,负责创建连接、检测连接等。
这里我对Connection进行了种类,分类是我从业务角度出发,强行分为三种Connection,只是为了讲解方便。
- MetaServerConnectionFactory,是Meta Server的连接,用来和Meta Server交互。
- DataServerConnectionFactory ,是其他dataServer的连接,用来数据同步;
- SessionServerConnectionFactory,是Session server的连接,这个非常复杂,后续会重点讲解。
3.4 MetaServerConnectionFactory
MetaServerConnectionFactory 就是用来对com.alipay.remoting.Connection
进行连接管理。
其核心变量是一个双层Map,可以理解为一个矩阵,其维度是 Map<dataCenter, Map。
其内部函数比较简单,望名生意。
public class MetaServerConnectionFactory { private final Map<String, Map> MAP = new ConcurrentHashMap<>(); /** * @param dataCenter * @param ip * @param connection */ public void register(String dataCenter, String ip, Connection connection) { MapconnectionMap = MAP.get(dataCenter); if (connectionMap == null) { MapnewConnectionMap = new ConcurrentHashMap<>(); connectionMap = MAP.putIfAbsent(dataCenter, newConnectionMap); if (connectionMap == null) { connectionMap = newConnectionMap; } } connectionMap.put(ip, connection); } /** * @param dataCenter * @param ip */ public Connection getConnection(String dataCenter, String ip) { if (MAP.containsKey(dataCenter)) { Mapmap = MAP.get(dataCenter); if (map.containsKey(ip)) { return map.get(ip); } } return null; } /** * @param dataCenter */ public MapgetConnections(String dataCenter) { if (MAP.containsKey(dataCenter)) { return MAP.get(dataCenter); } return new HashMap<>(); } /** * @param dataCenter */ public SetgetIps(String dataCenter) { if (MAP.containsKey(dataCenter)) { Mapmap = MAP.get(dataCenter); if (map != null) { return map.keySet(); } } return new HashSet<>(); } /** * @param dataCenter */ public void remove(String dataCenter) { Mapmap = getConnections(dataCenter); if (!map.isEmpty()) { for (Connection connection : map.values()) { if (connection.isFine()) { connection.close(); } } } MAP.remove(dataCenter); } /** * @param dataCenter * @param ip */ public void remove(String dataCenter, String ip) { if (MAP.containsKey(dataCenter)) { Mapmap = MAP.get(dataCenter); if (map != null) { map.remove(ip); } } } public SetgetAllDataCenters() { return MAP.keySet(); } }
3.5 DataServerConnectionFactory
DataServerConnectionFactory 就是用来对com.alipay.remoting.Connection
进行连接管理。
其核心变量是以ip:port作为key,Connection作为value的一个Map。
其内部函数比较简单,望名生意。
import com.alipay.remoting.Connection; /** * the factory to hold connections that other dataservers connected to local server */ public class DataServerConnectionFactory { /** * collection of connections * key:connectId ip:port */ private final MapMAP = new ConcurrentHashMap<>(); /** * register connection * * @param connection */ public void register(Connection connection) { MAP.put(getConnectId(connection), connection); } /** * remove connection by specific ip+port * * @param connection */ public void remove(Connection connection) { MAP.remove(getConnectId(connection)); } /** * get connection by ip * * @param ip * @return */ public Connection getConnection(String ip) { return MAP.values().stream().filter(connection -> ip.equals(connection.getRemoteIP()) && connection.isFine()).findFirst().orElse(null); } private String getConnectId(Connection connection) { return connection.getRemoteIP() + ":" + connection.getRemotePort(); } }
3.5.1 注册
当需要管理连接时候,可以通过如下来进行注册。
public void connected(Channel channel) throws RemotingException { super.connected(channel); dataServerConnectionFactory.register(((BoltChannel) channel).getConnection()); }
这样往ConcurrentHashMap client放入,是根据IP和port构建了url,然后url作为key。
3.5.2 获取
com.alipay.remoting.Connection
可以通过Channel进行获取。
DataNodeExchanger就采用如下方式获取Client。
conn = ((BoltChannel) dataNodeExchanger.connect(new URL(ip, dataServerConfig .getSyncDataPort()))).getConnection();
3.5.3 DataSyncServerConnectionHandler
有了注册与获取,接下来我们看看连接事件响应。
DataSyncServerConnectionHandler 是 server 的handler。
前文提到了,DataSyncServerConnectionHandler是连接事件处理器 (ConnectionEventProcessor),用来监听建连事件(ConnectionEventType.CONNECT)与断连事件(ConnectionEventType.CLOSE)。
这里就是针对各种事件,简单的对Connection做相应维护。
public class DataSyncServerConnectionHandler extends AbstractServerHandler { @Autowired private DataServerConnectionFactory dataServerConnectionFactory; @Override public ChannelHandler.HandlerType getType() { return ChannelHandler.HandlerType.LISENTER; } @Override public void connected(Channel channel) throws RemotingException { super.connected(channel); dataServerConnectionFactory.register(((BoltChannel) channel).getConnection()); } @Override public void disconnected(Channel channel) throws RemotingException { super.disconnected(channel); dataServerConnectionFactory.remove(((BoltChannel) channel).getConnection()); } @Override protected Node.NodeType getConnectNodeType() { return Node.NodeType.DATA; } }
3.6 SessionServerConnectionFactory
SessionServerConnectionFactory 包括复杂的逻辑。
3.6.1 问题
回顾前面问题:
- 究竟什么可以唯一标示一个SessionServer?
- 什么可以唯一标示一个Publisher?
- ip : port?或者其他?
- 业务上有没有特殊考虑的需要?
下面我们就一一看看阿里如何处理。
3.6.2 逻辑概念和关系
首先要讲讲阿里的几个逻辑概念:
- process Id 代表了Session Server,格式是类似uid的构建,每个Session Server有一个唯一的process Id,Session Server与process Id是一对一的关系;
- Connection 就是一个 Session Server 和 Data Server 之间的 Connection;
- connect Id 代表了Publisher,格式是 ip : port。connect Id与Publisher是一对一的关系;
- 一个Session Server包括许多Publiser,即许多connection id;
- Session Server address 是一个 ip : port 的组合,代表一个 Connection 的 session server 那一端;
- 一个 Session Server 可能对于一个data Server有多个连接;这个目前原因不知,没有发现业务原因,可能推测如下:因为连接敏感性,网络不稳定性,所以SOFABolt重连时候会选择一个新端口,所以会有多个Connection存在。所以一个processID对应多个sessionConnAddress;
具体就是,SOFARegistry 将服务数据 (PublisherRegister) 和 服务发布者 (Publisher) 的连接的生命周期绑定在一起:每个 PublisherRegister 定义属性 connId,connId 由注册本次服务的 Publisher 的连接标识 (IP 和 Port)构成,也就是只要该 Publisher 和 SessionServer 断连,服务信息数据即失效。客户端重新建连成功后重新注册服务数据,重新注册的服务数据会被当成新的数据,考虑更换长连接后 Publisher 的 connId 是 Renew 新生成的。
3.6.3 示例图
我们假设一个Session server内部有两个 Publisher,都连接到一个Data Server上。
这些 address 格式都是 ip : port,举例如下:
SessionServer address 1 是 :1.1.2.3 : 1
SessionServer address 2 是 :1.1.2.3 : 2
SessionServer address 3 是 :1.1.2.3 : 3
SessionServer address 4 是 :1.1.2.3 : 4
DataServer address 1 是 :2.2.2.3 : 1
DataServer address 2 是 :2.2.2.3 : 2
具体逻辑如图:
+----------+ +----------+ | Client | | Client | +----+-----+ +----+-----+ | | | | | | | | | SessionServer address 1 | SessionServer address 2 v v +--------+-----------------------------------+----------------+ | Session Server(process Id) | | | | +------------------------+ +-----------------------+ | | | Publisher(connect Id) | ... | Publisher(connect Id) | | | +------------------------+ +-----------------------+ | +-------------------------------------------------------------+ | SessionServer address 3 | SessionServer address 4 | | | | | | | | +----------> +---------------+ <---------+ DataServer address 1 | Data Server | DataServer address 2 +---------------+
3.6.4 主要变量
所以,SessionServerConnectionFactory的几个变量就对应了上述这些逻辑关系,具体如下:
- SESSION_CONN_PROCESS_ID_MAP : Map
- PROCESS_ID_CONNECT_ID_MAP : Map<SessionServer processId, Set
- PROCESS_ID_SESSION_CONN_MAP : Map
这些都代表了本 Data Server 和 其 Session Server 之间的关系。
+-----------------------------------------------------------------------------------------+ +--------------------------------+ | SessionServerConnectionFactory | | SessionServer | | | | | | | | +-------------------------+ | | +---------------------------------------------------------+ | | | SessionServer address | | | | SESSION_CONN_PROCESS_ID_MAP | | | | | | | | | | | | +----------------+ | | | | | +----------------------------------->+ | process Id | | | | | Map| | | +-------------------------+ | | | | | | | | | | +---------------------------------------------------------+ | | | Publisher | | | | | +--+-------------+ | | | | ^ | | +---------------------------------------------------------+ | | | | | | PROCESS_ID_CONNECT_ID_MAP | | +------------------------+-------+ | | | | | ^ | | Map<SessionServer processId, Set> +-------------------------------------------+ | | | | | | | +---------------------------------------------------------+ | | | | | | | | | +------------------------------------------------------------------------------------+ | +------------+ | | |PROCESS_ID_SESSION_CONN_MAP +-------> | Connection +---------+ | | | | +------------+ | | | | | |Map| | | | | | | +------------------------------------------------------------------------------------+ | +-----------------------------------------------------------------------------------------+
手机上如下图:
具体类定义如下:
public class SessionServerConnectionFactory { private static final int DELAY = 30 * 1000; private static final Map EMPTY_MAP = new HashMap(0); /** * key : SessionServer address * value: SessionServer processId */ private final MapSESSION_CONN_PROCESS_ID_MAP = new ConcurrentHashMap<>(); /** * key : SessionServer processId * value: ip:port of clients */ private final Map<String, Set> PROCESS_ID_CONNECT_ID_MAP = new ConcurrentHashMap<>(); /** * key : SessionServer processId * value: pair(SessionServer address, SessionServer connection) */ private final MapPROCESS_ID_SESSION_CONN_MAP = new ConcurrentHashMap<>(); @Autowired private DisconnectEventHandler disconnectEventHandler; }
3.6.5 Pair
这是SessionServerConnectionFactory的内部类。
PROCESS_ID_SESSION_CONN_MAP是 Map<SessionServer processId, pair(SessionServer address, SessionServer connection)>,代表了一个 Session Server 包括哪些Connection,每个Connection 被其Session Server 端的address 唯一确定。
Pair就是SessionServer address, SessionServer connection的组合,定义如下:
private static class Pair { private AtomicInteger roundRobin = new AtomicInteger(-1); private Mapconnections; private String lastDisconnectedSession; private Pair(Mapconnections) { this.connections = connections; } @Override public boolean equals(Object o) { return connections.equals(((Pair) o).getConnections()) && (((Pair) o).lastDisconnectedSession.equals(lastDisconnectedSession)); } /** * Getter method for property connections. * @return property value of connections */ private MapgetConnections() { return connections; } }
当生成时,Session Server 端的address,这是由InetSocketAddress转换而来。此类用于实现 IP 套接字地址 (IP 地址+端口号),用于socket 通信;
public void registerSession(String processId, SetconnectIds, Connection connection) { String sessionConnAddress = NetUtil.toAddressString(connection.getRemoteAddress()); SESSION_CONN_PROCESS_ID_MAP.put(sessionConnAddress, processId); SetconnectIdSet = PROCESS_ID_CONNECT_ID_MAP .computeIfAbsent(processId, k -> ConcurrentHashMap.newKeySet()); connectIdSet.addAll(connectIds); Pair pair = PROCESS_ID_SESSION_CONN_MAP.computeIfAbsent(processId, k -> new Pair(new ConcurrentHashMap<>())); pair.getConnections().put(sessionConnAddress, connection); }
3.6.6 processId
processId是在Session Server之中生成,可以看出,是IP,时间戳,循环递增整数构建。这样就可以唯一确定一个SessionServer。
public class SessionProcessIdGenerator { /** * Generate session processId. */ public static String generate() { String localIp = NetUtil.getLocalSocketAddress().getAddress().getHostAddress(); if (localIp != null && !localIp.isEmpty()) { return getId(getIPHex(localIp), System.currentTimeMillis(), getNextId()); } return EMPTY_STRING; } }
3.7 SessionServerConnectionFactory业务流程
因为高层连接管理与业务密切耦合,所以我们接下来分析业务。看看调用 SessionServerConnectionFactory的业务流程。
具体registerSession从何处调用,这就涉及到两个消息:SessionServerRegisterRequest 和PublishDataRequest。即有两个途径会调用。而且业务涉及到Session Server与DataServer。
3.7.1 SessionServerRegisterRequest
当重新连接的时候,会统一注册 Session Server 本身包含的所有Publisher。对应在Session Server之中,如下可以看到:
- 从sessionServer获取connectIds。
- 建立SessionServerRegisterRequest,然后发送。
代码如下:
public class SessionRegisterDataTask extends AbstractSessionTask { @Override public void setTaskEvent(TaskEvent taskEvent) { //taskId create from event if (taskEvent.getTaskId() != null) { setTaskId(taskEvent.getTaskId()); } Object obj = taskEvent.getEventObj(); if (obj instanceof BoltChannel) { this.channel = (BoltChannel) obj; } Server sessionServer = boltExchange.getServer(sessionServerConfig.getServerPort()); if (sessionServer != null) { Collectionchs = sessionServer.getChannels(); SetconnectIds = new HashSet<>(); chs.forEach(channel -> connectIds.add(NetUtil.toAddressString(channel.getRemoteAddress()))); sessionServerRegisterRequest = new SessionServerRegisterRequest( SessionProcessIdGenerator.getSessionProcessId(), connectIds); } } }
来到DataServer,SessionServerRegisterHandler会进行处理调用,用到了sessionServerConnectionFactory。
public class SessionServerRegisterHandler extends AbstractServerHandler{ @Override public Object doHandle(Channel channel, SessionServerRegisterRequest request) { SetconnectIds = request.getConnectIds(); if (connectIds == null) { connectIds = new HashSet<>(); } sessionServerConnectionFactory.registerSession(request.getProcessId(), connectIds, ((BoltChannel) channel).getConnection()); return CommonResponse.buildSucce***esponse(); } }
3.7.2 PublishDataRequest
当注册Publisher时候。在Session Server之中,可以看到建立了请求。
private RequestbuildPublishDataRequest(Publisher publisher) { return new Request() { private AtomicInteger retryTimes = new AtomicInteger(); @Override public PublishDataRequest getRequestBody() { PublishDataRequest publishDataRequest = new PublishDataRequest(); publishDataRequest.setPublisher(publisher); publishDataRequest.setSessionServerProcessId(SessionProcessIdGenerator .getSessionProcessId()); return publishDataRequest; } @Override public URL getRequestUrl() { return getUrl(publisher.getDataInfoId()); } @Override public AtomicInteger getRetryTimes() { return retryTimes; } }; }
在data server之中,会调用处理,用到了sessionServerConnectionFactory。
public class PublishDataHandler extends AbstractServerHandler{ @Override public Object doHandle(Channel channel, PublishDataRequest request) { Publisher publisher = Publisher.internPublisher(request.getPublisher()); if (forwardService.needForward()) { CommonResponse response = new CommonResponse(); response.setSuccess(false); response.setMessage("Request refused, Server status is not working"); return response; } dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter()); if (publisher.getPublishType() != PublishType.TEMPORARY) { String connectId = WordCache.getInstance().getWordCache( publisher.getSourceAddress().getAddressString()); sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(), connectId); // record the renew timestamp datumLeaseManager.renew(connectId); } return CommonResponse.buildSucce***esponse(); } }
3.7.3 DatumLeaseManager
上述代码提到了DatumLeaseManager,这里可以看到就是对connectId,即Publisher进行续约:
connectIdRenewTimestampMap : 记录了renew时间;
locksForConnectId :只有一个task能够更新;
renew 函数记录本次renew时间戳,启动evict task,如果到期没有renew,就去除。
public class DatumLeaseManager implements AfterWorkingProcess { /** record the latest heartbeat time for each connectId, format: connectId -> lastRenewTimestamp */ private final MapconnectIdRenewTimestampMap = new ConcurrentHashMap<>(); /** lock for connectId , format: connectId -> true */ private ConcurrentHashMaplocksForConnectId = new ConcurrentHashMap(); /** * record the renew timestamp */ public void renew(String connectId) { // record the renew timestamp connectIdRenewTimestampMap.put(connectId, System.currentTimeMillis()); // try to trigger evict task scheduleEvictTask(connectId, 0); } }
0x04 节点管理
除了具体连接之外,SOFARegistry也对Data 节点进行另一个维度的连接管理。具体在DataServerNodeFactory完成。
4.1 DataServerNodeFactory
4.1.1 DataServerNode
就是简单的数据结构,没有建立Bean。
public class DataServerNode implements HashNode { private String ip; private String dataCenter; private Connection connection; }
4.1.2 DataServerNodeFactory
对应Node的连接管理 则是 DataServerNodeFactory。
在具体模块控制上,DataServerNodeFactory拥有自己的Bean。DataServerConnectionFactory 则全部是Static类型,直接static使用。
DataServerNodeFactory的关键变量有两个:
- MAP是以dataCenter和ip作为维度的一个Node矩阵,是数据节点相关数据;
- CONSISTENT_HASH_MAP则是用dataCenter作为key,ConsistentHash
具体定义如下:
public class DataServerNodeFactory { /** * row: dataCenter * column: ip * value dataServerNode */ private static final Map<String, Map> MAP = new ConcurrentHashMap<>(); /** * key: dataCenter * value: consistentHash */ private static final Map<String, ConsistentHash> CONSISTENT_HASH_MAP = new ConcurrentHashMap<>(); }
4.2 业务流程
4.2.1 注册
具体在LocalDataServerChangeEventHandler 和 DataServerChangeEventHandler 全都有涉及。
public class LocalDataServerChangeEventHandler extends AbstractEventHandler{ private void connectDataServer(String dataCenter, String ip) { Connection conn = null; for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) { try { conn = ((BoltChannel) dataNodeExchanger.connect(new URL(ip, dataServerConfig.getSyncDataPort()))).getConnection(); break; } } //maybe get dataNode from metaServer,current has not start! register dataNode info to factory,wait for connect task next execute DataServerNodeFactory.register(new DataServerNode(ip, dataCenter, conn), dataServerConfig); } } }
以及
public class DataServerChangeEventHandler extends AbstractEventHandler{ private void connectDataServer(String dataCenter, String ip) { Connection conn = null; for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) { try { conn = ((BoltChannel) dataNodeExchanger.connect(new URL(ip, dataServerConfig .getSyncDataPort()))).getConnection(); break; } catch (Exception e) { TimeUtil.randomDelay(3000); } } //maybe get dataNode from metaServer,current has not start! register dataNode info to factory,wait for connect task next execute DataServerNodeFactory.register(new DataServerNode(ip, dataCenter, conn), dataServerConfig); } }
4.2.2 使用
使用就是从MAP与CONSISTENT_HASH_MAP中提取Node,这里把从CONSISTENT_HASH_MAP提取的代码摘录如下:
/** * get dataserver by specific datacenter and dataInfoId * * @param dataCenter * @param dataInfoId * @return */ public static DataServerNode computeDataServerNode(String dataCenter, String dataInfoId) { ConsistentHashconsistentHash = CONSISTENT_HASH_MAP.get(dataCenter); if (consistentHash != null) { return consistentHash.getNodeFor(dataInfoId); } return null; } public static ListcomputeDataServerNodes(String dataCenter, String dataInfoId, int backupNodes) { ConsistentHashconsistentHash = CONSISTENT_HASH_MAP.get(dataCenter); if (consistentHash != null) { return consistentHash.getNUniqueNodesFor(dataInfoId, backupNodes); } return null; }
0x05 总结
关于连接管理,SOFARegistry有两维度层次的连接管理,分别是 Connection 和 Node。
因为SOFABolt底层已经做了底层连接管理,所以SOFARegistry只要做顶层部分连接管理即可,就是从业务角度区分注册,保存,获取连接。具体就是:
- Connection 就是一个 Session Server 和 Data Server 之间的 Connection;
- 一个 Session Server 可能对于一个data Server有多个连接;
- 一个Session Server包括许多Publiser;
- Connection与Publiser一一对应;
SOFARegistry 将服务数据 (PublisherRegister) 和 服务发布者 (Publisher) 的连接的生命周期绑定在一起:每个 PublisherRegister 定义属性 connId,connId 由注册本次服务的 Publisher 的连接标识 (IP 和 Port)构成。
只要该 Publisher 和 SessionServer 断连,服务信息数据即失效。客户端重新建连成功后重新注册服务数据,重新注册的服务数据会被当成新的数据,考虑更换长连接后 Publisher 的 connId 是 Renew 新生成的。
如下图所示:
+----------+ +----------+ | Client | | Client | +----+-----+ +----+-----+ | | | | | | | | | | | | +-------------------------------------------------------------+ | | Session Server(process Id) | | | v v | | +------+-----------------+ +--------+--------------+ | | | Publisher(connect Id) | ... | Publisher(connect Id) | | | +------------------------+ +-----------------------+ | +-------------------------------------------------------------+ | | | | | Connection Connection | | | | | | | v v +---------------------+------------------------------------------+--------------------+ | Data Server | | | | Map| | | | Map<SessionServer processId, Set> | | | | Map| | | +-------------------------------------------------------------------------------------+
0xFF 参考
https://timyang.net/architecture/cell-distributed-system/
SOFABolt 源码分析12 - Connection 连接管理设计
SOFABolt 源码分析2 - RpcServer 服务端启动的设计
SOFABolt 源码分析3 - RpcClient 客户端启动的设计
这篇关于[从源码学设计]蚂蚁金服SOFARegistry网络操作之连接管理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23Springboot应用的多环境打包入门
- 2024-11-23Springboot应用的生产发布入门教程
- 2024-11-23Python编程入门指南
- 2024-11-23Java创业入门:从零开始的编程之旅
- 2024-11-23Java创业入门:新手必读的Java编程与创业指南
- 2024-11-23Java对接阿里云智能语音服务入门详解
- 2024-11-23Java对接阿里云智能语音服务入门教程
- 2024-11-23JAVA对接阿里云智能语音服务入门教程
- 2024-11-23Java副业入门:初学者的简单教程
- 2024-11-23JAVA副业入门:初学者的实战指南