软件销售网站模板,wordpress多媒体不显示,本地wordpress登录,山东网站建设哪家好文章目录 一、NIO-Selector1.处理accept2.cancel3.处理read4.处理客户端断开5. 处理消息的边界6. 写入内容过多的问题7. 处理可写事件 二、多线程优化三、NIO概念剖析1. stream 和 channel2. IO模型2.1 阻塞IO2.2 非阻塞IO2.3多路复用2.4 同步异步 3. 零拷贝3.1 NIO优化3.2 sen… 文章目录 一、NIO-Selector1.处理accept2.cancel3.处理read4.处理客户端断开5. 处理消息的边界6. 写入内容过多的问题7. 处理可写事件 二、多线程优化三、NIO概念剖析1. stream 和 channel2. IO模型2.1 阻塞IO2.2 非阻塞IO2.3多路复用2.4 同步异步 3. 零拷贝3.1 NIO优化3.2 sendFile优化3.3 进一步优化 4. AIO(异步IO 一、NIO-Selector
1.处理accept
//1.创建selector,管理多个channel
Selector selector Selector.open();
ByteBuffer buffer ByteBuffer.allocate(16);
ServerSocketChannel ssc ServerSocketChannel.open();
ssc.configureBlocking(false);
//2.建立selector和channel的联系注册
//SelectionKey就是将来事件发生后通过它可以知道事件和哪个channel的事件
//四个事件
//accept 会在有连接请求时触发
//connect 是客户端连接建立后触发
//read 可读事件
//write 可写事件
SelectionKey sscKey ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while(true){//3.select方法没有事件发生线程阻塞有事件线程才会恢复运行selector.select();//4.处理事件selectedKeys内部包含了所有发生的事件IteratorSelectionKey iter selector.selectedKeys.iterator();while(iter.next()){SelectionKey key iter.next();ServerSocketChannel channel (ServerSocketChannel)key.channel();SocketChannel sc channel.accept();}
}2.cancel
//1.创建selector,管理多个channel
Selector selector Selector.open();
ByteBuffer buffer ByteBuffer.allocate(16);
ServerSocketChannel ssc ServerSocketChannel.open();
ssc.configureBlocking(false);
//2.建立selector和channel的联系注册
SelectionKey sscKey ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while(true){//3.select方法没有事件发生线程阻塞有事件线程才会恢复运行//select在事件未处理时它不会阻塞事件发生后要么处理要么取消不能置之不理selector.select();//4.处理事件selectedKeys内部包含了所有发生的事件IteratorSelectionKey iter selector.selectedKeys.iterator();while(iter.next()){SelectionKey key iter.next();key.cancel();}
}3.处理read
用完key必须要remove
//1.创建selector,管理多个channel
Selector selector Selector.open();
ByteBuffer buffer ByteBuffer.allocate(16);
ServerSocketChannel ssc ServerSocketChannel.open();
ssc.configureBlocking(false);
//2.建立selector和channel的联系注册
SelectionKey sscKey ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while(true){//3.select方法没有事件发生线程阻塞有事件线程才会恢复运行selector.select();//4.处理事件selectedKeys内部包含了所有发生的事件//selector会在发生事件后向集合中加入key,但不会删除IteratorSelectionKey iter selector.selectedKeys.iterator();while(iter.next()){SelectionKey key iter.next();//处理key时要从selectedKeys集合中删除否则下次处理就会有问题iter.remove();//5.区分事件类型if(key.isAcceptable()){ //如果是acceptServerSocketChannel channel (ServerSocketChannel)key.channel();SocketChannel sc channel.accept();sc.configureBlocking(false);SelectionKey sckey sc.register(selector, 0, null);scKey.interestOps(SelectionKey.OP_READ);}elseif(key.isReadable()){//拿到触发事件的channelServerSocketChannel channel (ServerSocketChannel)key.channel();ByteBuffer buffer ByteBuffer.allocate(16);channel.read(buffer);buffer.flip();debugRead(buffer);}}
}4.处理客户端断开
//1.创建selector,管理多个channel
Selector selector Selector.open();
ByteBuffer buffer ByteBuffer.allocate(16);
ServerSocketChannel ssc ServerSocketChannel.open();
ssc.configureBlocking(false);
//2.建立selector和channel的联系注册
SelectionKey sscKey ssc.register(selector, 0, null); sscKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while(true){ //3.select方法没有事件发生线程阻塞有事件线程才会恢复运行 selector.select(); //4.处理事件selectedKeys内部包含了所有发生的事件 //selector会在发生事件后向集合中加入key,但不会删除 IteratorSelectionKey iter selector.selectedKeys.iterator(); while(iter.next()){ SelectionKey key iter.next(); //处理key时要从selectedKeys集合中删除否则下次处理就会有问题 iter.remove(); //5.区分事件类型 if(key.isAcceptable()){ //如果是accept ServerSocketChannel channel (ServerSocketChannel)key.channel(); SocketChannel sc channel.accept();sc.configureBlocking(false); SelectionKey sckey sc.register(selector, 0, null); scKey.interestOps(SelectionKey.OP_READ); }elseif(key.isReadable()){ try{ //拿到触发事件的channel ServerSocketChannel channel (ServerSocketChannel)key.channel(); ByteBuffer buffer ByteBuffer.allocate(16); int read channel.read(buffer);//如果是正常断开read的方法的返回值是-1 ifread -1){ key.cancel(); }else{ buffer.flip(); debugRead(buffer); } }catch(IOException e){ e.printStackTrace();//因为客户端断开了因此需要将key取消从selector 的keys集合中真正删除key) key.cancel();}}}
}5. 处理消息的边界
固定消息长度数据包大小一样服务器按预定长度读取缺点是浪费带宽按分隔符拆分缺点是效率低TLV格式Type类型,Length长度Value数据可以方便获取消息大小分配合适的buffer缺点是buffer需要提前分配如果内容过大影响server吞吐量 Http1.1是TLV格式Http2.0是LTV格式
private static void split(ByteBuffer source){source.flip();for(int i 0; i source.limit(); i){//找到一条完整消息if(source.get(i) \n){int length i 1 -source.position();//把这条完整消息存入新的ByteBufferByteBuffer target ByteBuffer.allocate(length);//从source读向target写for(int j 0; j length; j){target.put(source.get());}debugAll(target);}}source.compact();
}public static void main(){//1.创建selector,管理多个channelSelector selector Selector.open(); ByteBuffer buffer ByteBuffer.allocate(16); ServerSocketChannel ssc ServerSocketChannel.open(); ssc.configureBlocking(false);//2.建立selector和channel的联系注册SelectionKey sscKey ssc.register(selector, 0, null); sscKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8080)); while(true){ //3.select方法没有事件发生线程阻塞有事件线程才会恢复运行 selector.select(); //4.处理事件selectedKeys内部包含了所有发生的事件 //selector会在发生事件后向集合中加入key,但不会删除 IteratorSelectionKey iter selector.selectedKeys.iterator(); while(iter.next()){ SelectionKey key iter.next(); //处理key时要从selectedKeys集合中删除否则下次处理就会有问题 iter.remove(); //5.区分事件类型 if(key.isAcceptable()){ //如果是accept ServerSocketChannel channel (ServerSocketChannel)key.channel(); SocketChannel sc channel.accept();sc.configureBlocking(false); ByteBuffer buffer ByteBuffer.allocate(16); //attachment附件//将一个byteBuffer作为附件关联到selectionKey上SelectionKey sckey sc.register(selector, 0, buffer); scKey.interestOps(SelectionKey.OP_READ); }elseif(key.isReadable()){ try{ //拿到触发事件的channel ServerSocketChannel channel (ServerSocketChannel)key.channel(); //获取selectionKey上关联的附件ByteBuffer buffer (ByteBuffer)key.attatchment();int read channel.read(buffer);//如果是正常断开read的方法的返回值是-1 ifread -1){ key.cancel(); }else{ split(buffer);if(buffer.position() buffer.limit()){//扩容ByteBuffer newBuffer ByteBuffer.allocate(buffer.capacity()*2);buffer.flip();newBuffer.put(buffer);//复制key.attach(newbuffer);//替换掉key上原有的buffer}} }catch(IOException e){ e.printStackTrace();//因为客户端断开了因此需要将key取消从selector 的keys集合中真正删除key) key.cancel();}}}}
}6. 写入内容过多的问题
//服务器
public static void main(){ServerSocketChannel ssc ServerSocketChannrl.open();ssc.configureBlocking(false);Selector selector Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));while(trye){selector.select();IteratorSelectionKey iter selector.selectedKeys.iterator();while(iter.hasNext()){SelectionKey key iter.next();iter.remove();if(key.isAcceptable()){SocketChannel sc ssc.accept();sc.configureBlocking(false);//1.向客户端发送大量数据StringBuilder sb new StringBuilder();for(int i 0; i 3000000; i){sb.append(a);}BytrBuffer buffer Charset.defaultCharset().encode(sb.toString());//不符合非阻塞模式while(buffer.hasRemaining()){//2.返回值代表实际写入的字节数//不能一次性写完//write 0 缓冲区满写不了int write sc.write(buffer);System.out.println(write):}}}}
}//客户端
public static void main(){SocketChannel sc SocketChannel.open();sc.connect(new InetSocketAddress(localhost,8080));//3.接收数据int count 0;while(true){ByteBuffer buffer ByteBuffer.allocate(1024*1024);count sc.read(buffer);System.out.println(count);buffer.clear();}
}7. 处理可写事件
//服务器
public static void main(){ServerSocketChannel ssc ServerSocketChannrl.open();ssc.configureBlocking(false);Selector selector Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));while(trye){selector.select();IteratorSelectionKey iter selector.selectedKeys.iterator();while(iter.hasNext()){SelectionKey key iter.next();iter.remove();if(key.isAcceptable()){SocketChannel sc ssc.accept();sc.configureBlocking(false);SelectionKey sckey sc.register(selector, 0, null);sckey.interestOps(SelectionKey.OP_READ);//1.向客户端发送大量数据StringBuilder sb new StringBuilder();for(int i 0; i 3000000; i){sb.append(a);}BytrBuffer buffer Charset.defaultCharset().encode(sb.toString());//2.返回值代表实际写入的字节数//不能一次性写完//先写一次int write sc.write(buffer);System.out.println(write)://3.判断是否有剩余内容while(buffer.hasRemaining()){//4.关注可写事件sckey.interestOps(sckey.interestOps() SelectionKey.OP_WRITE);//sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE);//5.把未写完的数据挂到sckey上sckey.attach(buffer);}}elseif(key.isWritable())[ByteBuffer buffer (ByteBuffer) key.attachment();SocketChannel sc (SocketChannel)key.channel();int write sc.write(buffer);System.out.println(write)://6.清理操作内存释放if!buffer.haeRemaining()){key.attach(null);//需要清除bufferkey.interestOps(key.interestOps() - SelectionKey.OP_WRITE);//不需关注可写事件}}}}
}二、多线程优化
前面的代码只有一个选择器没有充分利用多核cpu如何改进呢 分两组选择器boss建立连接worker负责数据读写
单线程配一个选择器专门处理accept事件创建cpu核心数的线程每个线程配一个选择器轮流处理read事件
public static void main(){Thread.currentThrea().setName(boss);ServerSocketChannel ssc ServerSocketChannel.open();ssc.configuraBlocking(flase);Selector boss Selector.open();SelectionKey bosskey ssc.register(boss, 0, null);bosskey.interestOps(SelectionKey.OP_ACCEPT):ssc.bind(new InetSocketAddress(8080));//1.创建固定数量的worker并初始化Worker[] workers new Worker[2];for(int i 0; i workers.length; i{workers[i] new Worker(worker-i);}//计数器AtomicInteger index new AtomicInteger():while(true){boss.select();IteratorSelectionKey iter boss.selectedKeys().iterator();while(iter.hasNext()){SelectionKey key iter.next();iter.remove();if(key.isAcceptable())[SocketChannel sc ssc.accept();sc.configureBlocking(false)://2.关联selector//轮询workers[index.getAndIncrement() % workers.length}.register(sc);}}}
}static class Worker implements Runnable{private Thread thread;private Selector worker;private String name;private volatile boolean star false;//还未初始化private ConcurrentLinkedQueueRunnable queue new ConcurrentLinkedQueue():public Worker(String name){this.name name;}//初始化线程和selectorpublic void register(SocketChannel sc){if!start){selector Selector.open();thread new Thread(this, name);thread.start():start true;}//向队列添加任务但这个任务并没有被boss立刻执行queue.add()-{try{sc.register(worker.selector, SelectionKey.OP_READ, null);}catch(ClosedChannelException e) {e.printStackTrace()}}//唤醒run中的select方法selector.wakeup();//也可以用以下方式先wakeup后select阻塞时也能被唤醒/* selector.wakeup();sc.register(worker.selector, SelectionKey.OP_READ, null);*/}Overridepublic void run(){while(true){try{worker.select();Runnable task queue.poll();if(task ! null){task.run();}IteratorSelectionKey iter worker.selectedKeys().iterator();while(iter.hasNext()){SlectionKey key iter.next();iter.remove();if(key.isReadable()){ByteBuffer buffer ByteBuffer.allocate(16);SocketChannel channel (SocketChannel)key.channel();channel.read(buffer);buffer.flip();debugAll(buffer);}}}catch(IOException e){e.printStackTrace();}}}
}三、NIO概念剖析
1. stream 和 channel
stream不会自动缓冲数据channel会利用系统提供的发送缓冲区、接收缓冲区更底层stream仅支持阻塞APIchannel同时支持阻塞、非阻塞API网络channel可配合selector实现多路复用二者均为全双工即读写可同时进行
2. IO模型
2.1 阻塞IO 用户线程被阻塞同步
2.2 非阻塞IO read是中运行无数据立刻返回有数据复制完返回 等待数据非阻塞复制数据阻塞同步 缺点多测内核切换
2.3多路复用 select等待数据阻塞read复制数据阻塞同步 一次性处理多个channel上的事件
2.4 同步异步
同步线程自己去获取结果一个线程 异步一个线程发送一个线程送结果两个线程 read非阻塞 异步阻塞不存在 3. 零拷贝
传统io将一个文件通过socket写出内部工作流程 用户和内核态的切换发生了3次这个操作比较重量级 数据拷贝了4次
3.1 NIO优化
通过DirectByteBuffer ByteBuffer.allocate(10) ,返回HeapByteBuffer,使用Java内存 ByteBuffer.allocateDirect(10),返回DirectByteBuffer,使用操作系统内存 java可以使用DirectByteBuffer将堆内存映射到jvm内存中来直接访问使用 减少了一次数据拷贝用户态与内核态的切换次数没有减少
3.2 sendFile优化
Linux2.1后提供sendFile方法Java中对应着两个channel调用transferTo/transferFrom方法拷贝数据 只发生了一次用户态与内核态的切换 数据拷贝了3次
3.3 进一步优化 一次切换2次拷贝 零拷贝并不是真正无拷贝而是在不会拷贝重复数据到jvm内存中零拷贝的优点有 更少的用户态和内核态切换不利用cpu计算减少cpu缓存伪共享零拷贝适合小文件传输 4. AIO(异步IO
netty不支持异步IO
public static void main(){try(AsynchronousFileChannel channel AsynchronousFileChannel.open(Paths.get(data.txt), StandardOpenOption.READ)){//参数1 ByteBuffer//读取的起始位置//附件//回调对象 CompletionHandlerByteBuffer buffer ByteBuffer.allocate(16);channel.read(buffer, 0, buffer, new CompletionHandlerInteger, ByteBuffer(){Override//read成功public void completed(Integer result, ByteBuffer attachment){attachment.flip();debugAll(attachment);}Override// read失败public void failed(Throwable exc, ByteBuffer attachment){exc.printStachTrace();}}):}catch(IOException e){e.printStackTrace();}//主线程结束守护线程结束//接收控制台的输入控制台不输入就停在这儿System.in.read();
}