signed

QiShunwang

“诚信为本、客户至上”

netty入门

2021/5/14 21:51:35   来源:

一提起netty,一般都能想到NIO,callback。NIO的核心就是epoll,channel,buffer。今天从netty官网上一个简单的例子来入门netty。内容是一个回显服务器,客户端给服务器发送消息,服务器对这些消息进行回复。我相信经过手写这个简单的例子,能对netty一些基本的概念更加熟悉,方便以后的学习。

public class TelnetServer {
    public static void main(String[] args) throws CertificateException, SSLException, InterruptedException {
        SelfSignedCertificate ssc = new SelfSignedCertificate();
        final SslContext sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();

        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(1);
        
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new TelnetServerInitializer(sslCtx));
            b.bind(8992).sync().channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服务端一般会有bossGroup去专门接收请求,由workerGroup专门处理响应的请求。这里我们指定服务端的channel类型为NioServerSocketChannel,服务端的channel是专门用来处理新的连接的。ServerBootstrap在初始化的时候会添加一个专门用于接收新连接的handler到Pipeline内:ServerBootstrapAcceptor。

提一下Pipeline:channel会有一个handler容器,而Pipeline就是充当了handler容器这个角色,典型的责任链模式,handler会判断能不能处理当前请求,如果不能处理就传递给下游。

我们在服务端的channel的pipeline内再加入一个输出日志的handler:LoggingHandler,用于观察服务端接收新连接。

再注册childHandler,chlldHandler用于处理客户端的请求。netty用channel来抽象端到端的通信方式,处理各种事件就是通过在channel上面增加相应的handler。

最后,我们经常看到sync()这个方法,顾名思义,同步,其实就是自旋到这个异步事件完成。有些异步方法比较重要比如bind、connect,为了避免后面的代码变成异步,就可以使用sync方法在这个地方从异步变成同步。

public class TelnetServerInitializer extends ChannelInitializer<SocketChannel> {
    private static final StringDecoder DECODER = new StringDecoder();
    private static final StringEncoder ENCODER = new StringEncoder();

    private final SslContext sslContext;

    public TelnetServerInitializer(SslContext sslContext) {
        this.sslContext = sslContext;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(sslContext.newHandler(ch.alloc()))
                .addLast(new DelimiterBasedFrameDecoder(10*1024, Delimiters.lineDelimiter()))
                .addLast(DECODER)
                .addLast(ENCODER)
                .addLast(new TelnetServerHandler());
    }
}

我们为childChannel的pipeline添加了几个handler,它们的作用分别是

  1. 为channel提供底层SSL协议支持
  2. 通过换行符分割消息
  3. 解码器
  4. 编码器
  5. 具体业务处理器(业务层代码,在这一层字节流已经转为我们想要的消息格式了)
public class TelnetServerHandler extends SimpleChannelInboundHandler<String> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send greeting for a new connection.
        ctx.write("Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n");
        ctx.write("It is " + new Date() + " now.\r\n");
        ctx.flush();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        String response;
        boolean close = false;
        if (msg.isEmpty()) response = "Please type something.\n";
        else if ("bye".equals(msg.toLowerCase())) {
            response = "Have a good day!\n";
            close = true;
        } else response = "Did you say '" + msg + "'?\r\n";
        ChannelFuture f = ctx.write(response);
        if (close) f.addListener(ChannelFutureListener.CLOSE);
    }


    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

为了完整,贴一下客户端的代码。


public class TelnetClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        EventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new TelnetClientInitializer(sslContext));

            Channel channel = b.connect(InetAddress.getLocalHost(), 8992).channel();

            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));

            ChannelFuture lastWriteFuture = null;
            for (; ; ) {
                String line = in.readLine();
                if (line == null) break;
                lastWriteFuture = channel.writeAndFlush(line + "\n");
                if ("bye".equals(line)) {
                    channel.closeFuture().sync();
                    break;
                }
            }
            if (lastWriteFuture != null) {
                lastWriteFuture.sync();
            }
        } finally {
            group.shutdownGracefully();
        }
    }
}
public class TelnetClientInitializer extends ChannelInitializer<SocketChannel> {

    private static final StringEncoder ENCODER = new StringEncoder();
    private static final StringDecoder DECODER = new StringDecoder();

    private final SslContext sslContext;

    public TelnetClientInitializer(SslContext sslContext) {
        this.sslContext = sslContext;
    }
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(sslContext.newHandler(ch.alloc()))
                .addLast(new DelimiterBasedFrameDecoder(10*1024, Delimiters.lineDelimiter()))
                .addLast(DECODER)
                .addLast(ENCODER)
                .addLast(new TelnetClientHandler());
    }
}
@ChannelHandler.Sharable
public class TelnetClientHandler extends SimpleChannelInboundHandler<String> {
    private final Logger logger = Logger.getAnonymousLogger();
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        logger.info(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

通过这个例子我大概能明白server和client是怎么跑起来的,怎么处理I/IO事件的。
server:

  1. 注册handler到主channel(新连接建立),childChannel(客户端请求)
  2. 通过seletor轮询I/O事件,一个seletor可以绑定多个channel,实现I/O多路复用,底层用的是epoll。
  3. eventloop负责轮询I/O事件,I/O事件进入pipeline进行处理。

这些事情用JDK的NIO也能做到,但是Netty做了一些封装和优化,使得NIO服务器变得容易 实现。而且利用Netty,我们还可以轻松的自定义协议,其实协议本质就是从字节流转换成消息。