netty阻塞请求的实现
2021/7/2 23:22:14
本文主要是介绍netty阻塞请求的实现,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
一、概述和测试及工具使用
1、概述
什么是阻塞请求,这个名称是我自己称呼的,比如说在netty中我们客户端自定义的处理器中发送消息,但是要获得服务端的结果,却是异步返回的,怎么才能将这个异步变成同步呢?
我们使用的是CountDownLatch闭锁实现。
2、CountDownLatch异步转同步演示
// 记录响应数据
static String res = null;
// 效果,获得响应数据后,才会打印res,即线程会在await位置阻塞
public static void main(String[] args) throws Exception {
CountDownLatch cd = new CountDownLatch(1);
new Thread(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
res = "响应数据";
cd.countDown();
}).start();
cd.await();
System.out.println(res);
}
上面是闭锁阻塞异步变同步的基本原理,本次演示客户端发送请求对象,服务端发送响应对象,它们都会存在一个统一的id,以这个id为key,缓存住发送请求对象后,要获得响应对象的Futrue封装的Map中。下面给出响应Futrue和Futrue缓存操作的代码。
ublic class RpcFutrue<T> implements Future<T>{
private T response;
private CountDownLatch countdown = new CountDownLatch(1);
/**
* 设置response后,get方法才不会进行阻塞
*/
public void setResponse(T t) {
this.response = t;
this.countdown.countDown();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public T get() throws InterruptedException, ExecutionException {
this.countdown.await();
return response;
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (countdown.await(timeout, unit)) {
return this.response;
}
return null;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
if (response != null) {
return true;
}
return false;
}
@Test
public void test() throws Exception {
RpcFutrue<String> future = new RpcFutrue<>();
new Thread(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
future.setResponse("response result");
}).start();
String result = future.get();
System.out.println(result);
}
/**
* Future缓存
*/
public class FutrueCache {
private static Map<String, RpcFutrue<Response>> requestMap = new ConcurrentHashMap<>();
public static RpcFutrue<Response> get(String requestId) {
return requestMap.get(requestId);
}
public static void put(String key, RpcFutrue<Response> value) {
requestMap.put(key, value);
}
public static void remove(String key) {
requestMap.remove(key);
}
}
3、对象编码解码
public class ObjectDecode extends ByteToMessageDecoder {
private Class<?> clazz;
public ObjectDecode(Class<?> clazz) {
super();
this.clazz = clazz;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int len = in.readInt();
if (in.readableBytes() < len) {
in.resetReaderIndex();
return;
}
byte[] datas = new byte[len];
in.readBytes(datas);
Object object = SerializingUtil.deserialize(datas, clazz);
out.add(object);
}
}
public class ObjectEncode extends MessageToByteEncoder<Object>{
private Class<?> clazz;
public ObjectEncode(Class<?> clazz) {
super();
this.clazz = clazz;
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if(clazz.isInstance(msg)) {
byte[] data = SerializingUtil.serialize(msg);
out.writeInt(data.length);
out.writeBytes(data);
}
}
}
4、pojo对象
public class Request {
private String requestId;
private String msg;
/**
* 格式 ip:port 例子:127.0.0.1:7000
*/
private String address;
/**
* 响应对象
*/
public class Response {
private String requestId;
private String msg;
public
二、客户端和服务端代码
1、服务端
服务端和普通的服务端没有什么区别,上面编码和解码器传入的class需要注意顺序,服务端decode是request,encode是response,客户端反之。
public class NettyServer {
public static void main(String[] args) {
NettyServer server = new NettyServer();
server.bind(7000);
}
public void bind(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectDecode(Request.class));
ch.pipeline().addLast(new ObjectEncode(Response.class));
ch.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture future = b.bind(port).sync();
System.out.println("server start now");
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class ServerHandler extends ChannelInboundHandlerAdapter {
private static Logger logger = LoggerFactory.getLogger(ServerHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.debug("Channel active :{}", ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof Request) {
Request request = (Request) msg;
Response response = new Response();
response.setRequestId(request.getRequestId());
response.setMsg("响应请求-请求发来信息是:" + request.getMsg());
ctx.writeAndFlush(response);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
logger.error("Exception occurred:{}", cause.getMessage());
ctx.close();
}
}
2、客户端
客户端我们发送请求时可以在自定义的handler内channelActive用ctx.writeAndFlush发送,实际是channal发送数据。也可以采用客户端用变量接收ChannelFuture进行发送数据。不管上面二个方式怎么处理,服务端返回的响应结果,都是在下次channelRead中获取的响应数据并且处理的。这个过程从我们操作代码的角度看,肯定是异步的,而我们在发送数据的时候,通过构建一个Futrue对象,并且缓存到cache中,然后调用这个Futrue的get方法,它就会持续等待到read方法中返回response并且取出缓存的Futrue调用setResponse才能解除阻塞,这样就使我们的异步代码变成同步的处理代码了。
public class NettyClient {
public static void main(String[] args) {
NettyClient client = new NettyClient();
Request request = new Request();
request.setRequestId(UUID.randomUUID().toString());
request.setMsg("请求信息");
request.setAddress("127.0.0.1:7000");
Response response = client.send(request);
System.out.println(response);
}
public Response send(Request request) {
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
MyClientHandler handler = new MyClientHandler();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.AUTO_READ, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectDecode(Response.class));
ch.pipeline().addLast(new ObjectEncode(Request.class));
ch.pipeline().addLast(handler);
}
});
ChannelFuture f = b.connect(request.getIp(), request.getPort()).sync();
Response response = handler.sendRequest(request); // 发送请求会阻塞至响应获取
// 二次阻塞,第一次阻塞会等待channel注册获取chanel,第二次阻塞会等待响应
return response;
} catch (Exception e) {
e.printStackTrace();
return null;
} finally {
workerGroup.shutdownGracefully();
}
}
}
public class MyClientHandler extends ChannelInboundHandlerAdapter{
private CountDownLatch latch = new CountDownLatch(1);
static final int CHANNEL_WAIT_TIME = 4;
static final int RESPONSE_WAIT_TIME = 8;
/**
* 监听Channel注册后获取
*/
private Channel channel;
private static Logger logger = LoggerFactory.getLogger(MyClientHandler.class);
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
this.channel = ctx.channel();
latch.countDown(); // 1、Channel等待注册取消阻塞
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.debug("Connect to server successfully:{}", ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof Response) {
Response response = (Response) msg;
String requestid = response.getRequestId();
RpcFutrue<Response> futrue = FutrueCache.get(requestid);
if (futrue != null) {
futrue.setResponse(response); // 2、等待响应取消阻塞
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("异常出现:{}", cause.getMessage());
}
/**
* 发送请求
*/
public Response sendRequest(Request request) throws Exception {
Response response;
RpcFutrue<Response> future = new RpcFutrue<>();
FutrueCache.put(request.getRequestId(), future);
try {
// 等待注册
if (latch.await(CHANNEL_WAIT_TIME, TimeUnit.SECONDS)) {
channel.writeAndFlush(request);
// 等待响应
response = future.get(RESPONSE_WAIT_TIME, TimeUnit.SECONDS);
} else {
throw new Exception(" channel time out");
}
} catch (Exception e) {
throw new Exception(e.getMessage());
} finally {
FutrueCache.remove(request.getRequestId());
}
return response;
}
}
从上面可以看到,除了我们每个请求使用了Futrue模式,并且初始化发送请求的时候,也使用了一次闭锁,因为我们是在Handler还没有注册时,就调用了send方法,需要阻塞至自定义处理对象的channelRegistered触发,才解除阻塞。
end!!1
————————————————
版权声明:本文为CSDN博主「野生技术协会」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/shuixiou1/article/details/114989404
这篇关于netty阻塞请求的实现的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-27消息中间件底层原理资料详解
- 2024-11-27RocketMQ底层原理资料详解:新手入门教程
- 2024-11-27MQ底层原理资料详解:新手入门教程
- 2024-11-27MQ项目开发资料入门教程
- 2024-11-27RocketMQ源码资料详解:新手入门教程
- 2024-11-27本地多文件上传简易教程
- 2024-11-26消息中间件源码剖析教程
- 2024-11-26JAVA语音识别项目资料的收集与应用
- 2024-11-26Java语音识别项目资料:入门级教程与实战指南
- 2024-11-26SpringAI:Java 开发的智能新利器