signed

QiShunwang

“诚信为本、客户至上”

Java基础系列:NIO编程

2021/3/21 8:46:02   来源:

俗世游子:专注技术研究的程序猿

说在前面的话

聊完了Socket编程,我们来聊一聊关于NIO方面的话题,当然在这里只会介绍用的比较广泛的类/方法,其他用的不多的,就不多介绍了,用到的时候查API就好了

本节我们聊个大概内容,明白该如何使用,等到Netty部分的时候还会重点聊漏掉的部分,比如:

  • 四种网络模型
  • Reactor反应器模式
  • 。。。

Channel和Buffer

前面我们聊到的Socket编程,建立连接之后本质上还是在操作IO流,这种方式属于同步阻塞模型,也就是我们常说的BIO

这种方式的缺点可想而知,所以为了提高效率,在JDK1.4之后,出现了NIO

  • 缓冲区为载体,通过建立Channel来进行数据传输

NIO是非阻塞模型,所谓的非阻塞模型,就是说:

  • 当我们调用read方法的时候,如果此时有数据,则读取到数据并返回
  • 如果没有数据,那么就直接返回,不会阻塞到主线程

而且所有的操作都是基于事件监听的方法

首先我们先聊缓冲区:

Buffer

根据官方介绍,Buffer是一种特定类型的容器,用来存放需要传输的数据,该类属于抽象类,我们可以查看一下它的具体实现类:

Buffer的实现类

从名字上应该能看出它的意思,这里就不多介绍了

后续的例子都是用ByteBuffer为例

重要属性

我要说的是它的特性,其中包含3个非常重要的属性:

  • capacity

该属性表示当前Buffer的容量,当前容量在初始化之后是固定的,不能被修改。

// 创建过程
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 直接指定内容的创建
ByteBuffer byteBuffer = ByteBuffer.wrap("123".getBytes());
// 分配直接内存缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);

三者区别:

  • allocate()只是开辟了一个数组空间
  • wrap()表示直接将数组对象包装到缓冲区中,这样创建下来的对象,limitcapacity都是数组长度,position为0,所以在读取数据的时候不需要调用flip()方法
  • allocateDirect()allocate()的表现方式一样,但是底层实现不同。allocateDirect()是从直接内存中开辟一块空间做缓冲区。

直接内存归属于OS管理的内核空间的内存,可以做到零拷贝

通过查看源码一步步到Buffer中,一定可以看到

// cap就是我们初始化的容量
this.capacity = cap;
  • limit

limit表示限制,也就是说在读取或者写入元素的时候不允许的索引限制,默认情况下,初始化之后,其限制值不会超过容量

// 1024
System.out.println("limit:" + byteBuffer.limit());
  • position

表示读取或者写入后的索引位置,不能超过限制值,否则会抛出BufferOverflowException

byteBuffer.put("11".getBytes());
// 2 插入了2位的数据
System.out.println("position:" + byteBuffer.position());

读写模式

Buffer使用最麻烦的地方在这里,前面说过,Channel是通过Buffer来传输数据的,那么我们可以认定Buffer为中心点

  • 当向Bufferput()的时候,称为写模式,而且默认情况下就是写模式,可以通过clear()来进行模式转换
byteBuffer.clear();
  • 当从Bufferget()的时候,称为读模式,我们通过flip()来转换
byteBuffer.flip();

下面我用一个实验来查看一下:

public static void main(String[] args) {
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

    byteBuffer.put("123456".getBytes());

    print("原始", byteBuffer);
    byteBuffer.flip();
    print("flip后", byteBuffer);
    byteBuffer.clear();
    print("clear后", byteBuffer);

}

private static void print(String tag, ByteBuffer byteBuffer) {
    System.out.println(tag + ":position:" + byteBuffer.position());
    System.out.println(tag + ":limit:" + byteBuffer.limit());
    System.out.println("==============================");
}

输出结果

原始:position:6
原始:limit:1024
==============================
flip后:position:0
flip后:limit:6
==============================
clear后:position:0
clear后:limit:1024
==============================
重要结论

通过分析上面的输出,我们可以得出以下结论:

  • 默认情况下,Buffer是写模式,而且postion表示写入数据的位置,limit表示最大限制

  • 当我们调用flip()转换模式之后,position置为0,表示从第0位读取,limit置为原始position,说明我们在读取的时候,不能超过上次写入的长度,而且如果我们在这里put()的话,也会覆盖掉之前的数据

  • 当调用clear()之后,再次恢复成写模式,虽然position还是0,但是之前写入的数据并不会消失,只有在重新put()之后才会覆盖

  • 所以,clear()flip()方法修改的只是positionlimit,不会修改实际存储数组,这里大家不要混淆

还是推荐大家亲自试一试,每一个细节点大家都看看那两个属性的变化

方法整理

上面其实也了解到不少方法,下面还有一些方法我们用到的也比较频繁:

array()
byteBuffer.flip();
final byte[] array = byteBuffer.array();
System.out.println(new String(array, 0, byteBuffer.limit()));
byteBuffer.clear();

该方法会直接返回最大容量的数组,所以我们在读取的时候,最好能转换成读模式

putXXX()

ByteBuffer除了可以写入字节数组外,还可以写入其他类型的数据,包括:

image-20210305112923066

get也是同理,这里就不写了

Channel

Buffer配合使用的通道,可以理解为一个通道就是一个连接。根据不同的传输协议,有不同的Channel实现:

FileChannel

这是专门用来操作文件的通道,这里要注意一点,FileChannel阻塞模式的通道

开启通道
// 第一种启动方式
Path path = Paths.get("文件路径");
final FileChannel fileChannel = FileChannel.open(path);

// 通过流来启动
final FileChannel fileChannel = new FileInputStream("文件路径").getChannel();
force()

在将缓冲区的内容写入到通道的过程,是由操作系统来完成的,处于性能方面考虑,不可能是实时写入,所以为了保证数据最终都真正写入到磁盘上,需要强制刷新

fileChannel.force(true);

其他的和之前的IO没有什么区别,我这里就不写了,给大家写个例子就明白了

基于NIO的文件传输

public class CopyFile {
    public static void main(String[] args) throws IOException {
        // 得到文件
        final Path source = Paths.get("D:\\Working Directory\\project\\study\\study-java\\src\\main\\java\\zopx\\top\\study\\jav\\_nio\\CopyFile.java");

        // 读取通道: 通过open打开
        final FileChannel sourceChannel = FileChannel.open(source);
        // 写出通道:  通过流打开
        final FileChannel targetChannel = new FileOutputStream(new File("D:\\Working Directory\\project\\study\\study-java\\src\\main\\java\\zopx\\top\\study\\jav\\_nio\\_CopyFile.txt")).getChannel();

        // --------------------
        // 定义ByteBuffer
        ByteBuffer bytebuffer = ByteBuffer.allocate(1024);
        // read其实是向bytebuffer中写入数据,所以这里是默认的,不需要转换
        while ((sourceChannel.read(bytebuffer)) != -1) {
            // write其实是从 bytebuffer 中读取数据然后再通过channel发送出去,所以这里需要转换成读模式
            bytebuffer.flip();
            // 将bytebuffer中的数据写出到通道中
            targetChannel.write(bytebuffer);
            // 将数据写出去之后,最好将bytebuffer的模式改成写模式,不然写入数据会出错
            bytebuffer.clear();
        }
        
        // 强制刷新
        targetChannel.force(true);

        // 关闭通道
        sourceChannel.close();
        targetChannel.close();
    }
}

这里还有一种不需要考虑ByteBuffer模式转换的方式

long size = sourceChannel.size();
long pos = 0, count = 0;
while (pos < size) {
    count = size - count < 1024 ? size - count : 1024;
    pos += targetChannel.transferFrom(sourceChannel, pos, count);
}

TCP/IP协议下的Channel

两者成对出现,不想单独写了

基于网络传输的通道,分别表示为:

  • ServerSocketChannel
  • SocketChannel

和之前的ServerSocket/Socket意思是一样的

服务端开启通道
// 开启通道
ServerSocketChannel ssc = ServerSocketChannel.open();
// 绑定端口
ssc.bind(new InetSocketAddress(8888));
客户端开启通道
// 连接到服务端
SocketChannel sc = 
    SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888));

// 也可以通过调用connect()方法来实现连接到服务端
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888))

下面两边的方法都是一样的,就以服务端为例

主要配置项
ssc.configureBlocking(false);

该配置表示设置通道的阻塞模式,如果:

  • 设置为true,表示为阻塞模型,后续的read/write方法都是阻塞的,就和面向流的操作效率一样
  • 设置为false,表示为非阻塞模式
注册选择器
Selector selector = Selector.open();
// 注册接收事件
ssc.register(selector, SelectionKey.OP_ACCEPT);

如果说一个通道代表一个连接,那么在非阻塞模式下,通过选择器可以监控多个连接的IO状况,可以说选择器和通道是监控被监控的关系

通道IO事件

表示通道具备完成某种IO操作的条件,包含以下几种事件:

  • SelectionKey.OP_ACCEPT

就绪可接收

  • SelectionKey.OP_CONNECT

可连接

  • SelectionKey.OP_READ

可读

  • SelectionKey.OP_WRITE

可写

轮询感兴趣的IO事件
  • 就绪状态的查询:通过选择器的select(),查询注册过的所有socket的就绪状态,当有任何一个注册过的socket中的数据就绪,那么就将其添加到就绪的列表中
  • 然后我们通过循环处理
while (selector.select() > 0) {
    final Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    while (iterator.hasNext()) {
        final SelectionKey selectionKey = iterator.next();
        // 防止重复处理
        iterator.remove();

        if (selectionKey.isAcceptable()) {
            // 接收
        } 
        if (selectionKey.isConnectable()) {
            // 连接
        }
        if (selectionKey.isReadable()) {
            // 可读
        }
        if (selectionKey.isWritable()) {
            // 可写
        }
    }
}       

基本操作就是这样,接下里就是通过read()write()操作Buffer,还是已一个实际案例来完成

一定要注意Buffer模式之间的转换

基于NIO的点对点聊天

  • 服务端
public class ChatServer {

    /**
     * 绑定端口号
     */
    public static int PORT = 8888;

    private static Map<String, SocketChannel> TOKEN_SOCKET_MAP = new HashMap<>();
    private static Map<SocketChannel, String> SOCKET_TOKEN_MAP = new HashMap<>();


    public static void main(String[] args) {
        try {
            new ChatServer().start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    ServerSocketChannel ssc = null;

    private void start() throws IOException {
        // 开启NIO的ServerSocket通道
        ssc = ServerSocketChannel.open();
        // 绑定端口
        ssc.bind(new InetSocketAddress(PORT));

        //设置为非阻塞式,这里是NIO的关键点
        ssc.configureBlocking(false);

        // 选择器
        Selector selector = Selector.open();
        // 注册接收事件
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        // 轮询注册事件
        while (selector.select() > 0) {
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // 移除掉,避免重复处理
                iterator.remove();

                if (selectionKey.isAcceptable()) {
                    // 获取连接过来的客户端
                    SocketChannel accept = ssc.accept();

                    if (null == accept)
                        continue;

                    System.out.println(accept);

                    // 维护Token
                    String token = UUID.randomUUID().toString();
                    TOKEN_SOCKET_MAP.put(token, accept);
                    SOCKET_TOKEN_MAP.put(accept, token);

                    // 设置客户端为非阻塞式并注册可写事件
                    accept.configureBlocking(false);
                    accept.register(selector, SelectionKey.OP_WRITE);
                }
                // 如果是读取事件
                if (selectionKey.isReadable()) {
                    SocketChannel channel = (SocketChannel) selectionKey.channel();

                    ByteBuffer buffer = ByteBuffer.allocate(1024);

                    // 读取数据
                    StringBuilder sb = new StringBuilder();
                    while ((channel.read(buffer)) > 0) {
                        buffer.flip();
                        String msg = new String(buffer.array(), 0, buffer.limit());
                        sb.append(msg);
                        buffer.clear();
                    }

                    if (sb.length() > 0) {
                        System.out.println("-----读取数据:" + sb.toString());

                        writeMsg(channel, sb.toString());
                    }
                }
                // 如果是写出事件
                if (selectionKey.isWritable()) {

                    // 将生成的token返回给客户端
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    String content = getTokenBySocket(channel) + ":" + getTokenBySocket(channel);

                    if (null != channel) {
                        send(channel, content);
                        channel.register(selector, SelectionKey.OP_READ);
                    }
                }
            }
        }
    }

    /**
     * 处理数据并发送
     * @param channel 当前channel
     * @param msg 发送内容
     * @throws IOException IOException
     */
    private void writeMsg(SocketChannel channel, String msg) throws IOException {
        if (null == msg || "".equals(msg)) return;

        String[] split = msg.split(":");
        SocketChannel targetChannel = getSocketByToken(split[0]);
        String content = getTokenBySocket(channel) + ":" + split[1];

        send(targetChannel, content);
    }

    /**
     * 通过token找到SocketChannel
     *
     * @param token token
     * @return SocketChannel
     */
    public SocketChannel getSocketByToken(String token) {
        return TOKEN_SOCKET_MAP.get(token);
    }

    /**
     * 通过通道找到Token
     *
     * @param channel 通道
     * @return String
     */
    public String getTokenBySocket(SocketChannel channel) {
        return SOCKET_TOKEN_MAP.get(channel);
    }

    /**
     * 发送消息
     * 发送者token: 发送内容
     *
     * @param channel 通道
     * @param msg     消息
     * @throws IOException IO异常
     */
    public void send(SocketChannel channel, String msg) throws IOException {
        System.out.println("----发送的消息#" + msg);
        ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
        channel.write(buffer);
    }
}

  • 客户端
public class ChatClient {

    private static Charset charset = StandardCharsets.UTF_8;

    public static void main(String[] args) throws IOException {
        // 开启SocketChannel
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", ChatServer.PORT));
        // 配置非阻塞
        socketChannel.configureBlocking(false);

        new Thread(() -> {
            try {
                Selector selector = Selector.open();
                socketChannel.register(selector, SelectionKey.OP_READ);

                while (selector.select() > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();

                        if (selectionKey.isReadable()) {
                            SocketChannel channel = (SocketChannel) selectionKey.channel();

                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            StringBuilder sb = new StringBuilder();
                            while ((channel.read(byteBuffer)) > 0) {
                                byteBuffer.flip();
                                String msg = new String(byteBuffer.array(), 0, byteBuffer.limit());
                                sb.append(msg);
                                byteBuffer.clear();
                            }
                            if (!"".equals(sb.toString())) {
                                System.out.println(sb.toString());
                            }
                            // 设置可读事件
                            selectionKey.interestOps(SelectionKey.OP_READ);
                        }
                    }
                }
            } catch (Exception e) {

            }
        }).start();

        Scanner scanner = new Scanner(System.in);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while (scanner.hasNext()) {
            String next = scanner.next();
            System.out.println("----想要发送的数据:" + next);
            buffer.put(next.getBytes());
            buffer.flip();
            socketChannel.write(buffer);
            buffer.clear();
        }
    }
}

代码实际测过,按照token:消息体的方式发送消息

demo代码,很多地方没有优化到,如果感兴趣可以自己尝试在这个基础上更新

最后的话

到此就全部结束啦,这里没有介绍DatagramChannel,该类属于UDP协议

同样的,很多其它API方法也没有聊到,还是给出文档,用到的时候现查就可以了:

NIO Buffer

NIO Channel

NIO File