commons-pool2 实现 sftp 连接池

2021/7/26 23:38:19

本文主要是介绍commons-pool2 实现 sftp 连接池,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

简介

ssh 默认的连接数量有限,当大量请求连接 ssh 时会概率性连接失败甚至直接失败,因此需要对连接池化,当然如果不要求实时的话可以用生产者消费者。

了解 commons-pool2

依赖

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.8.1</version>
</dependency>

核心概念

  • PooledObjectState:池化对象的状态
    • IDLE:在队列中,未使用。
    • ALLOCATED:正在使用。
    • EVICTION:在队列中,正在判断是否满足被驱逐的条件。
    • EVICTION_RETURN_TO_HEAD:不在队列中,目前正在测试是否可能被驱逐。 测试时尝试借用对象,将其从队列中删除。 一旦
      测试完成,它应该返回到队列的头部。
    • VALIDATION:在队列中,当前正在验证。
    • VALIDATION_PREALLOCATED:预分配状态,不在队列中,还在测试,测试成功后分配。
    • VALIDATION_RETURN_TO_HEAD:不在队列中,目前正在验证。 测试时尝试借用对象,将其从队列中删除。 一旦
      测试完成,它应该返回到队列的头部。
    • INVALID:无效的,将被销毁或已经被销毁。
    • ABANDONED:废弃的
    • RETURNING:使用完毕,正在归还到池中。
  • PooledObject:池化对象的包装器,包含被包装的对象、状态以及使用信息,默认实现是 DefaultPooledObject
  • PooledObjectFactory:对象工厂,管理池对象的生命周期(借出、验证和销毁等等),常用的实现是 BasePooledObjectFactory
  • BaseObjectPoolConfig:对象池配置,实现有 GenericObjectPoolConfig 等等,常用参数如下。
    • minIdle:对象池最小空闲对象数,默认为 0
    • maxIdle:对象池最大空闲对象数,默认为 8
    • maxTotal:对象池最大可用对象数,默认为 8
    • maxWaitMillis:获取对象是最大等待时间,默认为 -1 (负值时为无限等待)。
    • testOnBorrow:从对象池中借出对象时是否进行测试,默认为 false
    • testOnReturn:返回给对象池时是否测试对象可用,默认为 false
    • testWhileIdle:空闲对象驱逐验证开关,打开后定时验证对象池中空闲对象是否可用,不可用就销毁该对象,默认为 false
    • timeBetweenEvictionRunsMillis:上次空闲对象驱逐验证完成后多久开始下次验证,与 testWhileIdle 配合使用,默认为 -1 (负值时为无限等待)。
  • ObjectPool:对象池。常用的实现是 GenericObjectPool ,它可以配置 PooledObjectFactory (对象工厂)、 GenericObjectPoolConfig (对象池的配置)以及 AbandonedConfig (废弃对象检测,可以不配置)。

简单应用

一个最简单的使用步骤为:

  1. 创建用于包装对象的 PooledObject 。这里使用默认实现 DefaultPooledObject
  2. 创建管理对象的工厂 PooledObjectFactory 。新建 CustomPooledObjectFactory ,继承 BasePooledObjectFactory<Object> ,并实现 create()wrap(Object obj) 方法,其中 create() 方法返回的是我们想要放入和借出的对象;wrap(Object obj) 方法返回的是通过 PooledObject 包装的 Object,这里可以使用默认实现 new DefaultPooledObject<Object>(obj)
  3. 创建对象池配置。使用常用的 GenericObjectPoolConfig<Object> ,参数通过对应的 set 方法传入。
  4. 创建连接池。使用常用的 GenericObjectPool ,构造函数为 public GenericObjectPool(final PooledObjectFactory<T> factory, final GenericObjectPoolConfig<T> config) ,第一个参数传入第二步创建的 CustomPooledObjectFactory ,第二个参数传入第三步创建的 GenericObjectPoolConfig

代码如下,其中 main 方法为创建对象池的 demo 和使用对象池的官方 demo:

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class CustomPooledObjectFactory extends BasePooledObjectFactory<Object> {
  @Override
  public Object create() throws Exception {
    return new Object();
  }

  @Override
  public PooledObject<Object> wrap(Object obj) {
    return new DefaultPooledObject<>(obj);
  }

  public static void main(String[] args) {
    final GenericObjectPool<Object> objectPool = new GenericObjectPool<>(new CustomPooledObjectFactory(), new GenericObjectPoolConfig<>());
    Object obj = null;
    try {
      obj = objectPool.borrowObject();
      try {
        // 使用对象
      } catch(Exception e) {
        // 使对象无效
        objectPool.invalidateObject(obj);
        // 不要将对象返回到池中两次
        obj = null;
      } finally {
        // 确保对象返回到池中
        if(null != obj) {
          objectPool.returnObject(obj);
        }
      }
    } catch(Exception e) {
      // 从池中借出对象失败
    }
  }
}

Sftp

sftp 是通过 ssh 完成安全的传输的,而 ssh 的连接数是有限的,具体的 ssh 连接参数见 /etc/ssh/sshd_configMaxStartups 参数,默认为 1010:30:100,含义如下:

  • 10:单独只有一个参数表示最大可用连接数,当 10:30:100 这种格式时表示 100% 可以连接成功的连接数。
  • 30:表示 已经有 10 个连接时再连接有 30% 的概率连接失败。
  • 100:表示最大连接数。

因此为了尽可能少的创建新连接就需要使用连接池。

依赖

<dependency>
    <groupId>com.jcraft</groupId>
    <artifactId>jsch</artifactId>
    <version>0.1.55</version>
</dependency>

创建Sftp连接池

首先创建被池化的对象 SftpClient ,其中 originalDir 属性用于保存登录时的原始目录,validateConnect 方法用于验证连接,disconnect 方法用于销毁连接。

package com.haibara.toys.sftp.core;

import com.jcraft.jsch.*;
import lombok.Data;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author haibara
 */
@Data
public class SftpClient {
  private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  private static final AtomicLong CLIENT_NUMBER = new AtomicLong(1L);
  private ChannelSftp channelSftp;
  private Session session;
  /**
   *
   */
  private String clientInfo = "sftpclient";
  /**
   * ssh 根目录。
   * 用于判断是否成功返回连接到连接池的条件之一
   */
  private String originalDir;

  public SftpClient(SftpProperties sftpProperties) throws SftpException, JSchException {
    try {
      JSch jsch = new JSch();
      session = jsch.getSession(sftpProperties.getUsername(), sftpProperties.getHost(), sftpProperties.getPort());
      session.setPassword(sftpProperties.getPassword());
      Properties config = new Properties();
      if (sftpProperties.getSession() != null) {
        sftpProperties.getSession().forEach(config::put);
      }
      session.setConfig(config);
      session.connect();
      channelSftp = (ChannelSftp) session.openChannel("sftp");
      channelSftp.connect();
      clientInfo += CLIENT_NUMBER.getAndIncrement() + ",createTime:" + DATE_TIME_FORMATTER.format(LocalDateTime.now());
      originalDir = channelSftp.pwd();
    } catch (Exception e) {
      disconnect();
      throw e;
    }
  }

  public void disconnect() {
    if (channelSftp != null) {
      try {
        channelSftp.disconnect();
      } catch (Exception ignored) {
      }
    }
    if (session != null) {
      try {
        session.disconnect();
      } catch (Exception ignored) {
      }
    }
  }

  public boolean validateConnect() {
    try {
      return session.isConnected() && channelSftp.isConnected() && originalDir.equals(channelSftp.pwd());
    } catch (Exception e) {
      return false;
    }
  }
}

接下来和前面一样分 4 步创建。

  1. 创建被包装的对象,同样使用默认的 DefaultPooledObject

  2. 创建对象工厂 SftpFactory ,重写 validateObjectdestroyObject 方法为 SftpClient 的验证和销毁方法,SftpFactory 代码见第 4 步 SftpPool 的静态内部类 SftpFactory

  3. 创建连接池配置(以及 sftp 的配置) SftpProperties 类和 GenericObjectPoolConfig (连接池配置),GenericObjectPoolConfig 创建代码见第 4 步 SftpPoolgetPoolConfig 方法 。

package com.haibara.toys.sftp.core;

import lombok.Data;

import java.util.Map;

/**
 * @author haibara
 */
@Data
public class SftpProperties {
  /**
   * 地址
   */
  private String host = "localhost";
  /**
   * 端口号
   */
  private int port = 22;
  /**
   * 用户名
   */
  private String username;
  /**
   * 密码
   */
  private String password;
  /**
   * Session 参数配置
   */
  private Map<String, String> session;
  /**
   * 连接池配置
   */
  private Pool pool;

  /**
   * 连接池配置类
   */
  @Data
  public static class Pool {
    /**
     * 池中最小的连接数,只有当 timeBetweenEvictionRuns 为正时才有效
     */
    private int minIdle = 0;

    /**
     * 池中最大的空闲连接数,为负值时表示无限
     */
    private int maxIdle = 8;

    /**
     * 池可以产生的最大对象数,为负值时表示无限
     */
    private int maxActive = 16;

    /**
     * 当池耗尽时,阻塞的最长时间,为负值时无限等待
     */
    private long maxWait = -1;

    /**
     * 从池中取出对象是是否检测可用
     */
    private boolean testOnBorrow = true;

    /**
     * 将对象返还给池时检测是否可用
     */
    private boolean testOnReturn = false;

    /**
     * 检查连接池对象是否可用
     */
    private boolean testWhileIdle = true;

    /**
     * 距离上次空闲线程检测完成多久后再次执行
     */
    private long timeBetweenEvictionRuns = 300000L;
  }
}

yml 中对应的配置如下:

sftp:
  host: 127.0.0.1
  port: 22
  username: root
  password: 123456
  session:
    StrictHostKeyChecking: no
    kex: diffie-hellman-group1-sha1,diffie-hellman-group-exchange-sha1,diffie-hellman-group-exchange-sha256
  pool:
    max-idle: 8
    min-idle: 1
    max-active: 16
    max-wait: 150000
    test-on-borrow: true
    test-on-return: false
    test-while-idle: true
    time-between-eviction-runs: 120000
  1. 创建连接池 SftpPool 。需要注意的一点是返还给连接池的连接要和新连接的状态相同,因此重写 GenericObjectPoolreturnObject 方法,在原来的 returnObject 前恢复(还原当前目录为初始目录等等),同时在 SftpClientvalidateConnect 方法中添加判断是否恢复的条件(判断当前目录是否是初始目录等等)。
package com.haibara.toys.sftp.core;

import com.jcraft.jsch.SftpException;
import lombok.SneakyThrows;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.util.NoSuchElementException;

/**
 * @author haibara
 */
public class SftpPool implements ObjectPool<SftpClient> {
  private final GenericObjectPool<SftpClient> internalPool;

  public SftpPool(SftpProperties sftpProperties) {
    this.internalPool = new GenericObjectPool<SftpClient>(new SftpFactory(sftpProperties), getPoolConfig(sftpProperties.getPool())){
      @Override
      public void returnObject(SftpClient sftpClient) {
        try {
          sftpClient.getChannelSftp().cd(sftpClient.getOriginalDir());
        } catch (Exception ignored) {
        }
        super.returnObject(sftpClient);
      }
    };
  }

  @Override
  public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
    internalPool.addObject();
  }

  @Override
  public SftpClient borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
    return internalPool.borrowObject();
  }

  @Override
  public void clear() throws Exception, UnsupportedOperationException {
    internalPool.clear();
  }

  @Override
  public void close() {
    internalPool.close();
  }

  @Override
  public int getNumActive() {
    return internalPool.getNumActive();
  }

  @Override
  public int getNumIdle() {
    return internalPool.getNumIdle();
  }

  @Override
  public void invalidateObject(SftpClient obj) throws Exception {
    internalPool.invalidateObject(obj);
  }

  @Override
  public void returnObject(SftpClient obj) {
    internalPool.returnObject(obj);
  }

  private static class SftpFactory extends BasePooledObjectFactory<SftpClient> {

    private final SftpProperties sftpProperties;

    public SftpFactory(SftpProperties sftpProperties) {
      this.sftpProperties = sftpProperties;
    }

    @Override
    public SftpClient create() throws Exception {
      return new SftpClient(sftpProperties);
    }

    @Override
    public PooledObject<SftpClient> wrap(SftpClient sftpClient) {
      return new DefaultPooledObject<>(sftpClient);
    }

    @Override
    public boolean validateObject(PooledObject<SftpClient> p) {
      return p.getObject().validateConnect();
    }

    @Override
    public void destroyObject(PooledObject<SftpClient> p) {
      p.getObject().disconnect();
    }

  }

  private GenericObjectPoolConfig<SftpClient> getPoolConfig(SftpProperties.Pool properties) {
    if (properties == null) {
      properties = new SftpProperties.Pool();
    }
    GenericObjectPoolConfig<SftpClient> config = new GenericObjectPoolConfig<>();
    config.setMinIdle(properties.getMinIdle());
    config.setMaxIdle(properties.getMaxIdle());
    config.setMaxTotal(properties.getMaxActive());
    config.setMaxWaitMillis(properties.getMaxWait());
    config.setTestOnBorrow(properties.isTestOnBorrow());
    config.setTestOnReturn(properties.isTestOnReturn());
    config.setTestWhileIdle(properties.isTestWhileIdle());
    config.setTimeBetweenEvictionRunsMillis(properties.getTimeBetweenEvictionRuns());
    return config;
  }
}

最后只需要创建一个 SftpPool 对象就好了,使用代码如下(SftpTemplate 是对 SftpClient 封装的工具类):

@Test
void testSftp() {
  String localFile = "";
  String remotePath = "";
  String fileName = "";
  SftpClient sftpClient = null;
  try (FileInputStream fileInputStream = new FileInputStream(localFile)) {
    try {
      sftpClient = sftpPool.borrowObject();
      SftpTemplate sftpTemplate = new SftpTemplate(sftpClient);
      sftpTemplate.cd(remotePath);
      sftpTemplate.upload(fileInputStream, fileName);
    } catch (Exception e) {
      sftpPool.invalidateObject(sftpClient);
      sftpClient = null;
    } finally {
      if (null != sftpClient) {
        sftpPool.returnObject(sftpClient);
      }
    }
  } catch (Exception e) {
    // 从池中借出对象失败
  }
}

具体代码见 hligaty/Toys 。

参考

apache common pool2原理与实战 - 海向 - 博客园 (cnblogs.com)



这篇关于commons-pool2 实现 sftp 连接池的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程