commons-pool2 实现 sftp 连接池
2021/7/26 23:38:19
本文主要是介绍commons-pool2 实现 sftp 连接池,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
简介
ssh 默认的连接数量有限,当大量请求连接 ssh 时会概率性连接失败甚至直接失败,因此需要对连接池化,当然如果不要求实时的话可以用生产者消费者。
了解 commons-pool2
依赖
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.8.1</version> </dependency>
核心概念
- PooledObjectState:池化对象的状态
- IDLE:在队列中,未使用。
- ALLOCATED:正在使用。
- EVICTION:在队列中,正在判断是否满足被驱逐的条件。
- EVICTION_RETURN_TO_HEAD:不在队列中,目前正在测试是否可能被驱逐。 测试时尝试借用对象,将其从队列中删除。 一旦
测试完成,它应该返回到队列的头部。 - VALIDATION:在队列中,当前正在验证。
- VALIDATION_PREALLOCATED:预分配状态,不在队列中,还在测试,测试成功后分配。
- VALIDATION_RETURN_TO_HEAD:不在队列中,目前正在验证。 测试时尝试借用对象,将其从队列中删除。 一旦
测试完成,它应该返回到队列的头部。 - INVALID:无效的,将被销毁或已经被销毁。
- ABANDONED:废弃的
- RETURNING:使用完毕,正在归还到池中。
- PooledObject:池化对象的包装器,包含被包装的对象、状态以及使用信息,默认实现是
DefaultPooledObject
。 - PooledObjectFactory:对象工厂,管理池对象的生命周期(借出、验证和销毁等等),常用的实现是
BasePooledObjectFactory
。 - BaseObjectPoolConfig:对象池配置,实现有
GenericObjectPoolConfig
等等,常用参数如下。- minIdle:对象池最小空闲对象数,默认为
0
。 - maxIdle:对象池最大空闲对象数,默认为
8
。 - maxTotal:对象池最大可用对象数,默认为
8
。 - maxWaitMillis:获取对象是最大等待时间,默认为
-1
(负值时为无限等待)。 - testOnBorrow:从对象池中借出对象时是否进行测试,默认为
false
。 - testOnReturn:返回给对象池时是否测试对象可用,默认为
false
。 - testWhileIdle:空闲对象驱逐验证开关,打开后定时验证对象池中空闲对象是否可用,不可用就销毁该对象,默认为
false
。 - timeBetweenEvictionRunsMillis:上次空闲对象驱逐验证完成后多久开始下次验证,与 testWhileIdle 配合使用,默认为
-1
(负值时为无限等待)。
- minIdle:对象池最小空闲对象数,默认为
- ObjectPool:对象池。常用的实现是
GenericObjectPool
,它可以配置PooledObjectFactory
(对象工厂)、GenericObjectPoolConfig
(对象池的配置)以及AbandonedConfig
(废弃对象检测,可以不配置)。
简单应用
一个最简单的使用步骤为:
- 创建用于包装对象的
PooledObject
。这里使用默认实现DefaultPooledObject
。 - 创建管理对象的工厂
PooledObjectFactory
。新建CustomPooledObjectFactory
,继承BasePooledObjectFactory<Object>
,并实现create()
和wrap(Object obj)
方法,其中create()
方法返回的是我们想要放入和借出的对象;wrap(Object obj)
方法返回的是通过PooledObject
包装的 Object,这里可以使用默认实现new DefaultPooledObject<Object>(obj)
。 - 创建对象池配置。使用常用的
GenericObjectPoolConfig<Object>
,参数通过对应的set
方法传入。 - 创建连接池。使用常用的
GenericObjectPool
,构造函数为public GenericObjectPool(final PooledObjectFactory<T> factory, final GenericObjectPoolConfig<T> config)
,第一个参数传入第二步创建的CustomPooledObjectFactory
,第二个参数传入第三步创建的GenericObjectPoolConfig
。
代码如下,其中 main
方法为创建对象池的 demo 和使用对象池的官方 demo:
import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; public class CustomPooledObjectFactory extends BasePooledObjectFactory<Object> { @Override public Object create() throws Exception { return new Object(); } @Override public PooledObject<Object> wrap(Object obj) { return new DefaultPooledObject<>(obj); } public static void main(String[] args) { final GenericObjectPool<Object> objectPool = new GenericObjectPool<>(new CustomPooledObjectFactory(), new GenericObjectPoolConfig<>()); Object obj = null; try { obj = objectPool.borrowObject(); try { // 使用对象 } catch(Exception e) { // 使对象无效 objectPool.invalidateObject(obj); // 不要将对象返回到池中两次 obj = null; } finally { // 确保对象返回到池中 if(null != obj) { objectPool.returnObject(obj); } } } catch(Exception e) { // 从池中借出对象失败 } } }
Sftp
sftp 是通过 ssh 完成安全的传输的,而 ssh 的连接数是有限的,具体的 ssh 连接参数见 /etc/ssh/sshd_config
的 MaxStartups
参数,默认为 10
或 10:30:100
,含义如下:
- 10:单独只有一个参数表示最大可用连接数,当
10:30:100
这种格式时表示 100% 可以连接成功的连接数。 - 30:表示 已经有 10 个连接时再连接有 30% 的概率连接失败。
- 100:表示最大连接数。
因此为了尽可能少的创建新连接就需要使用连接池。
依赖
<dependency> <groupId>com.jcraft</groupId> <artifactId>jsch</artifactId> <version>0.1.55</version> </dependency>
创建Sftp连接池
首先创建被池化的对象 SftpClient
,其中 originalDir
属性用于保存登录时的原始目录,validateConnect
方法用于验证连接,disconnect
方法用于销毁连接。
package com.haibara.toys.sftp.core; import com.jcraft.jsch.*; import lombok.Data; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; /** * @author haibara */ @Data public class SftpClient { private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private static final AtomicLong CLIENT_NUMBER = new AtomicLong(1L); private ChannelSftp channelSftp; private Session session; /** * */ private String clientInfo = "sftpclient"; /** * ssh 根目录。 * 用于判断是否成功返回连接到连接池的条件之一 */ private String originalDir; public SftpClient(SftpProperties sftpProperties) throws SftpException, JSchException { try { JSch jsch = new JSch(); session = jsch.getSession(sftpProperties.getUsername(), sftpProperties.getHost(), sftpProperties.getPort()); session.setPassword(sftpProperties.getPassword()); Properties config = new Properties(); if (sftpProperties.getSession() != null) { sftpProperties.getSession().forEach(config::put); } session.setConfig(config); session.connect(); channelSftp = (ChannelSftp) session.openChannel("sftp"); channelSftp.connect(); clientInfo += CLIENT_NUMBER.getAndIncrement() + ",createTime:" + DATE_TIME_FORMATTER.format(LocalDateTime.now()); originalDir = channelSftp.pwd(); } catch (Exception e) { disconnect(); throw e; } } public void disconnect() { if (channelSftp != null) { try { channelSftp.disconnect(); } catch (Exception ignored) { } } if (session != null) { try { session.disconnect(); } catch (Exception ignored) { } } } public boolean validateConnect() { try { return session.isConnected() && channelSftp.isConnected() && originalDir.equals(channelSftp.pwd()); } catch (Exception e) { return false; } } }
接下来和前面一样分 4 步创建。
-
创建被包装的对象,同样使用默认的
DefaultPooledObject
。 -
创建对象工厂
SftpFactory
,重写validateObject
和destroyObject
方法为SftpClient
的验证和销毁方法,SftpFactory
代码见第 4 步SftpPool
的静态内部类SftpFactory
。 -
创建连接池配置(以及 sftp 的配置)
SftpProperties
类和GenericObjectPoolConfig
(连接池配置),GenericObjectPoolConfig
创建代码见第 4 步SftpPool
的getPoolConfig
方法 。
package com.haibara.toys.sftp.core; import lombok.Data; import java.util.Map; /** * @author haibara */ @Data public class SftpProperties { /** * 地址 */ private String host = "localhost"; /** * 端口号 */ private int port = 22; /** * 用户名 */ private String username; /** * 密码 */ private String password; /** * Session 参数配置 */ private Map<String, String> session; /** * 连接池配置 */ private Pool pool; /** * 连接池配置类 */ @Data public static class Pool { /** * 池中最小的连接数,只有当 timeBetweenEvictionRuns 为正时才有效 */ private int minIdle = 0; /** * 池中最大的空闲连接数,为负值时表示无限 */ private int maxIdle = 8; /** * 池可以产生的最大对象数,为负值时表示无限 */ private int maxActive = 16; /** * 当池耗尽时,阻塞的最长时间,为负值时无限等待 */ private long maxWait = -1; /** * 从池中取出对象是是否检测可用 */ private boolean testOnBorrow = true; /** * 将对象返还给池时检测是否可用 */ private boolean testOnReturn = false; /** * 检查连接池对象是否可用 */ private boolean testWhileIdle = true; /** * 距离上次空闲线程检测完成多久后再次执行 */ private long timeBetweenEvictionRuns = 300000L; } }
yml 中对应的配置如下:
sftp: host: 127.0.0.1 port: 22 username: root password: 123456 session: StrictHostKeyChecking: no kex: diffie-hellman-group1-sha1,diffie-hellman-group-exchange-sha1,diffie-hellman-group-exchange-sha256 pool: max-idle: 8 min-idle: 1 max-active: 16 max-wait: 150000 test-on-borrow: true test-on-return: false test-while-idle: true time-between-eviction-runs: 120000
- 创建连接池
SftpPool
。需要注意的一点是返还给连接池的连接要和新连接的状态相同,因此重写GenericObjectPool
的returnObject
方法,在原来的returnObject
前恢复(还原当前目录为初始目录等等),同时在SftpClient
的validateConnect
方法中添加判断是否恢复的条件(判断当前目录是否是初始目录等等)。
package com.haibara.toys.sftp.core; import com.jcraft.jsch.SftpException; import lombok.SneakyThrows; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.ObjectPool; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import java.util.NoSuchElementException; /** * @author haibara */ public class SftpPool implements ObjectPool<SftpClient> { private final GenericObjectPool<SftpClient> internalPool; public SftpPool(SftpProperties sftpProperties) { this.internalPool = new GenericObjectPool<SftpClient>(new SftpFactory(sftpProperties), getPoolConfig(sftpProperties.getPool())){ @Override public void returnObject(SftpClient sftpClient) { try { sftpClient.getChannelSftp().cd(sftpClient.getOriginalDir()); } catch (Exception ignored) { } super.returnObject(sftpClient); } }; } @Override public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException { internalPool.addObject(); } @Override public SftpClient borrowObject() throws Exception, NoSuchElementException, IllegalStateException { return internalPool.borrowObject(); } @Override public void clear() throws Exception, UnsupportedOperationException { internalPool.clear(); } @Override public void close() { internalPool.close(); } @Override public int getNumActive() { return internalPool.getNumActive(); } @Override public int getNumIdle() { return internalPool.getNumIdle(); } @Override public void invalidateObject(SftpClient obj) throws Exception { internalPool.invalidateObject(obj); } @Override public void returnObject(SftpClient obj) { internalPool.returnObject(obj); } private static class SftpFactory extends BasePooledObjectFactory<SftpClient> { private final SftpProperties sftpProperties; public SftpFactory(SftpProperties sftpProperties) { this.sftpProperties = sftpProperties; } @Override public SftpClient create() throws Exception { return new SftpClient(sftpProperties); } @Override public PooledObject<SftpClient> wrap(SftpClient sftpClient) { return new DefaultPooledObject<>(sftpClient); } @Override public boolean validateObject(PooledObject<SftpClient> p) { return p.getObject().validateConnect(); } @Override public void destroyObject(PooledObject<SftpClient> p) { p.getObject().disconnect(); } } private GenericObjectPoolConfig<SftpClient> getPoolConfig(SftpProperties.Pool properties) { if (properties == null) { properties = new SftpProperties.Pool(); } GenericObjectPoolConfig<SftpClient> config = new GenericObjectPoolConfig<>(); config.setMinIdle(properties.getMinIdle()); config.setMaxIdle(properties.getMaxIdle()); config.setMaxTotal(properties.getMaxActive()); config.setMaxWaitMillis(properties.getMaxWait()); config.setTestOnBorrow(properties.isTestOnBorrow()); config.setTestOnReturn(properties.isTestOnReturn()); config.setTestWhileIdle(properties.isTestWhileIdle()); config.setTimeBetweenEvictionRunsMillis(properties.getTimeBetweenEvictionRuns()); return config; } }
最后只需要创建一个 SftpPool
对象就好了,使用代码如下(SftpTemplate
是对 SftpClient
封装的工具类):
@Test void testSftp() { String localFile = ""; String remotePath = ""; String fileName = ""; SftpClient sftpClient = null; try (FileInputStream fileInputStream = new FileInputStream(localFile)) { try { sftpClient = sftpPool.borrowObject(); SftpTemplate sftpTemplate = new SftpTemplate(sftpClient); sftpTemplate.cd(remotePath); sftpTemplate.upload(fileInputStream, fileName); } catch (Exception e) { sftpPool.invalidateObject(sftpClient); sftpClient = null; } finally { if (null != sftpClient) { sftpPool.returnObject(sftpClient); } } } catch (Exception e) { // 从池中借出对象失败 } }
具体代码见 hligaty/Toys 。
参考
apache common pool2原理与实战 - 海向 - 博客园 (cnblogs.com)
这篇关于commons-pool2 实现 sftp 连接池的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-27Excel中实现拖动排序的简单教程
- 2024-11-27Rocket消息队列资料:新手入门指南
- 2024-11-27rocket消息队资料详解与入门指南
- 2024-11-27RocketMQ底层原理资料详解入门教程
- 2024-11-27RocketMQ项目开发资料:新手入门教程
- 2024-11-27RocketMQ项目开发资料详解
- 2024-11-27RocketMQ消息中间件资料入门教程
- 2024-11-27初学者指南:深入了解RocketMQ源码资料
- 2024-11-27Rocket消息队列学习入门指南
- 2024-11-26Rocket消息中间件教程:新手入门详解