2021/11/30 12:36:31
1.1 provider
/**
 * Minimal API-style Dubbo provider: exports {@link DemoService}, backed by a
 * {@link DemoServiceImpl} instance, and then keeps the JVM alive.
 */
public class ProviderApplication {

    /**
     * Registry address. A bare scheme such as {@code "zookeeper://"} names no
     * host to connect to, so host and port are spelled out here.
     * 127.0.0.1:2181 is the conventional local ZooKeeper endpoint; change it
     * to match your environment.
     */
    private static final String REGISTRY_ADDRESS = "zookeeper://127.0.0.1:2181";

    /**
     * Configures and exports the demo service, then blocks.
     *
     * @param args unused
     * @throws Exception if awaiting the latch is interrupted (or anything
     *                   else thrown by the calls below propagates)
     */
    public static void main(String[] args) throws Exception {
        ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
        service.setInterface(DemoService.class);
        service.setRef(new DemoServiceImpl());
        service.setApplication(new ApplicationConfig("dubbo-demo-api-provider"));
        service.setRegistry(new RegistryConfig(REGISTRY_ADDRESS));
        service.export();

        System.out.println("dubbo service started");

        // No reference to this latch is kept, so it can never be counted
        // down: the main thread parks here and the provider stays up.
        new CountDownLatch(1).await();
    }
}
1.2 consumer
/**
 * Minimal API-style Dubbo consumer: obtains a {@link DemoService} reference
 * and performs a single {@code sayHello} call.
 */
public class ConsumerApplication {

    /**
     * Registry address. A bare scheme such as {@code "zookeeper://"} names no
     * host to connect to, so host and port are spelled out here.
     * 127.0.0.1:2181 is the conventional local ZooKeeper endpoint; change it
     * to match your environment.
     */
    private static final String REGISTRY_ADDRESS = "zookeeper://127.0.0.1:2181";

    /**
     * Builds the reference, invokes the service once and prints the reply.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
        reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
        reference.setRegistry(new RegistryConfig(REGISTRY_ADDRESS));
        reference.setInterface(DemoService.class);
        // NOTE(review): "remote" presumably restricts the reference to remote
        // providers rather than an in-JVM one -- confirm against the Dubbo docs.
        reference.setScope("remote");

        // What get() returns is the object the rest of this walkthrough
        // follows; the sayHello call below is the starting point.
        DemoService service = reference.get();
        String message = service.sayHello("dubbo");
        System.out.println(message);
    }
}
我们就从String message = service.sayHello("dubbo"); 这句调用开始
2.1 service即Proxy0
public final class $Proxy0 extends Proxy implements Subject { private static Method m1; private static Method m2; private static Method m3; private static Method m0; public $Proxy0(InvocationHandler paramInvocationHandler) { super(paramInvocationHandler); } @Override public final String sayHello(String paramString) { try { // 本质上是对InvocationHandler的调用 return (String) this.h.invoke(this, m3, new Object[]{paramString}); } catch (Error | RuntimeException localError) { throw localError; } catch (Throwable localThrowable) { throw new UndeclaredThrowableException(localThrowable); } } }
2.2 InvokerInvocationHandler.invoke()
/**
 * Handler behind the consumer-side proxy: turns a reflective method call into
 * an {@code RpcInvocation} and passes it to the wrapped {@code Invoker}.
 * Parts of the class are elided ({@code ...}) in this excerpt.
 */
public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
    private final Invoker<?> invoker;
    private ConsumerModel consumerModel;
    ...

    /**
     * @param proxy  the proxy instance the method was called on (not used in the visible code)
     * @param method the interface method being called
     * @param args   the call arguments
     * @return for methods declared on {@code Object}, the result of invoking them
     *         reflectively on the invoker; otherwise whatever {@code recreate()}
     *         yields for the invoker's result
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Methods declared on Object are run reflectively against the invoker
        // itself; no RpcInvocation is built for them.
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        // Not used in the visible code -- presumably consumed by the elided section below.
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        ...
        // Wrap everything about the request: the method, the interface name and the arguments.
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
        String serviceKey = invoker.getUrl().getServiceKey();
        rpcInvocation.setTargetServiceUniqueName(serviceKey);
        // When a consumer model is present, attach it and the per-method model to the invocation.
        if (consumerModel != null) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
            rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
        }
        // Per the original walkthrough, the invoker here is a MockClusterInvoker.
        // NOTE(review): recreate() presumably unwraps the Result into the plain
        // return value (or rethrows) -- confirm against Result's contract.
        return invoker.invoke(rpcInvocation).recreate();
    }
}
2.3 ClusterInvoker.invoke()
ClusterInvoker的调用链为:MockClusterInvoker --> FailoverClusterInvoker
/**
 * Cluster invoker that, on a non-business failure, loops and tries again with
 * a freshly selected invoker. Parts of {@code doInvoke} are elided
 * ({@code ...}) in this excerpt, including the declarations of
 * {@code invoked}, {@code le} and {@code providers}.
 */
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    /**
     * @param invocation  the call to perform
     * @param invokers    the candidate invokers for the first attempt
     * @param loadbalance strategy handed to {@code select}
     * @return the result of the first attempt that does not throw
     * @throws RpcException immediately when an attempt fails with a business
     *                      exception ({@code e.isBiz()}); what happens after
     *                      the loop is exhausted is in the elided tail
     */
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        // Retry count: configured retries + 1 gives the total number of
        // attempts, clamped so that at least one attempt is made.
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        ...
        for (int i = 0; i < len; i++) {
            // On every retry (not the first attempt) re-check this invoker's
            // own state and re-list the candidates before selecting again.
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                checkInvokers(copyInvokers, invocation);
            }
            // Pick a suitable Dubbo provider Invoker; `invoked` (the invokers
            // already tried) is passed to select.
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // Issue the call against the selected Invoker.
                Result result = invoker.invoke(invocation);
                // A non-null `le` here means an earlier attempt failed before this one succeeded.
                if (le != null && logger.isWarnEnabled()) {
                    ...
                }
                return result;
            // On an exception, fall through to the next loop iteration, i.e. the next attempt.
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                // Record the address of every invoker tried, whether the attempt succeeded or failed.
                providers.add(invoker.getUrl().getAddress());
            }
        }
        ...
    }
}
2.4 ProtocolFilterWrapper.invoke()
/**
 * Protocol wrapper whose {@code buildInvokerChain} wraps an invoker in one
 * anonymous {@code Invoker} per activated {@code Filter}. Parts of the
 * anonymous class are elided ({@code ...}) in this excerpt.
 */
public class ProtocolFilterWrapper implements Protocol {

    /**
     * @param invoker the innermost invoker; returned as-is when no filter is active
     * @param key     passed to {@code getActivateExtension}
     * @param group   passed to {@code getActivateExtension}
     * @return the head of the chain
     */
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        // Load the activated Filters through the SPI ExtensionLoader (the
        // original walkthrough notes these are the consumer-group filters).
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            // Walk the list back to front so each new wrapper delegates to the
            // one built just before it; filters.get(0) therefore ends up outermost.
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                    ...

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            // Call the filters one by one: this filter gets the
                            // next link of the chain as its invoker argument.
                            asyncResult = filter.invoke(next, invocation);
                        }
                        // The elision below covers the remainder of invoke,
                        // including its closing brace.
                        ...
                };
            }
        }
        return last;
    }
}
2.5 ListenerInvokerWrapper.invoke()
/**
 * Invoker wrapper whose {@code invoke}, as excerpted here, is a plain
 * pass-through to the wrapped invoker.
 */
public class ListenerInvokerWrapper<T> implements Invoker<T> {

    /**
     * Forwards the invocation unchanged.
     *
     * @param invocation the call to forward
     * @return whatever the wrapped invoker returns
     * @throws RpcException if the wrapped invoker throws it
     */
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        final Result delegated = invoker.invoke(invocation);
        return delegated;
    }
}
2.6 同步?异步调用?
/**
 * Invoker wrapper that blocks on the result when the invocation is in SYNC
 * mode. The catch clauses following the {@code try} block are elided
 * ({@code ...}) in this excerpt.
 */
public class AsyncToSyncInvoker<T> implements Invoker<T> {
    private Invoker<T> invoker;
    ...

    /**
     * @param invocation the call; cast to {@code RpcInvocation} to read its invoke mode
     * @return the result object obtained from the wrapped invoker
     */
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult = invoker.invoke(invocation);
        try {
            // For a synchronous call, wait for the result right here. The
            // timeout is Integer.MAX_VALUE milliseconds (roughly 24.8 days),
            // i.e. effectively unbounded.
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        }
        ...
        return asyncResult;
    }
}
2.7 DubboInvoker.doInvoke()
public class DubboInvoker<T> extends AbstractInvoker<T> { protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); // 轮询获取可用client ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 最终通过调用currentClient.send()来进行同步或异步调用 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = calculateTimeout(invocation, methodName); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { ExecutorService executor = getCallbackExecutor(getUrl(), inv); CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj); FutureContext.getContext().setCompatibleFuture(appResponseFuture); AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv); result.setExecutor(executor); return result; } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } }
2.8 HeaderExchangeChannel.request()
final class HeaderExchangeChannel implements ExchangeChannel { public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // 创建request对象 Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); // 主要内容就是request,包含了接口名、方法名、参数信息 req.setData(request); DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor); try { // 交由channel发送出去 channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; } }
2.9 NettyClient.send()
public class NettyClient extends AbstractClient { public void send(Object message, boolean sent) throws RemotingException { // 未创建连接的情况下,则先创建连接 if (needReconnect && !isConnected()) { connect(); } Channel channel = getChannel(); //TODO Can the value returned by getChannel() be null? need improvement. if (channel == null || !channel.isConnected()) { throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl()); } // 最后交由channel发送出去 channel.send(message, sent); } }
2.10 NettyChannel.send()
final class NettyChannel extends AbstractChannel { public void send(Object message, boolean sent) throws RemotingException { // whether the channel is closed super.send(message, sent); boolean success = true; int timeout = 0; try { // 最终通过channel发送出去 ChannelFuture future = channel.writeAndFlush(message); if (sent) { // wait timeout ms timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); success = future.await(timeout); } Throwable cause = future.cause(); if (cause != null) { throw cause; } } catch (Throwable e) { removeChannelIfDisconnected(channel); throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); } } }
