七:服务暴露剖析(三)
2022/1/7 6:06:14
本文主要是介绍七:服务暴露剖析(三),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
服务暴露
远程服务暴露
源码讲解:
直接从ServiceConfig的exportUrl开始讲解
//org.apache.dubbo.config.ServiceConfig#exportUrl private void exportUrl(URL url, List<URL> registryURLs) { String scope = url.getParameter(SCOPE_KEY); // don't export when none is configured if (!SCOPE_NONE.equalsIgnoreCase(scope)) { // export to local if the config is not remote (export to remote only when config is remote) if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { //本地暴露 exportLocal(url); } // export to remote if the config is not local (export to local only when config is local) if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { //远程暴露,执行到这里 url = exportRemote(url, registryURLs); MetadataUtils.publishServiceDefinition(url); } } this.urls.add(url); } private URL exportRemote(URL url, List<URL> registryURLs) { if (CollectionUtils.isNotEmpty(registryURLs)) { for (URL registryURL : registryURLs) { if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) { url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true"); } //if protocol is only injvm ,not register if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { continue; } url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL); if (monitorUrl != null) { url = url.putAttribute(MONITOR_KEY, monitorUrl); } // For providers, this is used to enable custom proxy to generate invoker String proxy = url.getParameter(PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy); } if (logger.isInfoEnabled()) { if (url.getParameter(REGISTER_KEY, true)) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url.getServiceKey() + " to registry " + registryURL.getAddress()); } else { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url.getServiceKey()); } } doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true); } } else { if (MetadataService.class.getName().equals(url.getServiceInterface())) { localMetadataService.setMetadataServiceURL(url); } if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } //执行到这里 doExportUrl(url, true); } return url; } private void doExportUrl(URL url, boolean withMetaData) { Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); if (withMetaData) { invoker = new DelegateProviderMetaDataInvoker(invoker, this); } //执行到这里,调用RegistryProtocol的export Exporter<?> exporter = protocolSPI.export(invoker); exporters.add(exporter); }
//org.apache.dubbo.registry.integration.RegistryProtocol#export @Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { URL registryUrl = getRegistryUrl(originInvoker); // url to export locally URL providerUrl = getProviderUrl(originInvoker); // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); Map<URL, NotifyListener> overrideListeners = getProviderConfigurationListener(providerUrl).getOverrideListeners(); overrideListeners.put(registryUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //我们主要看这里,这里是 服务端 进行服务暴露的入口 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // url to registry final Registry registry = getRegistry(registryUrl); final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl); // decide if we need to delay publish boolean register = providerUrl.getParameter(REGISTER_KEY, true); if (register) { register(registry, registeredProviderUrl); } // register stated url on provider model registerStatedUrl(registryUrl, registeredProviderUrl, register); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); // Deprecated! Subscribe to override rules in 2.6.x or before. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); notifyExport(exporter); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter); }
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { //判断是否已经暴露过, key的内容为:com.jiangzheng.course.dubbo.api.service.ServiceDemo:20880 String key = getCacheKey(originInvoker); return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> { Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); //执行到此处的protocol.export方法,最终调用DubboProtocol类 return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker); }); }
//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { checkDestroyed(); URL url = invoker.getUrl(); // 将业务调用的实现类或代理类相关 放入到缓存里 String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } } //指定到这里,开启服务 openServer(url); optimizeSerialization(url); return exporter; } private void openServer(URL url) { checkDestroyed(); // 获取本地地址或域名 String key = url.getAddress(); //客户端可以导出仅由服务器调用的服务 boolean isServer = url.getParameter(IS_SERVER_KEY, true); if (isServer) { //判断是否已经暴露过了 ProtocolServer server = serverMap.get(key); if (server == null) {//没有的话向下执行 synchronized (this) { server = serverMap.get(key); if (server == null) { //执行到这里,此处调用createServer方法进行创建服务 serverMap.put(key, createServer(url)); }else { server.reset(url); } } } else { // server supports reset, use together with override server.reset(url); } } } private ProtocolServer createServer(URL url) { url = URLBuilder.from(url) // send readonly event when server closes, it's enabled by default .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()) // enable heartbeat by default .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)) .addParameter(CODEC_KEY, DubboCodec.NAME) .build(); String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported server type: " + str + ", url: " + url); } //上面进行url的创建 ExchangeServer server; try { //执行到此处,调用Exchangers.bind方法 server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } DubboProtocolServer protocolServer = new DubboProtocolServer(server); loadServerProperties(protocolServer); return protocolServer; }
//org.apache.dubbo.remoting.exchange.Exchangers#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.exchange.ExchangeHandler) public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); //执行到此处,调用bind方法 return getExchanger(url).bind(url, handler); }
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind @Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { //执行到此处,调用Transporters.bind方法 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
//org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...) public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } //通过getTransporter获取到Transporter后调用 bind return getTransporter(url).bind(url, handler); } //此处通过SPI获取到了 org.apache.dubbo.remoting.transport.netty4.NettyTransporter public static Transporter getTransporter(URL url) { return url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getAdaptiveExtension(); }
//org.apache.dubbo.remoting.transport.netty4.NettyTransporter#bind @Override public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException { //创建netty服务 return new NettyServer(url, handler); }
//org.apache.dubbo.remoting.transport.netty4.NettyServer#NettyServer public NettyServer(URL url, ChannelHandler handler) throws RemotingException { //调用supper super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url)); // read config before destroy serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel()); }
//org.apache.dubbo.remoting.transport.AbstractServer#AbstractServer public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); executorRepository = url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension(); //获取本机地址 localAddress = getUrl().toInetSocketAddress(); //服务绑定的IP的信息 String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); //服务绑定的端口号的信息 int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = ANYHOST_VALUE; } bindAddress = new InetSocketAddress(bindIp, bindPort); this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS); try { //执行到这里,此处是现在NettyServer中 doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } executor = executorRepository.createExecutorIfAbsent(url); }
//org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen @Override protected void doOpen() throws Throwable { //netty固定api写法 bootstrap = new ServerBootstrap(); //线程数为1,主要用于netty的端口监听 bossGroup = NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME); //int DEFAULT_IO_THREADS = Math.min(Runtime.getRuntime().availableProcessors() + 1, 32); //线程数设置与CPU数量是相关的,主要用于真实处理调用请求的 workerGroup = NettyEventLoopFactory.eventLoopGroup( getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), EVENT_LOOP_WORKER_POOL_NAME); //此处与业务相关,放于之后进行讲解,此处先忽略 final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE); //bootstrap相关设置 bootstrap.group(bossGroup, workerGroup) //NettyEventLoopFactory.serverSocketChannelClass()此方法用于返回使用epoll还是NIO模式 .channel(NettyEventLoopFactory.serverSocketChannelClass()) .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_KEEPALIVE, keepalive) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<SocketChannel>() { @Override //当有channel过来的时候进行处理 protected void initChannel(SocketChannel ch) throws Exception { // FIXME: should we use getTimeout()? int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); //此处用于配置decoder和encoder NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {//是否需要开启SSL ch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl())); } ch.pipeline() .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
netty启动之后,还需要维护它,如果服务挂掉 需要有一定的重启机制,所以我们看下 之前调用的地方org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind @Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { //上面我们说的是 Transporters.bind 的逻辑,我们看下 new HeaderExchangeServer的逻辑 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeServer#HeaderExchangeServer public HeaderExchangeServer(RemotingServer server) { Assert.notNull(server, "server == null"); this.server = server; //执行到此处 startIdleCheckTask(getUrl()); } private void startIdleCheckTask(URL url) { if (!server.canHandleIdle()) { AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels()); //获取一个超时时间,默认为3分钟 int idleTimeout = getIdleTimeout(url); //因为重试3次,所以此处获取获取心跳时间间隔,所以默认为1分钟 long idleTimeoutTick = calculateLeastDuration(idleTimeout); CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout); //创建一个定时任务,每分钟获取下心跳,如果超过3分钟没有心跳 则服务重启 this.closeTimer = IDLE_CHECK_TIMER.get().newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS); } }
这篇关于七:服务暴露剖析(三)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23Springboot应用的多环境打包入门
- 2024-11-23Springboot应用的生产发布入门教程
- 2024-11-23Python编程入门指南
- 2024-11-23Java创业入门:从零开始的编程之旅
- 2024-11-23Java创业入门:新手必读的Java编程与创业指南
- 2024-11-23Java对接阿里云智能语音服务入门详解
- 2024-11-23Java对接阿里云智能语音服务入门教程
- 2024-11-23JAVA对接阿里云智能语音服务入门教程
- 2024-11-23Java副业入门:初学者的简单教程
- 2024-11-23JAVA副业入门:初学者的实战指南