为什么我的 Netty 应用没有响应?



最终问题的根本原因还是没有深入理解 Netty 的线程模型。

想法就是等服务器端启动, bind成功后再启动client。要想监听server端bind成功的状态变化,当然第一反应就是在server.bind().sync()返回的ChannelFuture中注册一个回调函数,bind成功之后在这个回调函数中启动client就行了。那么 APP入口:

1public void runClientAndServer() {
2        server.run().addListener((ChannelFutureListener) future -> {
3             client.run();                        //this doesn't work!
4//            new Thread(()->client.run()).start();   //this works!
5        });
6}

服务端:

 1public ChannelFuture run() {
 2   try {
 3       ServerBootstrap b = new ServerBootstrap();
 4       b.group(bossGroup, workerGroup)
 5               .channel(NioServerSocketChannel
 6               //配置channel...
 7               .childHandler(new ChannelInitializer<SocketChannel>() {
 8                   @Override
 9                   public void initChannel(SocketChannel ch) throws Exception {
10                       ChannelPipeline p = ch.pipeline();
11                       p.addLast(workerGroup, new EchoServerHandler());
12                   }
13               });
14       return b.bind(port).sync();//等待bind成功,返回
15   } catch (NullPointerException | InterruptedException e) {
16       e.printStackTrace();
17       return null;
18   }
19}

客户端:

 1public void run() {
 2    try {
 3        Bootstrap b = new Bootstrap();
 4        b.group(group)
 5                .channel(NioSocketChannel.class)
 6                .handler(new ChannelInitializer<SocketChannel>() {
 7                    @Override
 8                    public void initChannel(SocketChannel ch) throws Exception {
 9                        ChannelPipeline p = ch.pipeline();
10                        p.addLast(channelHandler);
11                    }
12                });
13        // Start the client.
14        ChannelFuture f = null;
15        try {
16            f = b.connect(host, port).sync();
17            channelHandler.sendMessage();
18        } catch (InterruptedException e) {
19            e.printStackTrace();
20        }
21        try {
22            f.channel().closeFuture().sync(); //等待客户端channel关闭
23        } catch (InterruptedException e) {
24            e.printStackTrace();
25        }
26    } finally {
27        // Shut down the event loop to terminate all threads.
28        group.shutdownGracefully();
29    }
30}

看上去好像没什么问题,一切都很顺理成章。但是点击运行,server端就是收不到client发送的数据。

debug思路

首先就是要确定TCP层面的行为,即客户端到底有没有发送TCP报文,服务器端有没有接收到。使用Wireshark进行抓包:

Wireshark

可以明显地看到,client能够成功地把数据通过TCP交付给server,server回复的ACK报文段代表着数据已经到达server端TCP/IP协议栈。那么可以肯定的是server端没有能够处理这个报文。 给DefaultChannelPipeline.fireChannelRead()方法打上断点,发现没有进入。其实到这基本可以判断是server端负责ChannelPipeline的线程出了问题。 上VisualVM。

VisualVM

一眼就看到服务器的NioEventloopGroup处于waiting状态。 ThreadDump一下:

"nioEventLoopGroup-2-1" #16 prio=10 os_prio=0 tid=0x00007f7350b7b000 nid=0x1533 in Object.wait() [0x00007f73022b6000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076d6c9d80> (a io.netty.channel.AbstractChannel$CloseFuture)
        at java.lang.Object.wait(Object.java:502)
        at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:236)
        - locked <0x000000076d6c9d80> (a io.netty.channel.AbstractChannel$CloseFuture)
        at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:129)
        ...
        at pku.netlab.client.EchoClient.run(EchoClient.java:86)
        at pku.netlab.App.lambda$runClientAndServer$0(App.java:31)
        at pku.netlab.App$$Lambda$1/1521389237.operationComplete(Unknown Source)
        at....
   Locked ownable synchronizers:
        - None

很明显了,server端负责IO的线程,阻塞在了client.run()方法的closeFuture上,为什么会出现这种情况??? 根本原因在于程序入口运行server和client的这段代码:

1public void runClientAndServer() {
2    server.run().addListener((ChannelFutureListener) future -> {
3         client.run();       
4    });
5}

client.run()会阻塞等待直到client的channel关闭(closeFuture的存在),而server.run()返回的ChannelFuture的背后的线程正是server的IO线程,但是我们却偏偏作死在这个线程添加了一个阻塞的回调函数,直接导致server的所有IO事件都得不到处理。

解决办法

在server的回调中新开线程启动客户端即可:

1public void runClientAndServer() {
2    server.run().addListener((ChannelFutureListener) future -> {
3         new Thread(()->client.run()).start();
4    });
5}

总结

  • 永远不要阻塞Netty应用的IO线程,否则会导致整个应用失去响应
  • Channel.closeFuture().sync()会导致当前线程阻塞在等待channel关闭的地方