南通网站建设.,制作荧光字网站,展示设计作品欣赏,旅游网站怎么制作上文说到单线程的reactor模式 reactor模式#xff1a;单线程的reactor模式 单线程的reactor模式并没有解决IO和CPU处理速度不匹配问题#xff0c;所以多线程的reactor模式引入线程池的概念#xff0c;把耗时的IO操作交由线程池处理#xff0c;处理完了之后再同步到selecti…上文说到单线程的reactor模式 reactor模式单线程的reactor模式 单线程的reactor模式并没有解决IO和CPU处理速度不匹配问题所以多线程的reactor模式引入线程池的概念把耗时的IO操作交由线程池处理处理完了之后再同步到selectionkey中服务器架构图如下 上文reactor模式单线程的reactor模式提到以read和send阶段IO最为频繁所以多线程的reactor版本里把这2个阶段单独拎出来。 下面看看代码实现 1 // Reactor線程 该类与单线程的处理基本无变动 2 package server; 3 4 import java.io.IOException; 5 import java.net.InetSocketAddress; 6 import java.nio.channels.SelectionKey; 7 import java.nio.channels.Selector; 8 import java.nio.channels.ServerSocketChannel; 9 import java.util.Iterator;
10 import java.util.Set;
11
12 public class TCPReactor implements Runnable {
13
14 private final ServerSocketChannel ssc;
15 private final Selector selector;
16
17 public TCPReactor(int port) throws IOException {
18 selector Selector.open();
19 ssc ServerSocketChannel.open();
20 InetSocketAddress addr new InetSocketAddress(port);
21 ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口
22 ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞
23 SelectionKey sk ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件然後返回該通道的key
24 sk.attach(new Acceptor(selector, ssc)); // 給定key一個附加的Acceptor對象
25 }
26
27 Override
28 public void run() {
29 while (!Thread.interrupted()) { // 在線程被中斷前持續運行
30 System.out.println(Waiting for new event on port: ssc.socket().getLocalPort() ...);
31 try {
32 if (selector.select() 0) // 若沒有事件就緒則不往下執行
33 continue;
34 } catch (IOException e) {
35 // TODO Auto-generated catch block
36 e.printStackTrace();
37 }
38 SetSelectionKey selectedKeys selector.selectedKeys(); // 取得所有已就緒事件的key集合
39 IteratorSelectionKey it selectedKeys.iterator();
40 while (it.hasNext()) {
41 dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度
42 it.remove();
43 }
44 }
45 }
46
47 /*
48 * name: dispatch(SelectionKey key)
49 * description: 調度方法根據事件綁定的對象開新線程
50 */
51 private void dispatch(SelectionKey key) {
52 Runnable r (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程
53 if (r ! null)
54 r.run();
55 }
56
57 } 1 // 接受連線請求線程 2 package server; 3 4 import java.io.IOException; 5 import java.nio.channels.SelectionKey; 6 import java.nio.channels.Selector; 7 import java.nio.channels.ServerSocketChannel; 8 import java.nio.channels.SocketChannel; 9
10 public class Acceptor implements Runnable {
11
12 private final ServerSocketChannel ssc;
13 private final Selector selector;
14
15 public Acceptor(Selector selector, ServerSocketChannel ssc) {
16 this.sscssc;
17 this.selectorselector;
18 }
19
20 Override
21 public void run() {
22 try {
23 SocketChannel sc ssc.accept(); // 接受client連線請求
24 System.out.println(sc.socket().getRemoteSocketAddress().toString() is connected.);
25
26 if(sc!null) {
27 sc.configureBlocking(false); // 設置為非阻塞
28 SelectionKey sk sc.register(selector, SelectionKey.OP_READ); // SocketChannel向selector註冊一個OP_READ事件然後返回該通道的key
29 selector.wakeup(); // 使一個阻塞住的selector操作立即返回
30 sk.attach(new TCPHandler(sk, sc)); // 給定key一個附加的TCPHandler對象
31 }
32
33 } catch (IOException e) {
34 // TODO Auto-generated catch block
35 e.printStackTrace();
36 }
37 }
38
39
40 } 1 // Handler線程 2 package server; 3 4 import java.io.IOException; 5 import java.nio.channels.SelectionKey; 6 import java.nio.channels.SocketChannel; 7 import java.util.concurrent.LinkedBlockingQueue; 8 import java.util.concurrent.ThreadPoolExecutor; 9 import java.util.concurrent.TimeUnit;
10
11 public class TCPHandler implements Runnable {
12
13 private final SelectionKey sk;
14 private final SocketChannel sc;
15 private static final int THREAD_COUNTING 10;
16 private static ThreadPoolExecutor pool new ThreadPoolExecutor(
17 THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS,
18 new LinkedBlockingQueueRunnable()); // 線程池
19
20 HandlerState state; // 以狀態模式實現Handler
21
22 public TCPHandler(SelectionKey sk, SocketChannel sc) {
23 this.sk sk;
24 this.sc sc;
25 state new ReadState(); // 初始狀態設定為READING
26 pool.setMaximumPoolSize(32); // 設置線程池最大線程數
27 }
28
29 Override
30 public void run() {
31 try {
32 state.handle(this, sk, sc, pool);
33
34 } catch (IOException e) {
35 System.out.println([Warning!] A client has been closed.);
36 closeChannel();
37 }
38 }
39
40 public void closeChannel() {
41 try {
42 sk.cancel();
43 sc.close();
44 } catch (IOException e1) {
45 e1.printStackTrace();
46 }
47 }
48
49 public void setState(HandlerState state) {
50 this.state state;
51 }
52 }
53
54 1 package server; 2 3 import java.io.IOException; 4 import java.nio.channels.SelectionKey; 5 import java.nio.channels.SocketChannel; 6 import java.util.concurrent.ThreadPoolExecutor; 7 8 public interface HandlerState { 9
10 public void changeState(TCPHandler h);
11
12 public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
13 ThreadPoolExecutor pool) throws IOException ;
14 } 1 package server; 2 3 import java.io.IOException; 4 import java.nio.ByteBuffer; 5 import java.nio.channels.SelectionKey; 6 import java.nio.channels.SocketChannel; 7 import java.util.concurrent.ThreadPoolExecutor; 8 9 public class ReadState implements HandlerState{
10
11 private SelectionKey sk;
12
13 public ReadState() {
14 }
15
16 Override
17 public void changeState(TCPHandler h) {
18 // TODO Auto-generated method stub
19 h.setState(new WorkState());
20 }
21
22 Override
23 public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
24 ThreadPoolExecutor pool) throws IOException { // read()
25 this.sk sk;
26 // non-blocking下不可用Readers因為Readers不支援non-blocking
27 byte[] arr new byte[1024];
28 ByteBuffer buf ByteBuffer.wrap(arr);
29
30 int numBytes sc.read(buf); // 讀取字符串
31 if(numBytes -1)
32 {
33 System.out.println([Warning!] A client has been closed.);
34 h.closeChannel();
35 return;
36 }
37 String str new String(arr); // 將讀取到的byte內容轉為字符串型態
38 if ((str ! null) !str.equals( )) {
39 h.setState(new WorkState()); // 改變狀態(READING-WORKING)
40 pool.execute(new WorkerThread(h, str)); // do process in worker thread
41 System.out.println(sc.socket().getRemoteSocketAddress().toString()
42 str);
43 }
44
45 }
46
47 /*
48 * 執行邏輯處理之函數
49 */
50 synchronized void process(TCPHandler h, String str) {
51 // do process(decode, logically process, encode)..
52 // ..
53 h.setState(new WriteState()); // 改變狀態(WORKING-SENDING)
54 this.sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
55 this.sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
56 }
57
58 /*
59 * 工作者線程
60 */
61 class WorkerThread implements Runnable {
62
63 TCPHandler h;
64 String str;
65
66 public WorkerThread(TCPHandler h, String str) {
67 this.h h;
68 this.strstr;
69 }
70
71 Override
72 public void run() {
73 process(h, str);
74 }
75
76 }
77 } 1 package server; 2 3 import java.io.IOException; 4 import java.nio.channels.SelectionKey; 5 import java.nio.channels.SocketChannel; 6 import java.util.concurrent.ThreadPoolExecutor; 7 8 public class WorkState implements HandlerState { 9
10 public WorkState() {
11 }
12
13 Override
14 public void changeState(TCPHandler h) {
15 // TODO Auto-generated method stub
16 h.setState(new WriteState());
17 }
18
19 Override
20 public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
21 ThreadPoolExecutor pool) throws IOException {
22 // TODO Auto-generated method stub
23
24 }
25
26 } 1 package server; 2 3 import java.io.IOException; 4 import java.nio.ByteBuffer; 5 import java.nio.channels.SelectionKey; 6 import java.nio.channels.SocketChannel; 7 import java.util.concurrent.ThreadPoolExecutor; 8 9 public class WriteState implements HandlerState{
10
11 public WriteState() {
12 }
13
14 Override
15 public void changeState(TCPHandler h) {
16 // TODO Auto-generated method stub
17 h.setState(new ReadState());
18 }
19
20 Override
21 public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
22 ThreadPoolExecutor pool) throws IOException { // send()
23 // get message from message queue
24
25 String str Your message has sent to
26 sc.socket().getLocalSocketAddress().toString() \r\n;
27 ByteBuffer buf ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0所以不需要再flip()
28
29 while (buf.hasRemaining()) {
30 sc.write(buf); // 回傳給client回應字符串發送buf的position位置 到limit位置為止之間的內容
31 }
32
33 h.setState(new ReadState()); // 改變狀態(SENDING-READING)
34 sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
35 sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
36 }
37 } 1 package server; 2 3 import java.io.IOException; 4 5 public class Main { 6 7 8 public static void main(String[] args) { 9 // TODO Auto-generated method stub
10 try {
11 TCPReactor reactor new TCPReactor(1333);
12 reactor.run();
13 } catch (IOException e) {
14 // TODO Auto-generated catch block
15 e.printStackTrace();
16 }
17 }
18
19 } 总的来说多线程版本的reactor是为了解决单线程reactor版本的IO和CPU处理速度不匹配问题从而达到高效处理的目的 参考文章 https://blog.csdn.net/yehjordan/article/details/51017025转载于:https://www.cnblogs.com/billmiao/p/9872221.html