Dubbo源码解析-Consumer发送请求全过程
2021/11/30 12:36:31
本文主要是介绍Dubbo源码解析-Consumer发送请求全过程,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
前言:
之前的文章已经从调用结构方面从前到后整个梳理了一下全过程。
本篇就从实战调用角度来分析下整个过程,之前是抽象,现在就是实战。
1.示例代码
代码的话跟之前是一样的,笔者在这里再贴一下
1.1 provider
public class ProviderApplication { public static void main(String[] args) throws Exception { ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>(); service.setInterface(DemoService.class); service.setRef(new DemoServiceImpl()); service.setApplication(new ApplicationConfig("dubbo-demo-api-provider")); service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); service.export(); System.out.println("dubbo service started"); new CountDownLatch(1).await(); } }
1.2 comsumer
public class ConsumerApplication { public static void main(String[] args) { ReferenceConfig<DemoService> reference = new ReferenceConfig<>(); reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer")); reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); reference.setInterface(DemoService.class); reference.setScope("remote"); DemoService service = reference.get(); String message = service.sayHello("dubbo"); System.out.println(message); } }
2.消费者调用全过程
之前的博客已经分析过主要的步骤,在这里笔者快速过一下
我们就从String message = service.sayHello("dubbo"); 这句调用开始
2.1 service即Proxy0
通过javassist创建的Proxy0继承了Proxy抽象类。简略内容如下所示:
public final class $Proxy0 extends Proxy implements Subject { private static Method m1; private static Method m2; private static Method m3; private static Method m0; public $Proxy0(InvocationHandler paramInvocationHandler) { super(paramInvocationHandler); } @Override public final String sayHello(String paramString) { try { // 本质上是对InvocationHandler的调用 return (String) this.h.invoke(this, m3, new Object[]{paramString}); } catch (Error | RuntimeException localError) { throw localError; } catch (Throwable localThrowable) { throw new UndeclaredThrowableException(localThrowable); } } }
Proxy代理类对方法的调用,最终都反映到InvokerInvocationHandler的调用上
2.2 InvokerInvocationHandler.invoke()
public class InvokerInvocationHandler implements InvocationHandler { private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class); private final Invoker<?> invoker; private ConsumerModel consumerModel; ... @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); ... // 将请求的所有信息都包装进来,请求方法、接口信息、参数信息 RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args); String serviceKey = invoker.getUrl().getServiceKey(); rpcInvocation.setTargetServiceUniqueName(serviceKey); if (consumerModel != null) { rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel); rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method)); } // 这里的invoker=MockClusterInvoker return invoker.invoke(rpcInvocation).recreate(); } }
请求体所有信息都包装在RpcInvocation,交由下一个Invoker(MockClusterInvoker)处理
2.3 ClusterInvoker.invoke()
ClusterInvoker的调用链为:MockClusterInvoker --> FailoverClusterInvoker
MockClusterInvoker主要作用就是进行Mock访问,这个我们后续再仔细说明;
FailoverClusterInvoker意义比较重大,它提供的是一种消费者调用时的集群容错方案,在多个服务提供者情况下,当前消费者调用A提供者失败时,会自动切换到B提供者再次调用,最多重试N次。
当然除了FailoverClusterInvoker,还有其他多种集群容错方案可供选择,这个后续会有系列文章进行说明。
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyInvokers = invokers; checkInvokers(copyInvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); // 重试次数 int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } ... for (int i = 0; i < len; i++) { if (i > 0) { checkWhetherDestroyed(); copyInvokers = list(invocation); checkInvokers(copyInvokers, invocation); } // 选择合适的dubbo provider Invoker Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { // 针对这个Invoker发起调用 Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { ... } return result; // 异常,则再次进入循环,进入下一次调用 } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } ... } }
在ClusterInvoker的包装下,完成了Dubbo集群容错与负载均衡策略的实现。后续会更加详细的对这两方面进行介绍。
2.4 ProtocolFilterWrapper.invoke()
Filter包装类的调用,在真正调用到DubboProtocol之前,会先经过一系列的Filter的调用
public class ProtocolFilterWrapper implements Protocol { private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; // 通过SPI的方式获取所有的Filter(属于Consumer组) List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { ... @Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { // 逐个调用Filter asyncResult = filter.invoke(next, invocation); } ... }; } } return last; } }
似乎每个框架都有Filter层,可以在真正调用前做一些全局操作,后续会有专门的Filter专题来介绍,这里我们知道即可。
2.5 ListenerInvokerWrapper.invoke()
public class ListenerInvokerWrapper<T> implements Invoker<T> { @Override public Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); } }
主要是创建一系列的监听器,后续分析
2.6 同步?异步调用?
public class AsyncToSyncInvoker<T> implements Invoker<T> { private Invoker<T> invoker; ... @Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult = invoker.invoke(invocation); try { // 如果是同步调用,则一直等待结果集 if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) { asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } } ... return asyncResult; } }
Dubbo为了提高性能,提供了异步调用方式,后续专门介绍。
默认都是同步调用,会一直等待结果集
2.7 DubboInvoker.doInvoker()
最终执行到真正的调用类
public class DubboInvoker<T> extends AbstractInvoker<T> { protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); // 轮询获取可用client ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 最终通过调用currentClient.send()来进行同步或异步调用 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = calculateTimeout(invocation, methodName); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { ExecutorService executor = getCallbackExecutor(getUrl(), inv); CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj); FutureContext.getContext().setCompatibleFuture(appResponseFuture); AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv); result.setExecutor(executor); return result; } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } }
2.8 HeaderExchangeChannel.request()
final class HeaderExchangeChannel implements ExchangeChannel { public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // 创建request对象 Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); // 主要内容就是request,包含了接口名、方法名、参数信息 req.setData(request); DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor); try { // 交由channel发送出去 channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; } }
快到尾声了,后续的具体发送工作交由NettyClient执行(默认)
2.9 NettyClient.send()
public class NettyClient extends AbstractClient { public void send(Object message, boolean sent) throws RemotingException { // 未创建连接的情况下,则先创建连接 if (needReconnect && !isConnected()) { connect(); } Channel channel = getChannel(); //TODO Can the value returned by getChannel() be null? need improvement. if (channel == null || !channel.isConnected()) { throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl()); } // 最后交由channel发送出去 channel.send(message, sent); } }
2.10 NettyChannel.send()
final class NettyChannel extends AbstractChannel { public void send(Object message, boolean sent) throws RemotingException { // whether the channel is closed super.send(message, sent); boolean success = true; int timeout = 0; try { // 最终通过channel发送出去 ChannelFuture future = channel.writeAndFlush(message); if (sent) { // wait timeout ms timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); success = future.await(timeout); } Throwable cause = future.cause(); if (cause != null) { throw cause; } } catch (Throwable e) { removeChannelIfDisconnected(channel); throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); } } }
最终,还是通过Netty的channel将请求发送出去了
过程有点长,通过时序图来展示下
这篇关于Dubbo源码解析-Consumer发送请求全过程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23增量更新怎么做?-icode9专业技术文章分享
- 2024-11-23压缩包加密方案有哪些?-icode9专业技术文章分享
- 2024-11-23用shell怎么写一个开机时自动同步远程仓库的代码?-icode9专业技术文章分享
- 2024-11-23webman可以同步自己的仓库吗?-icode9专业技术文章分享
- 2024-11-23在 Webman 中怎么判断是否有某命令进程正在运行?-icode9专业技术文章分享
- 2024-11-23如何重置new Swiper?-icode9专业技术文章分享
- 2024-11-23oss直传有什么好处?-icode9专业技术文章分享
- 2024-11-23如何将oss直传封装成一个组件在其他页面调用时都可以使用?-icode9专业技术文章分享
- 2024-11-23怎么使用laravel 11在代码里获取路由列表?-icode9专业技术文章分享
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享