Source Code Analysis of BalancedClickhouseDataSource, the ClickHouse Load-Balancing Client
2021/6/16 12:21:00
Contents
- BalancedClickhouseDataSource source code analysis
- Conclusion
BalancedClickhouseDataSource source code analysis
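The fully qualified name of the class is ru.yandex.clickhouse.BalancedClickhouseDataSource. Its source has three main parts: the constructors, connection acquisition, and maintenance of the list of available URLs.
BalancedClickhouseDataSource implements javax.sql.DataSource. Among its fields, allUrls holds the URLs passed in through the constructor, and enabledUrls holds the URLs currently considered available.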
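    public class BalancedClickhouseDataSource implements javax.sql.DataSource {
        // Per-thread Random used when picking a URL
        private final ThreadLocal<Random> randomThreadLocal = new ThreadLocal<Random>();
        // Every URL accepted by the constructor
        private final List<String> allUrls;
        // URLs currently considered alive; replaced as a whole by actualize()
        private volatile List<String> enabledUrls;
        // ... (remaining members omitted in this excerpt)
    }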
BalancedClickhouseDataSource has several constructors, but they all end up calling BalancedClickhouseDataSource(final List<String> urls, ClickHouseProperties properties). If several hosts are configured in a single URL, for example jdbc:clickhouse://10.170.4.81:8123,10.170.4.82:8123,10.170.4.83:8123,10.170.4.84:8123/datasets, splitUrl first splits it into one URL per host, such as jdbc:clickhouse://10.170.4.81:8123/datasets and jdbc:clickhouse://10.170.4.82:8123/datasets.
    public BalancedClickhouseDataSource(final String url, Properties properties) {
        this(splitUrl(url), new ClickHouseProperties(properties));
    }

    // Splits a multi-host URL into one URL per host, keeping the database suffix
    static List<String> splitUrl(final String url) {
        Matcher m = URL_TEMPLATE.matcher(url);
        if (!m.matches()) {
            throw new IllegalArgumentException("Incorrect url");
        }
        String database = m.group(2);
        if (database == null) {
            database = "";
        }
        String[] hosts = m.group(1).split(",");
        final List<String> result = new ArrayList<String>(hosts.length);
        for (final String host : hosts) {
            result.add(JDBC_CLICKHOUSE_PREFIX + "//" + host + database);
        }
        return result;
    }

    private BalancedClickhouseDataSource(final List<String> urls, ClickHouseProperties properties) {
        if (urls.isEmpty()) {
            throw new IllegalArgumentException("Incorrect ClickHouse jdbc url list. It must be not empty");
        }

        // Shared properties are parsed from the first URL, with host and port cleared
        try {
            ClickHouseProperties localProperties = ClickhouseJdbcUrlParser.parse(urls.get(0), properties.asProperties());
            localProperties.setHost(null);
            localProperties.setPort(-1);
            this.properties = localProperties;
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }

        // Keep only the URLs the driver accepts
        List<String> allUrls = new ArrayList<String>(urls.size());
        for (final String url : urls) {
            try {
                if (driver.acceptsURL(url)) {
                    allUrls.add(url);
                } else {
                    log.error("that url is has not correct format: {}", url);
                }
            } catch (SQLException e) {
                throw new IllegalArgumentException("error while checking url: " + url, e);
            }
        }

        if (allUrls.isEmpty()) {
            throw new IllegalArgumentException("there are no correct urls");
        }

        // Initially every accepted URL is treated as enabled
        this.allUrls = Collections.unmodifiableList(allUrls);
        this.enabledUrls = this.allUrls;
    }
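The splitting step can be tried in isolation. The following is a minimal sketch, not the driver's code: the class name UrlSplitSketch is hypothetical, and the regular expression is a simplified assumption standing in for the driver's URL_TEMPLATE, whose definition is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class UrlSplitSketch {
    private static final String PREFIX = "jdbc:clickhouse:";

    // Assumption: simplified pattern, NOT the driver's URL_TEMPLATE.
    // Group 1 = comma-separated host:port list, group 2 = optional "/database..." suffix.
    private static final Pattern URL_TEMPLATE =
            Pattern.compile(PREFIX + "//([a-zA-Z0-9_:,.-]+)(/[^\\s]*)?");

    static List<String> splitUrl(String url) {
        Matcher m = URL_TEMPLATE.matcher(url);
        if (!m.matches()) {
            throw new IllegalArgumentException("Incorrect url");
        }
        String database = m.group(2) == null ? "" : m.group(2);
        List<String> result = new ArrayList<>();
        for (String host : m.group(1).split(",")) {
            // Every host gets the same prefix and database suffix
            result.add(PREFIX + "//" + host + database);
        }
        return result;
    }

    public static void main(String[] args) {
        String url = "jdbc:clickhouse://10.170.4.81:8123,10.170.4.82:8123/datasets";
        // Prints one URL per host:
        //   jdbc:clickhouse://10.170.4.81:8123/datasets
        //   jdbc:clickhouse://10.170.4.82:8123/datasets
        for (String single : splitUrl(url)) {
            System.out.println(single);
        }
    }
}
```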
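Once initialized, the data source hands out connections through getConnection(). That method calls getAnyUrl() to pick a URL at random from the enabledUrls list and then connects to it.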
    @Override
    public ClickHouseConnection getConnection() throws SQLException {
        return driver.connect(getAnyUrl(), properties);
    }

    private String getAnyUrl() throws SQLException {
        // Read the volatile field once so the list cannot change mid-method
        List<String> localEnabledUrls = enabledUrls;
        if (localEnabledUrls.isEmpty()) {
            throw new SQLException("Unable to get connection: there are no enabled urls");
        }

        // Lazily create one Random per thread
        Random random = this.randomThreadLocal.get();
        if (random == null) {
            this.randomThreadLocal.set(new Random());
            random = this.randomThreadLocal.get();
        }

        int index = random.nextInt(localEnabledUrls.size());
        return localEnabledUrls.get(index);
    }
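The random choice can be illustrated on its own. The sketch below is an assumption-laden stand-in rather than the driver's code: RandomUrlPicker and pickAny are hypothetical names, and java.util.concurrent.ThreadLocalRandom is swapped in for the hand-rolled ThreadLocal<Random> used by the driver.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

class RandomUrlPicker {
    // Picks one entry uniformly at random, failing fast on an empty list
    static String pickAny(List<String> enabledUrls) {
        if (enabledUrls.isEmpty()) {
            throw new IllegalStateException("there are no enabled urls");
        }
        int index = ThreadLocalRandom.current().nextInt(enabledUrls.size());
        return enabledUrls.get(index);
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList(
                "jdbc:clickhouse://10.170.4.81:8123/datasets",
                "jdbc:clickhouse://10.170.4.82:8123/datasets",
                "jdbc:clickhouse://10.170.4.83:8123/datasets");

        // Count how often each URL is chosen; the counts should be roughly equal
        Map<String, Integer> hits = new LinkedHashMap<>();
        for (int i = 0; i < 9000; i++) {
            hits.merge(pickAny(urls), 1, Integer::sum);
        }
        hits.forEach((url, count) -> System.out.println(url + " -> " + count));
    }
}
```

Because the choice is uniform and stateless, the load spreads evenly over time, but nothing prevents two consecutive picks from landing on the same node.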
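Finally, the list of available URLs. scheduleActualization() schedules a background task that calls actualize() at a fixed delay to rebuild the list. actualize() tests whether each node is available by running the query SELECT 1 against it.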
    /**
     * set time period for checking availability connections
     *
     * @param delay    value for time unit
     * @param timeUnit time unit for checking
     * @return this datasource with changed settings
     */
    public BalancedClickhouseDataSource scheduleActualization(int delay, TimeUnit timeUnit) {
        ClickHouseDriver.ScheduledConnectionCleaner.INSTANCE.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    actualize();
                } catch (Exception e) {
                    log.error("Unable to actualize urls", e);
                }
            }
        }, 0, delay, timeUnit);

        return this;
    }

    /**
     * Checks if clickhouse on url is alive, if it isn't, disable url, else enable.
     *
     * @return number of avaliable clickhouse urls
     */
    public synchronized int actualize() {
        List<String> enabledUrls = new ArrayList<String>(allUrls.size());

        for (String url : allUrls) {
            log.debug("Pinging disabled url: {}", url);
            if (ping(url)) {
                log.debug("Url is alive now: {}", url);
                enabledUrls.add(url);
            } else {
                log.debug("Url is dead now: {}", url);
            }
        }

        // Publish the new list through the volatile field
        this.enabledUrls = Collections.unmodifiableList(enabledUrls);
        return enabledUrls.size();
    }

    // A node counts as alive if "SELECT 1" runs without throwing
    private boolean ping(final String url) {
        try {
            driver.connect(url, properties).createStatement().execute("SELECT 1");
            return true;
        } catch (Exception e) {
            return false;
        }
    }
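The refresh pattern above (rebuild the list, then publish it through a volatile field) can be sketched without a ClickHouse server. Everything here is illustrative: EnabledUrlTracker is a hypothetical class, the Predicate stands in for the SELECT 1 ping, and a plain ScheduledExecutorService is assumed in place of the driver's ScheduledConnectionCleaner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

class EnabledUrlTracker {
    private final List<String> allUrls;
    private final Predicate<String> ping; // hypothetical health check, stands in for "SELECT 1"
    private volatile List<String> enabledUrls;

    EnabledUrlTracker(List<String> allUrls, Predicate<String> ping) {
        this.allUrls = Collections.unmodifiableList(new ArrayList<>(allUrls));
        this.ping = ping;
        this.enabledUrls = this.allUrls; // until the first check, every URL counts as enabled
    }

    // Rebuilds the enabled list from scratch and returns how many URLs passed the check
    synchronized int actualize() {
        List<String> alive = new ArrayList<>(allUrls.size());
        for (String url : allUrls) {
            if (ping.test(url)) {
                alive.add(url);
            }
        }
        enabledUrls = Collections.unmodifiableList(alive);
        return alive.size();
    }

    List<String> enabled() {
        return enabledUrls;
    }

    public static void main(String[] args) throws InterruptedException {
        EnabledUrlTracker tracker = new EnabledUrlTracker(
                Arrays.asList("node-a", "node-b", "node-c"),
                url -> !url.equals("node-b")); // pretend node-b is down

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(tracker::actualize, 0, 100, TimeUnit.MILLISECONDS);

        Thread.sleep(300); // give the background task time to run at least once
        System.out.println(tracker.enabled());
        scheduler.shutdownNow();
    }
}
```

Readers never lock: they read the volatile reference once and work with an immutable snapshot, which is the same design the driver code above relies on.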
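Conclusion
clickhouse-jdbc relies on the load-balancing client ru.yandex.clickhouse.BalancedClickhouseDataSource to keep connections available. In essence, a background task periodically probes the ClickHouse servers to build the list of available URLs, and each connection request picks one node at random from that list.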
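The catch is that scheduleActualization is not called anywhere, so you have to call it yourself, for example dataSource.scheduleActualization(10, TimeUnit.SECONDS), where the 10-second interval is only an illustration. Otherwise enabledUrls stays equal to allUrls, and even with several addresses configured, a connection attempt fails whenever the random pick lands on a node that is down (about one attempt in N when one of N nodes is down).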
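Finally, BalancedClickhouseDataSource only keeps connections available most of the time. Depending on the ping interval and timeouts, there is always a short window in which some URL in the available list is actually down. To achieve failover and high availability, the calling code has to cooperate as well, ideally by adding a retry mechanism.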
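One possible shape for such a retry mechanism is sketched below. It is an illustration only: RetryingConnector, SqlSupplier, and withRetry are hypothetical names, and the fixed attempt count with no back-off is an assumption, not something the driver provides.

```java
import java.sql.SQLException;

class RetryingConnector {
    // Hypothetical functional interface: anything that may throw SQLException
    @FunctionalInterface
    interface SqlSupplier<T> {
        T get() throws SQLException;
    }

    // Calls the supplier up to maxAttempts times and rethrows the last failure
    static <T> T withRetry(SqlSupplier<T> supplier, int maxAttempts) throws SQLException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        SQLException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return supplier.get();
            } catch (SQLException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) throws SQLException {
        int[] calls = {0};
        // Simulated connection source that fails twice before succeeding
        String result = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new SQLException("node down");
            }
            return "connected";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints "connected after 3 attempts"
    }
}
```

With the data source from this article, the call would presumably look like withRetry(dataSource::getConnection, 3). Each attempt goes through getAnyUrl() again, so a retry may land on a different node.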