springcloud ribbon @LoadBalance负载均衡源码流程分析
2021/12/13 14:22:18
本文主要是介绍springcloud ribbon @LoadBalance负载均衡源码流程分析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
一、编写示例
1.服务端
pom.xml
<properties> <java.version>1.8</java.version> <spring-cloud.version>Finchley.SR2</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>0.2.1.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
2.提供者代码,就是一个普通的HTTP REST接口。
@SpringBootApplication @EnableDiscoveryClient @RestController @Slf4j public class ProviderApplication { public static void main(String[] args) { SpringApplication.run(ProviderApplication.class, args); } @GetMapping("provider") public String provider(){ log.info(".................................."); return "hello,provider"; } }
3.消费端
POM
<properties> <java.version>1.8</java.version> <spring-cloud.version>Finchley.SR2</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> <exclusions> <exclusion> <groupId>org.springframework.retry</groupId> <artifactId>spring-retry</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- <dependency>--> <!-- <groupId>org.springframework.retry</groupId>--> <!-- <artifactId>spring-retry</artifactId>--> <!-- </dependency>--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!--<dependency>--> <!--<groupId>org.springframework.retry</groupId>--> <!--<artifactId>spring-retry</artifactId>--> <!--</dependency>--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>0.2.1.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
这里注意,如果不想要RetryTemplate,就去掉Retry的依赖,否则默认会引入的。
消费端代码
@SpringBootApplication @Slf4j @EnableDiscoveryClient @RestController public class ConsumeApplication { @Autowired RestTemplate restTemplate; @Bean @LoadBalanced public RestTemplate getRestTemplate(){ HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory(); clientHttpRequestFactory.setConnectTimeout(5 * 1000); clientHttpRequestFactory.setReadTimeout(5 * 1000); return new RestTemplate(clientHttpRequestFactory); } public static void main(String[] args) { SpringApplication.run(ConsumeApplication.class, args); } @GetMapping("test") public String getContent(){ log.info("发起请求"); return restTemplate.getForObject("http://provider/provider",String.class); } }
二、初始化自动配置代码分析
1.org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration,这个为RIBBON私有的
LoadBalancerClient,LoadBalancer,IRule对象的初始化。
@Configuration @ConditionalOnClass({ IClient.class, RestTemplate.class, AsyncRestTemplate.class, Ribbon.class}) @RibbonClients @AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration") @AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class}) @EnableConfigurationProperties({RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class}) public class RibbonAutoConfiguration { @Bean public SpringClientFactory springClientFactory() { SpringClientFactory factory = new SpringClientFactory(); factory.setConfigurations(this.configurations); return factory; } @Bean @ConditionalOnMissingBean(LoadBalancerClient.class) public LoadBalancerClient loadBalancerClient() { return new RibbonLoadBalancerClient(springClientFactory()); } @Bean @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate") @ConditionalOnMissingBean public LoadBalancedRetryFactory loadBalancedRetryPolicyFactory(final SpringClientFactory clientFactory) { return new RibbonLoadBalancedRetryFactory(clientFactory); }
2.org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration为springcloud-common的对象自动初始化。
@Configuration @ConditionalOnClass(RestTemplate.class) @ConditionalOnBean(LoadBalancerClient.class) @EnableConfigurationProperties(LoadBalancerRetryProperties.class) public class LoadBalancerAutoConfiguration { @LoadBalanced @Autowired(required = false) private List<RestTemplate> restTemplates = Collections.emptyList(); @Bean public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated( final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) { return () -> restTemplateCustomizers.ifAvailable(customizers -> { for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) { for (RestTemplateCustomizer customizer : customizers) { customizer.customize(restTemplate); } } }); } @Autowired(required = false) private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList(); @Bean @ConditionalOnMissingBean public LoadBalancerRequestFactory loadBalancerRequestFactory( LoadBalancerClient loadBalancerClient) { return new LoadBalancerRequestFactory(loadBalancerClient, transformers); } @Configuration @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate") static class LoadBalancerInterceptorConfig { @Bean public LoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) { return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); } @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final LoadBalancerInterceptor loadBalancerInterceptor) { return restTemplate -> { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); }; } }
这里的 restTemplate只会取加了@LoadBalance注解的,因为@LoadBalance加上了@Qualifer,这个注解会有筛选过滤作用。
去除了RetryTemplate就会走默认,否则走重试balance.
1.首先创建LoadBalancerRequestFactory工厂对象。
2.创建LoadBalancerInterceptor对象
3.对RestTemplate列表对象,设置拦截器属性。
三、调用分析
1.RestTemplate.getObject方法开始。这个方法会调用doExecute
public class RestTemplate extends InterceptingHttpAccessor implements RestOperations { @Nullable protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback, @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException { Assert.notNull(url, "URI is required"); Assert.notNull(method, "HttpMethod is required"); ClientHttpResponse response = null; try { ClientHttpRequest request = createRequest(url, method); if (requestCallback != null) { requestCallback.doWithRequest(request); } response = request.execute(); handleResponse(url, method, response); return (responseExtractor != null ? responseExtractor.extractData(response) : null); } catch (IOException ex) { String resource = url.toString(); String query = url.getRawQuery(); resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource); throw new ResourceAccessException("I/O error on " + method.name() + " request for \"" + resource + "\": " + ex.getMessage(), ex); } finally { if (response != null) { response.close(); } } }
2.createRequest方法,RestTemplate是继承于InterceptingHttpAccessor和HttpAccessor
HttpAccessor protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException { ClientHttpRequest request = getRequestFactory().createRequest(url, method); if (logger.isDebugEnabled()) { logger.debug("Created " + method.name() + " request for \"" + url + "\""); } return request; }
3.getRequestFactory方法又是来源于InterceptingHttpAccessor,这里会对原有httpRequestFactory工厂包装。
public ClientHttpRequestFactory getRequestFactory() { List<ClientHttpRequestInterceptor> interceptors = getInterceptors(); if (!CollectionUtils.isEmpty(interceptors)) { ClientHttpRequestFactory factory = this.interceptingRequestFactory; if (factory == null) { factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors); this.interceptingRequestFactory = factory; } return factory; } else { return super.getRequestFactory(); } }
public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper { private final List<ClientHttpRequestInterceptor> interceptors; /** * Create a new instance of the {@code InterceptingClientHttpRequestFactory} with the given parameters. * @param requestFactory the request factory to wrap * @param interceptors the interceptors that are to be applied (can be {@code null}) */ public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory, @Nullable List<ClientHttpRequestInterceptor> interceptors) { super(requestFactory); this.interceptors = (interceptors != null ? interceptors : Collections.emptyList()); } @Override protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) { return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod); } }
所以最后的ClientHttpRequest为InterceptingClientHttpRequest
3.接着我们回到第一步,response = request.execute();因为已经创建好request,现在开始执行。
class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest { private final ClientHttpRequestFactory requestFactory; private final List<ClientHttpRequestInterceptor> interceptors; private HttpMethod method; private URI uri; protected InterceptingClientHttpRequest(ClientHttpRequestFactory requestFactory, List<ClientHttpRequestInterceptor> interceptors, URI uri, HttpMethod method) { this.requestFactory = requestFactory; this.interceptors = interceptors; this.method = method; this.uri = uri; } @Override public HttpMethod getMethod() { return this.method; } @Override public String getMethodValue() { return this.method.name(); } @Override public URI getURI() { return this.uri; } @Override protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException { InterceptingRequestExecution requestExecution = new InterceptingRequestExecution(); return requestExecution.execute(this, bufferedOutput); } private class InterceptingRequestExecution implements ClientHttpRequestExecution { private final Iterator<ClientHttpRequestInterceptor> iterator; public InterceptingRequestExecution() { this.iterator = interceptors.iterator(); } @Override public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException { if (this.iterator.hasNext()) { ClientHttpRequestInterceptor nextInterceptor = this.iterator.next(); return nextInterceptor.intercept(request, body, this); } else { HttpMethod method = request.getMethod(); Assert.state(method != null, "No standard HTTP method"); ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method); request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value)); if (body.length > 0) { if (delegate instanceof StreamingHttpOutputMessage) { StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate; streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream)); } else { StreamUtils.copy(body, delegate.getBody()); } } return delegate.execute(); } } } }
InterceptingClientHttpRequest的execute会执行executeInternel方法,最后调用
InterceptingRequestExecution.execute
InterceptingRequestExecution.execute这个方法先会调用各种拦截器进行处理,处理完后,再会由拦截器重新调用这个对象的execute方法,做成责任链模式。最后调用缺省的HTTP代理请求工厂生成代理对象真正发起HTTP请求。
目前的拦截器就是从注册中心获取实例地址,替换URL.
4.接着进行LoadBalancerInterceptor.interropt方法。
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor { private LoadBalancerClient loadBalancer; private LoadBalancerRequestFactory requestFactory; public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) { this.loadBalancer = loadBalancer; this.requestFactory = requestFactory; } public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) { // for backwards compatibility this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer)); } @Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution)); } }
5.这里的loadBalancer就是RibbonLoadBalancerClient,所以调用这个进行URL替换。
public class RibbonLoadBalancerClient implements LoadBalancerClient { private SpringClientFactory clientFactory; public RibbonLoadBalancerClient(SpringClientFactory clientFactory) { this.clientFactory = clientFactory; } @Override public URI reconstructURI(ServiceInstance instance, URI original) { Assert.notNull(instance, "instance can not be null"); String serviceId = instance.getServiceId(); RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); URI uri; Server server; if (instance instanceof RibbonServer) { RibbonServer ribbonServer = (RibbonServer) instance; server = ribbonServer.getServer(); uri = updateToSecureConnectionIfNeeded(original, ribbonServer); } else { server = new Server(instance.getScheme(), instance.getHost(), instance.getPort()); IClientConfig clientConfig = clientFactory.getClientConfig(serviceId); ServerIntrospector serverIntrospector = serverIntrospector(serviceId); uri = updateToSecureConnectionIfNeeded(original, clientConfig, serverIntrospector, server); } return context.reconstructURIWithServer(server, uri); } @Override public ServiceInstance choose(String serviceId) { Server server = getServer(serviceId); if (server == null) { return null; } return new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); } @Override public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { ILoadBalancer loadBalancer = getLoadBalancer(serviceId); Server server = getServer(loadBalancer); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); return execute(serviceId, ribbonServer, request); }
6.接着调用RibbonLoadBalancerClient.getLoadBalancer获取负载均衡器。
7.SpringClientFactory.getLoadBalancer
return getInstance(name, ILoadBalancer.class);这个会去容器工厂自动创建获取ILoadBalancer类。
8.ribbonLoadBalancingHttpClient为实现类。
package org.springframework.cloud.netflix.ribbon.apache; /** * @author Spencer Gibb */ @Configuration @ConditionalOnClass(name = "org.apache.http.client.HttpClient") @ConditionalOnProperty(name = "ribbon.httpclient.enabled", matchIfMissing = true) public class HttpClientRibbonConfiguration { @Bean @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class) @ConditionalOnMissingClass(value = "org.springframework.retry.support.RetryTemplate") public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient( IClientConfig config, ServerIntrospector serverIntrospector, ILoadBalancer loadBalancer, RetryHandler retryHandler, CloseableHttpClient httpClient) { RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient(httpClient, config, serverIntrospector); client.setLoadBalancer(loadBalancer); client.setRetryHandler(retryHandler); Monitors.registerObject("Client_" + this.name, client); return client; }
9.这个依赖于ribbonLoadBalancer,实际为这个类ZoneAwareLoadBalancer
package org.springframework.cloud.netflix.ribbon; /** * @author Dave Syer * @author Tim Ysewyn */ @SuppressWarnings("deprecation") @Configuration @EnableConfigurationProperties //Order is important here, last should be the default, first should be optional // see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653 @Import({HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class}) public class RibbonClientConfiguration { @Bean @ConditionalOnMissingBean public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) { if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) { return this.propertiesFactory.get(ILoadBalancer.class, config, name); } return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList, serverListFilter, serverListUpdater); }
10.上面那个类又依赖于IRULE,所以最后创建 ribbonRule,实现类为
ZoneAvoidanceRule
11.创建完RULE后,创建loadBalance
12.创建ribbonLoadBalancer
13.我们再回到第5步, @Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer);,开始获取server
14.选择好server后开始执行。
15.回到LoadBalancerRequestFactory.apply
16.又回到InterceptingRequestExecution.execute
17.接下来开始替换URL
18。RibbonLoadBalancerClient.reconstructURI
19.
LoadBalancerContext.reconstructURIWithServer
20.url替换完毕,调用默认的HTTP代理对象发起请求,返回
21 请求结束,处理消息体转换,返回数据。
21.请求完毕。
这篇关于springcloud ribbon @LoadBalance负载均衡源码流程分析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-14后台交互资料入门指南
- 2024-11-14如何轻松创建项目环境:新手入门教程
- 2024-11-14如何抽离公共代码:初级开发者指南
- 2024-11-14Python编程入门指南
- 2024-11-14Python编程入门:如何获取参数
- 2024-11-14JWT 用户校验:简单教程与实践
- 2024-11-14Pre-commit 自动化测试入门指南
- 2024-11-14Python编程基础
- 2024-11-14Server Action入门教程:轻松掌握服务器操作
- 2024-11-14Server Component入门教程:轻松搭建服务器组件