Reactor (2)使用reactor-netty TCP通信示例

2021/6/9 10:25:43

本文主要是介绍Reactor (2)使用reactor-netty TCP通信示例,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

使用reactor-netty TCP通信示例

实现Reactor&Netty进行反应式tcp网络通信。

1、引入pom依赖

<!-- netty -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.21.Final</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.5</version>
</dependency>
<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-extra</artifactId>
    <version>3.4.3</version>
</dependency>
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty</artifactId>
    <version>1.0.6</version>
</dependency>
<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty-core</artifactId>
    <version>1.0.6</version>
</dependency>

2、示例Demo

package reactornettyexamples.tcp;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;

import java.time.Duration;

public class MyServerConnection {
  private Logger log = LoggerFactory.getLogger(this.getClass());

  final Connection conn;

  public MyServerConnection(Connection conn) {
    this.conn = conn;
  }

  public void handle() {
    conn.inbound().receiveObject() // 从StringToIntegerDecoder接收到作为结果接收到的对象
        .log("MyServerConnection")
        .delayElements(Duration.ofSeconds(1)) // 将下一个元素的处理延迟1秒
        .doOnNext(s ->
            log.info("Current received and decoded element: " + s))
        .take(5) // 在收到五个元素后取消(实际上断开了客户端的连接)
        .flatMap(s ->
            conn.outbound().sendString(
                Mono.just(String.format("byte count: %d", (Integer) s))
            ).then()
        )
        .subscribe(conn.disposeSubscriber()); // 我们必须按顺序使用该核心订阅者,以使连接在take(5)之后断开连接
  }
}
package reactornettyexamples.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpServer;

import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.List;

public class MyTcpServer {
  private static Logger log = LoggerFactory.getLogger(MyTcpServer.class);

  public static void main(String[] args) throws CertificateException, InterruptedException {
    SelfSignedCertificate cert = new SelfSignedCertificate();
    SslContextBuilder sslContextBuilder =
        SslContextBuilder.forServer(cert.certificate(), cert.privateKey());

    TcpServer.create()   // 准备要配置的TCP服务器。
        .port(1551)    // 配置端口号
        .secure(spec -> spec.sslContext(sslContextBuilder)) // 使用自签名证书。
        //.wiretap()
        .doOnConnection(MyTcpServer::onConnect)
        .bindUntilJavaShutdown(Duration.ofSeconds(30), null); // 以阻塞方式启动服务器,并等待其完成初始化。
  }

  private static void onConnect(Connection conn) {
    conn.addHandler(new StringToIntegerDecoder()); // 将处理程序添加到netty管道
    MyServerConnection myConn = new MyServerConnection(conn);
    log.info("New client connected: {}", conn);
    myConn.handle();
  }

  /**
   * 返回接收字节数的解码器。
   */
  public static class StringToIntegerDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
      int n = in.readableBytes();
      if (in.readableBytes() > 0) {
        in.readBytes(n);
        out.add(n); // 将解码器的结果存储在此处
      }
    }
  }
}
package reactornettyexamples.tcp;

import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.netty.tcp.TcpClient;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;

public class MyTcpClient {
  private static Logger log = LoggerFactory.getLogger(MyTcpClient.class);

  public static void main(String[] args) throws InterruptedException {

    CountDownLatch latch = new CountDownLatch(1);
    StringBuilder toSend = new StringBuilder("a");

    TcpClient.create() // 准备要配置的TCP客户端
        .port(1551) // 服务端口
        // 配置SSL,以提供已配置的SslContext。
        .secure(spec -> spec
            .sslContext(SslContextBuilder.forClient()
            .trustManager(InsecureTrustManagerFactory.INSTANCE)))
        //.wiretap()
        .doOnConnected(con -> {
          log.info("Client connected successfully!");
          // 下一个导致Publisher <Void>的运算符的下一个序列永远不会自己完成,因此客户端保持永久连接
          con.outbound().sendString(Mono.just(toSend.toString()))
              .then(con.inbound()
                  .receive()
                  .asString()
                  .log("tcp-connection")
                  .doOnNext(s -> log.info("Server returned: " + s))
                  .flatMap(s -> con.outbound()
                      .sendString(Mono.just(toSend.append("a").toString()))
                      .then()
                  )
              )
              .then()
              .subscribe();
        })
        .doOnDisconnected(con -> {
          log.info("Server disconnected!");
          latch.countDown();
        })
        .connect()
        // 方式1:简单重试,retry()动作是当操作序列发生错误后重新订阅序列。
        // 重试3次
        //.retry(3)

        // 方式2:backoff方法返回就其实是Retry的子类RetryBackoffSpec。它需要两个参数:最大重试次数和最小间隔时间。
        // 最多重试3次,每次的最短时间间隔为5秒
        //.retryWhen(Retry.backoff(3, Duration.ofSeconds(5)))

        // 方式3:fixedDelay方法返回的也是RetryBackoffSpec。它需要两个参数:最大重试次数和固定的间隔时间。
        // 最大重试3次,固定延迟5秒
        //.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(5)))

        // 方式4:from方法。它需要一个Function函数。
        /*.retryWhen(Retry.from((retrySignals) -> {
            return retrySignals.map(rs -> getNumberOfTries(rs));
        }))*/

        // 方式5:withThrowable方法。它和from有一点类似也是接收一个Function,但是它的参数是异常。
        /*.retryWhen(Retry.withThrowable((retrySignals) -> {
            return retrySignals.map(rs -> {
                if(rs instanceof Exception) {
                    throw new RuntimeException("重试错误");
                } else {
                    return rs;
                }
            });
        }))*/
        .log("tcp-client")
        .doOnError(e -> log.error("Error connecting to server ... " + e.getMessage()))
        //.retryBackoff(Long.MAX_VALUE, Duration.ofSeconds(3), Duration.ofSeconds(10)) // 重试服务
        .block();

    latch.await(); // 客户端正在运行,直到服务器断开与客户端的连接
  }


  private Long getNumberOfTries(Retry.RetrySignal rs) {
    log.info("重试:" + rs.totalRetries());
    if (rs.totalRetries() < 3) {
      return rs.totalRetries();
    } else {
      log.error("retries exhausted");
      throw Exceptions.propagate(rs.failure());
    }
  }
}


这篇关于Reactor (2)使用reactor-netty TCP通信示例的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程