signed

QiShunwang

“诚信为本、客户至上”

Netty实现群聊系统

2020/12/30 0:32:37   来源:

1 需求

(1) 编写一个 Netty群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
(2) 实现多人群聊
(3) 服务器端:可以监测用户上线,离线,并实现消息转发功能
(4) 客户端:通过 channel可以无阻塞发送消息给其它所有用户,同时可以接受其它用
(5) 户发送的消息(有服务器转发得到)
目的:进一步理解Net非阻塞网络编程机制

2 Code

2.1 GroupChatConsts

public final class GroupChatConsts {

    private GroupChatConsts() {
    }


    public static final Integer port = 8888;

    public static final String host = "127.0.0.1";

}

2.2 GroupChatServer

(1) GroupChatServerHandler

package com.rosh.netty.chat.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Description:
 * @Author: rosh
 * @Date: 2020/12/29 22:59
 */
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {


    /**
     * 定义一个channel组,管理所有的channel
     */
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);


    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * 当建立连接时,第一个被执行
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //通知其他客户,有人加入该聊天室
        channelGroup.writeAndFlush("【客户端】 " + sdf.format(new Date()) + channel.remoteAddress() + " 加入聊天 \n");
        //加入聊天
        channelGroup.add(channel);
    }


    /**
     * 表示channel处于一个活跃的状态
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("【" + ctx.channel().remoteAddress() + "】 " + sdf.format(new Date()) + " 上线了 ~");
    }

    /**
     *  表示channel处于不活动的状态
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        System.out.println("【" + ctx.channel().remoteAddress() + "】 " + sdf.format(new Date()) + " 离线了 ~");
    }


    /**
     * channel断开连接会被触发
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("【客户端】 " + sdf.format(new Date()) + channel.remoteAddress() + " 离开了\n");
    }

    /**
     * 转发消息
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.forEach(ch -> {
            if (channel != ch) {
                ch.writeAndFlush("【客户】" + sdf.format(new Date()) + channel.remoteAddress() + "发送了消息: " + msg + "\n");
            } else {
                ch.writeAndFlush("【自己】" + sdf.format(new Date()) + "发送了消息: " + msg + "\n");
            }
        });
    }

    /**
     * 发生异常时 关闭通道
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

(2) GroupChatServerChannelInitializer

public class GroupChatServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();

        //解码器
        pipeline.addLast("decoder", new StringDecoder());
        //编码器
        pipeline.addLast("encoder", new StringEncoder());
        //加入自己业务handler
        pipeline.addLast("serverHandler", new GroupChatServerHandler());


    }
}

(3) GroupChatServerHandler

public class GroupChatServer {


    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    //nio
                    .channel(NioServerSocketChannel.class)
                    //bossGroup,option,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //workerGroup,option,是否启动心跳包活机制
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new GroupChatServerChannelInitializer());
            //绑定端口
            ChannelFuture channelFuture = serverBootstrap.bind(GroupChatConsts.port).sync();
            //监听关闭事件
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }


}

2.3 GroupChatClient

(1) GroupChatClientHandler

public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}

(2) GroupChatClientChannelInitializetr

public class GroupChatClientChannelInitializetr  extends ChannelInitializer<SocketChannel>{
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();
        //解码器
        pipeline.addLast("decoder", new StringDecoder());
        //编码器
        pipeline.addLast("encoder", new StringEncoder());
        //自定义handler
        pipeline.addLast("clientHandler", new GroupChatClientHandler());
    }
}

(3) GroupChatClient

public class GroupChatClient {

    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new GroupChatClientChannelInitializetr());
            ChannelFuture channelFuture = bootstrap.connect(GroupChatConsts.host, GroupChatConsts.port).sync();
            //输入信息
            Channel channel = channelFuture.channel();
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String msg = scanner.nextLine();
                channel.writeAndFlush(msg + "\n");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }

    }

}


3 测试

(1) 启动服务端,启动3个客户端
在这里插入图片描述
(2) 发送消息
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述