当前位置: 首页 > news >正文

南通网站建设.制作荧光字网站

南通网站建设.,制作荧光字网站,展示设计作品欣赏,旅游网站怎么制作上文说到单线程的reactor模式 reactor模式#xff1a;单线程的reactor模式 单线程的reactor模式并没有解决IO和CPU处理速度不匹配问题#xff0c;所以多线程的reactor模式引入线程池的概念#xff0c;把耗时的IO操作交由线程池处理#xff0c;处理完了之后再同步到selecti…上文说到单线程的reactor模式 reactor模式单线程的reactor模式 单线程的reactor模式并没有解决IO和CPU处理速度不匹配问题所以多线程的reactor模式引入线程池的概念把耗时的IO操作交由线程池处理处理完了之后再同步到selectionkey中服务器架构图如下     上文reactor模式单线程的reactor模式提到以read和send阶段IO最为频繁所以多线程的reactor版本里把这2个阶段单独拎出来。 下面看看代码实现   1 // Reactor線程 该类与单线程的处理基本无变动 2 package server; 3 4 import java.io.IOException; 5 import java.net.InetSocketAddress; 6 import java.nio.channels.SelectionKey; 7 import java.nio.channels.Selector; 8 import java.nio.channels.ServerSocketChannel; 9 import java.util.Iterator; 10 import java.util.Set; 11 12 public class TCPReactor implements Runnable { 13 14 private final ServerSocketChannel ssc; 15 private final Selector selector; 16 17 public TCPReactor(int port) throws IOException { 18 selector Selector.open(); 19 ssc ServerSocketChannel.open(); 20 InetSocketAddress addr new InetSocketAddress(port); 21 ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口 22 ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞 23 SelectionKey sk ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件然後返回該通道的key 24 sk.attach(new Acceptor(selector, ssc)); // 給定key一個附加的Acceptor對象 25 } 26 27 Override 28 public void run() { 29 while (!Thread.interrupted()) { // 在線程被中斷前持續運行 30 System.out.println(Waiting for new event on port: ssc.socket().getLocalPort() ...); 31 try { 32 if (selector.select() 0) // 若沒有事件就緒則不往下執行 33 continue; 34 } catch (IOException e) { 35 // TODO Auto-generated catch block 36 e.printStackTrace(); 37 } 38 SetSelectionKey selectedKeys selector.selectedKeys(); // 取得所有已就緒事件的key集合 39 IteratorSelectionKey it selectedKeys.iterator(); 40 while (it.hasNext()) { 41 dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度 42 it.remove(); 43 } 44 } 45 } 46 47 /* 48 * name: dispatch(SelectionKey key) 49 * description: 調度方法根據事件綁定的對象開新線程 50 */ 51 private void dispatch(SelectionKey key) { 52 Runnable r (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程 53 if (r ! null) 54 r.run(); 55 } 56 57 }     1 // 接受連線請求線程 2 package server; 3 4 import java.io.IOException; 5 import java.nio.channels.SelectionKey; 6 import java.nio.channels.Selector; 7 import java.nio.channels.ServerSocketChannel; 8 import java.nio.channels.SocketChannel; 9 10 public class Acceptor implements Runnable { 11 12 private final ServerSocketChannel ssc; 13 private final Selector selector; 14 15 public Acceptor(Selector selector, ServerSocketChannel ssc) { 16 this.sscssc; 17 this.selectorselector; 18 } 19 20 Override 21 public void run() { 22 try { 23 SocketChannel sc ssc.accept(); // 接受client連線請求 24 System.out.println(sc.socket().getRemoteSocketAddress().toString() is connected.); 25 26 if(sc!null) { 27 sc.configureBlocking(false); // 設置為非阻塞 28 SelectionKey sk sc.register(selector, SelectionKey.OP_READ); // SocketChannel向selector註冊一個OP_READ事件然後返回該通道的key 29 selector.wakeup(); // 使一個阻塞住的selector操作立即返回 30 sk.attach(new TCPHandler(sk, sc)); // 給定key一個附加的TCPHandler對象 31 } 32 33 } catch (IOException e) { 34 // TODO Auto-generated catch block 35 e.printStackTrace(); 36 } 37 } 38 39 40 }     1 // Handler線程 2 package server; 3 4 import java.io.IOException; 5 import java.nio.channels.SelectionKey; 6 import java.nio.channels.SocketChannel; 7 import java.util.concurrent.LinkedBlockingQueue; 8 import java.util.concurrent.ThreadPoolExecutor; 9 import java.util.concurrent.TimeUnit; 10 11 public class TCPHandler implements Runnable { 12 13 private final SelectionKey sk; 14 private final SocketChannel sc; 15 private static final int THREAD_COUNTING 10; 16 private static ThreadPoolExecutor pool new ThreadPoolExecutor( 17 THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS, 18 new LinkedBlockingQueueRunnable()); // 線程池 19 20 HandlerState state; // 以狀態模式實現Handler 21 22 public TCPHandler(SelectionKey sk, SocketChannel sc) { 23 this.sk sk; 24 this.sc sc; 25 state new ReadState(); // 初始狀態設定為READING 26 pool.setMaximumPoolSize(32); // 設置線程池最大線程數 27 } 28 29 Override 30 public void run() { 31 try { 32 state.handle(this, sk, sc, pool); 33 34 } catch (IOException e) { 35 System.out.println([Warning!] A client has been closed.); 36 closeChannel(); 37 } 38 } 39 40 public void closeChannel() { 41 try { 42 sk.cancel(); 43 sc.close(); 44 } catch (IOException e1) { 45 e1.printStackTrace(); 46 } 47 } 48 49 public void setState(HandlerState state) { 50 this.state state; 51 } 52 } 53 54   1 package server; 2 3 import java.io.IOException; 4 import java.nio.channels.SelectionKey; 5 import java.nio.channels.SocketChannel; 6 import java.util.concurrent.ThreadPoolExecutor; 7 8 public interface HandlerState { 9 10 public void changeState(TCPHandler h); 11 12 public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc, 13 ThreadPoolExecutor pool) throws IOException ; 14 }     1 package server; 2 3 import java.io.IOException; 4 import java.nio.ByteBuffer; 5 import java.nio.channels.SelectionKey; 6 import java.nio.channels.SocketChannel; 7 import java.util.concurrent.ThreadPoolExecutor; 8 9 public class ReadState implements HandlerState{ 10 11 private SelectionKey sk; 12 13 public ReadState() { 14 } 15 16 Override 17 public void changeState(TCPHandler h) { 18 // TODO Auto-generated method stub 19 h.setState(new WorkState()); 20 } 21 22 Override 23 public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc, 24 ThreadPoolExecutor pool) throws IOException { // read() 25 this.sk sk; 26 // non-blocking下不可用Readers因為Readers不支援non-blocking 27 byte[] arr new byte[1024]; 28 ByteBuffer buf ByteBuffer.wrap(arr); 29 30 int numBytes sc.read(buf); // 讀取字符串 31 if(numBytes -1) 32 { 33 System.out.println([Warning!] A client has been closed.); 34 h.closeChannel(); 35 return; 36 } 37 String str new String(arr); // 將讀取到的byte內容轉為字符串型態 38 if ((str ! null) !str.equals( )) { 39 h.setState(new WorkState()); // 改變狀態(READING-WORKING) 40 pool.execute(new WorkerThread(h, str)); // do process in worker thread 41 System.out.println(sc.socket().getRemoteSocketAddress().toString() 42 str); 43 } 44 45 } 46 47 /* 48 * 執行邏輯處理之函數 49 */ 50 synchronized void process(TCPHandler h, String str) { 51 // do process(decode, logically process, encode).. 52 // .. 53 h.setState(new WriteState()); // 改變狀態(WORKING-SENDING) 54 this.sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件 55 this.sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回 56 } 57 58 /* 59 * 工作者線程 60 */ 61 class WorkerThread implements Runnable { 62 63 TCPHandler h; 64 String str; 65 66 public WorkerThread(TCPHandler h, String str) { 67 this.h h; 68 this.strstr; 69 } 70 71 Override 72 public void run() { 73 process(h, str); 74 } 75 76 } 77 }   1 package server; 2 3 import java.io.IOException; 4 import java.nio.channels.SelectionKey; 5 import java.nio.channels.SocketChannel; 6 import java.util.concurrent.ThreadPoolExecutor; 7 8 public class WorkState implements HandlerState { 9 10 public WorkState() { 11 } 12 13 Override 14 public void changeState(TCPHandler h) { 15 // TODO Auto-generated method stub 16 h.setState(new WriteState()); 17 } 18 19 Override 20 public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc, 21 ThreadPoolExecutor pool) throws IOException { 22 // TODO Auto-generated method stub 23 24 } 25 26 } 1 package server; 2 3 import java.io.IOException; 4 import java.nio.ByteBuffer; 5 import java.nio.channels.SelectionKey; 6 import java.nio.channels.SocketChannel; 7 import java.util.concurrent.ThreadPoolExecutor; 8 9 public class WriteState implements HandlerState{ 10 11 public WriteState() { 12 } 13 14 Override 15 public void changeState(TCPHandler h) { 16 // TODO Auto-generated method stub 17 h.setState(new ReadState()); 18 } 19 20 Override 21 public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc, 22 ThreadPoolExecutor pool) throws IOException { // send() 23 // get message from message queue 24 25 String str Your message has sent to 26 sc.socket().getLocalSocketAddress().toString() \r\n; 27 ByteBuffer buf ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0所以不需要再flip() 28 29 while (buf.hasRemaining()) { 30 sc.write(buf); // 回傳給client回應字符串發送buf的position位置 到limit位置為止之間的內容 31 } 32 33 h.setState(new ReadState()); // 改變狀態(SENDING-READING) 34 sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件 35 sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回 36 } 37 } 1 package server; 2 3 import java.io.IOException; 4 5 public class Main { 6 7 8 public static void main(String[] args) { 9 // TODO Auto-generated method stub 10 try { 11 TCPReactor reactor new TCPReactor(1333); 12 reactor.run(); 13 } catch (IOException e) { 14 // TODO Auto-generated catch block 15 e.printStackTrace(); 16 } 17 } 18 19 }   总的来说多线程版本的reactor是为了解决单线程reactor版本的IO和CPU处理速度不匹配问题从而达到高效处理的目的   参考文章 https://blog.csdn.net/yehjordan/article/details/51017025转载于:https://www.cnblogs.com/billmiao/p/9872221.html
http://wiki.neutronadmin.com/news/55070/

相关文章:

  • wordpress发的文章怎么删除苏州百度搜索排名优化
  • 网站后台如何添加关键词什么是营销网站建设
  • 专门做汽车配件的外贸网站所有爱做网站
  • 烟台专业网站建设公司宁波优化关键词首页排名
  • 如何让自己的网站快速被百度收录温州高端模板建站
  • 网站商品图片怎么做天津移动网站建设
  • 招聘网站有哪些平台在网站上保存网址怎么做
  • 全球十大网站排名织梦 做网站 知乎
  • 语音app开发公司北京数据优化公司
  • 做代码的网站淘客做网站运营
  • 自学做网站的书wordpress 用户名
  • 网站建设要注意哪些wordpress按分类搜索
  • 太原seo网站优化建设维护网站 未签订合同
  • 甘肃兰州网站建设音乐网站建设方案书模板
  • 网页设计与网站建设主要内容网络营销推广活动方案
  • 长春seo网站优化佛山高端网页制作
  • 吉林市做网站的科技html5网站后台页面设计
  • 网站显示备案号seo站外推广
  • ps制作博客网站界面外贸网站平台是不是很难做
  • 中建招聘网站wordpress如何加表情
  • 做外汇看新闻在什么网站看wordpress 摘要函数
  • 山东建设局网站电工网站 网页区别是什么
  • 汽车 营销 网站建设多语言网站思路
  • 专业的美容网站建设公关咨询公司
  • php网站建设 关键技术沈阳男科三甲医院排行榜
  • 一个人做的网站做什么好外贸型网站建设方法
  • 前端做网站难吗专业的定制型网站建设
  • 网站推广渠道制作简易网站
  • 中国工商银行官网网站wordpress 手机维修
  • 江苏省泰州市建设局官方网站邯郸网站制作外包