网上服装商城网站建设方案策划书,房屋租赁网站开发需求分析,朋友圈推广广告,上海网络推广部字节开源的netPoll多路复用器源码解析 引言NetPollepoll API原生网络库实现netpoll 设计思路netpoll 对比 go net数据结构 源码解析多路复用池初始化Epoll相关API可读事件处理server启动accept 事件客户端连接初始化客户端连接建立 可读事件等待读取数据 可写事件处理客户端启动… 字节开源的netPoll多路复用器源码解析 引言NetPollepoll API原生网络库实现netpoll 设计思路netpoll 对比 go net数据结构 源码解析多路复用池初始化Epoll相关API可读事件处理server启动accept 事件客户端连接初始化客户端连接建立 可读事件等待读取数据 可写事件处理客户端启动写数据可写事件客户端socket可写事件服务端socket可写事件 引言
IO 有阻塞和非阻塞两种模式在阻塞IO下我们需要耗费一个线程去阻塞在read操作下去等待有足够多的数据可读并返回。在非阻塞IO下不停对所有fd集合进行轮询筛选出所有可读fd进行处理。
阻塞IO浪费线程(会占用内存和上下文切换开销)非阻塞IO会浪费CPU做大量无效操作。而基于IO多路复用系统调用实现的poll的意义在于将可读/可写状态通知和实际文件操作分开并支持多个文件描述符通过一个系统调用监听以提升性能。 网络库的核心功能就是去同时监听大量的文件描述符的状态变化(通过操作系统调用)并对于不同状态变更高效安全地进行对应的文件操作。
对于一个高效的网络库而言它的设计需要考虑以下几个场景
连接数量密集型: 如长连接场景每个连接请求并不多但是需要一直维护着长连接连接创建/销毁密集型: 如短连接场景会频繁创建销毁连接 这类场景下对 listener fd 的压⼒很⼤监听 listener fd 的系统调⽤会被频繁唤醒。⽽⼀个 fd 只能被⼀个线程处理这样的话创建连接的压⼒只能由单个 CPU 承担⽆法充分利⽤多核。Linux 后⾯增加SO_REUSEPORT 功能可以对同⼀个 bind ipport 创建多个 listener fd内核提供负载均衡分发这样来实现多核处理连接创建密集型场景。需要注意的是即便是那些⻓连接场景下如果遇到⼀些特殊业务场景例如准点秒杀也会出现瞬间创建⼤量连接的情况。 请求密集型: 如支持连接多路复用的RPC服务点对点的所有请求都可以基于一个长连接进行此时单连接会频繁被唤醒处理事件
同时由于网络库不仅要管理监听文件事件还需要管理用户业务逻辑层handler的执行 因此一个设计优秀的网络库还应当具备以下指标:
QPS 尽量高 QPS 要⾼意味着要让单请求开销低即平均 latency 低⾸先要保证充分利⽤CPU 其次是要让 CPU 尽可能少执⾏内存拷⻉等和我们⾸要⼯作⽆关的代码。简⽽⾔之CPU处于满负荷⼯作且做有效的⼯作的状态。⽆效⼯作是指那些和业务⽆关的事情例如GC线程 协程上下⽂切换开销锁竞争等。在 Golang 中G 依赖 P 运⾏⽽ P ⾃⾝有调度逻辑所以需要尽可能充分利⽤ P不让 P 空转 P99 延迟尽量低 P99 ⽐ Avg ⾼的根因是在运⾏中间遇到⼀些原因导致 CPU 腾出去进⾏了其他的⼯作或是整个⼯作循环被暂停了如 GC stop the world或是 Goroutine 陷⼊ syscall导致的暂时卡顿⼜或是锁竞争。 NetPoll
epoll API
在正式开始讲解NetPoll源码前我们先来快速复习一下多路复用API实现本文基于Linux系统进行展开所有此处多路复用器实现基于epoll展开:
typedef union epoll_data {int fd;//...
} epoll_data_t;struct epoll_event {uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */
};int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);epoll_create: 创建 epollfd 对象后续 epoll 操作都围绕该对象。epoll_ctl: 在 epollfd 对象上对 fd 使⽤某个 op(ADD/DEL/...)并声明关⼼什么events(EPOLLIN/EPOLLOUT/...)。epoll_wait: 阻塞等待直到 epollfd 内有就绪事件便返回返回值为有效事件数并且有效事件会记录再传⼊的 events 地址中。 timeout 0 时超过 timeout ms 后返回若⽆事件发⽣返回值为 0timeout 0 时⾮阻塞即便没有任何事件发⽣也会⽴刻返回返回值为 0timeout -1 时: 阻塞直到有事件发⽣
Epoll 在使⽤上有两种模式边缘触发(ET)和⽔平触发(LT)
边缘触发只有在从⽆数据到有数据时通知⼀次⽽⽔平触发只要 fd 处于可读状态就会⼀直触发。⽔平触发的缺点在于如果没有读完 fd 继续调⽤ epoll_wait 还会再次触发 EPOLLOUT 导致会⼀直尝试进⾏读完所有数据。这导致如果包体积特别⼤的情况下会占⽤更多内存开销。 原生网络库实现
golang 原生网络库基于epoll et模式开发基本架构如下图所示:
每个 fd 对应⼀个 goroutine业务⽅对 conn 发起主动的读写底层使⽤⾮阻塞 IO当事件未就 绪将 fd 注册(epoll_ctl)进 epoll fd通过把 goroutine 设置(park)成 GWaiting 状态。当有就绪 事件后唤醒(ready) 对应 goroutine 成 GRunnable 状态。⽤⼾⾃⼰的 Goroutine 负责进⾏实际的 read/write syscalGo Net 负责事件监听以及帮助 block/ready ⽤⼾ Goroutine。
golang原生网络库的特点就是:
从⽤⼾视⻆来看 net.Conn 接⼝的函数都是阻塞的即便底层 IO 是⾮阻塞的Read 接⼝能够填充满缓冲区就填充填充不满也会直接返回⻓度 n上层调⽤⽅既可以控制从内核缓冲区中的读取速率也可以控制读取块⼤⼩
type Conn interface {
// Read reads data from the connection.
// Read can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetReadDeadline.
Read(b []byte) (n int, err error)// Write writes data to the connection.
// Write can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetWriteDeadline.
Write(b []byte) (n int, err error)
}由于go net采用ET模式所以只会在数据就绪时通知一次用户在自己的线程中调用read api不断读取数据直到返回的n等于0说明数据全部读取完毕了。 golang 原生网络库的优势如下:
创建连接后由调⽤⽅ Goroutine ⾃⾝决定是否要进⾏读写何时进⾏读写以多少 size 进⾏读写。⽤⼾只需关⼼ net.Conn 暴露的同步接⼝使⽤上⾮常⽅便。
但是有利必有弊golang原生网络库的设计会导致如下问题:
1 Goroutine : 1 Connection 模型在连接⾮常多时Goroutine 数量会爆炸从⽽在调度上产⽣较⼤开销进⽽影响到 P99 指标。不⽀持零拷⻉⽤⼾使⽤ conn.Read(b []byte) 后完成⼀次内核缓冲区到⽤⼾缓冲区的复制。拿到 []byte 后再传递给上层进⾏协议解析反序列化时往往还需要再进⾏⼀次拷⻉ 根本性原因还是协议反序列化时拿到的内存和内核缓冲区复制到⽤⼾缓冲区的内存不是同⼀块导致的 net.Conn 是⽤⼾主动调⽤才触发事件监听所以如果⼀个连接对端已经关闭此时需要下⼀次写/读时才能根据返回 error 进⾏判断连接状态。如果该连接⼀直没被调⽤到会⼀直存在占⽤内存。 netpoll 设计思路
Netpoll 主要由两⼤部分构成
EventLoop(Polls)⽤来监听⽂件描述符事件FDOperators⽤来根据不同事件进⾏不同操作
这里官方提供了一幅图画的很好: 整个流程要分为三部分来看: netpoll 初始化: netpoll 启动时会初始化poll manager , 依次初始化池中每个poll对象首先调用EpollCreate api创建一个新的Epoll对象然后将其与当前poll对象绑定同时还会为当前poll对象分配linkbuffer等缓冲区为每个poll对象开启一个协程来不断轮询当前epoll上的可读可写等事件 server 端: 启动后从poll manager中获取一个空闲的poll 将listener fd注册到poll中监听accept事件当accept 到客户端连接后从poll manager中获取一个空闲的poll 将客户端socket fd注册到poll中监听可读事件每个poll会关联一个LinkBuffer对象当监听到客户端连接上的可读事件后从linkbuffer中预定一块内存将数据都读取到这块内存中来包装一个模版任务用于不断轮询处理linkbuffer上剩余可读数据同时每次轮询完后都会回调用户设置好的OnRequest函数就是上图的hanler函数包装的模版任务会被提交到协程池中执行也就是上图中的gopool与内核的系统调⽤交互完全由⽹络库进⾏控制⽤⼾对 Conn 的读写都只是在操作⼀段 Buffer ⽽已 client 端: 启动后建立和server端的连接 , 从opCache对象池中获取一个空闲的FDOperator对象返回然后等待直到client socket可写调用connection提供的相关写api如malloc先分配一块内存用于写数据写完需要发送给server的数据调用flush api进行数据提交flush api会首先尝试将数据写入socket内核缓冲区中如果一次没写完说明socket缓冲区写满了此时会在poll上注册对当前socket fd可写事件监听然后调用waitFlush api阻塞等待writeTrigger通道发送过来的可写通知当poll线程监听到当前socket fd上发生了可写事件的时候会向writeTrigger通道发送消息唤醒等待的客户端 netpoll 对比 go net
netpoll 实现思路和 golang 原生网络库的区别如下:
Go Net 使⽤ Epoll ET Netpoll 使⽤ LT。Netpoll 在⼤包场景下会占⽤更多的内存。Go Net 只有⼀个 Epoll 事件循环因为 ET 模式被唤醒的少且事件循环内⽆需负责读写所以⼲的活少⽽ Netpoll 允许有多个事件循环循环内需要负责读写⼲的活多读写越重越需要开更多 Loops。Go Net ⼀个连接⼀个 GoroutineNetpoll 连接数和 Goroutine 数量没有关系和请求数有⼀定 关系但是有 Gopool 重⽤。Go Net 不⽀持 Zero Copy甚⾄于如果⽤⼾想要实现 BufferdConnection 这类缓存读取还会产 ⽣⼆次拷⻉。Netpoll ⽀持管理⼀个 Buffer 池直接交给⽤⼾且上层⽤⼾可以不使⽤ Read(p []byte) 接⼝⽽使⽤特定零拷⻉读取接⼝对 Buffer 进⾏管理实现零拷⻉能⼒的传递。 ET模式在高并发下调度压力比较大因为 EventLoop 本⾝只是监听事件真正的读写操作都在⽤⼾⾃⼰的 Goroutine 函数中执⾏不由⽹络库控制因此每次 EventLoop监听到事件发生后都需要唤醒对应的线程去读写数据这里存在上下文切换开销。
而 LT 单线程轮询对 cache/计算类业务更友好因为 Cache 的特点是业务逻辑执⾏的⾮常快所以在 readv 完了后可以⽴刻执⾏ handler 同时执⾏write整个过程都不需要进⾏线程调度。对于计算类任务⽽⾔越少协程切换能够让 CPU 尽可能少的做⽆效⼯作。 数据结构
此处只列举核心的几个对象:
server 服务端对象:
type server struct {operator FDOperator // ⽤来根据不同事件进⾏不同操作ln Listener. opts *options // 配置相关回调接口onQuit func(err error) // 退出回调connections sync.Map // 记录当前server上accept得到的活跃客户端连接: keyfd, valueconnection
}封装多路复用器操作的对象:
// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {// epoll 监听的 fdFD int// 监听到读写事件后的回调函数OnRead func(p Poll) error // accept 事件回调 OnWrite func(p Poll) error // 客户端 socket 写回调OnHup func(p Poll) error// linkbuffer 与 socket 缓冲区之间的读写APIInputs func(vs [][]byte) (rs [][]byte)InputAck func(n int) (err error)Outputs func(vs [][]byte) (rs [][]byte, supportZeroCopy bool)OutputAck func(n int) (err error)// epoll 对象poll Poll...
}封装多路复用器的对象
type Poll interface {Wait() errorClose() errorTrigger() errorControl(operator *FDOperator, event PollEvent) errorAlloc() (operator *FDOperator)Free(operator *FDOperator)
}type defaultPoll struct {pollArgsfd int // 监听的fdwop *FDOperator // eventfd(轻量级进程通信机制), wake epoll_wait -- 用于唤醒wait上阻塞的线程...Handler func(events []epollevent) (closed bool) // 发生感兴趣事件时回调该接口处理这些事件
}type pollArgs struct {...events []epollevent // 发送/接收感兴趣事件barriers []barrier。 // 用于实现分散读/集中写的向量缓冲区
}type epollevent struct {events uint32 // 事件位图_ int32data [8]byte // 注册感兴趣事件时可以携带用户数据的指针
}源码解析
多路复用池初始化
netpoll使用pollmanager维护着一组epoll对象池以此来实现对象复用每次有客户端新连接被Accept时都会从epoll池中按照对应的负载均衡策略pick出一个空闲的epoll对象来监听客户端连接上后续的读写事件。
netpoll多路复用池初始化的流程图如下所示: 具体源码如下:
// poll_manager.go
var pollmanager *manager // 多路复用器池子管理器func init() {var loops runtime.GOMAXPROCS(0)/20 1pollmanager manager{}// 设置负载均衡器默认采用轮询策略从epoll池中挑选空闲epollpollmanager.SetLoadBalance(RoundRobin)// 设置epoll池的大小同时会初始化池中的epoll对象pollmanager.SetNumLoops(loops)...
}golang 程序启动时会去自动调用每个go文件的init方法所以pollmanager会在程序启动时被初始化。
真正初始化epoll池中epoll对象的逻辑是在设置eventLoopNum时完成的:
// poll_manager.go
func (m *manager) SetNumLoops(numLoops int) error {..// netpoll支持运行时动态调整epoll池大小,所以此处存在该分支// 如果我们打算缩小epoll池大小,则进入下面这个分支if numLoops m.NumLoops {// 创建一个新的epoll池var polls make([]Poll, numLoops)for idx : 0; idx m.NumLoops; idx {// 对于无需缩减的部分直接重新指向即可if idx numLoops {polls[idx] m.polls[idx]} else {// 对于需要缩减的部分直接Close关闭该多路复用器if err : m.polls[idx].Close(); err ! nil {logger.Printf(NETPOLL: poller close failed: %v\n, err)}}}// 更新多路复用池管理器的相关状态m.NumLoops numLoopsm.polls pollsm.balance.Rebalance(m.polls)// 如果是动态缩容缩容完毕后直接返回return nil}// 进入初始化或者扩容逻辑m.NumLoops numLoopsreturn m.Run()
}从上面代码可以看出netpoll支持在运行时动态调整池子的大小下面我们看看初始化和扩容逻辑是如何完成的:
// poll_manager.go
// 扩容或者初始化epoll池
func (m *manager) Run() (err error) {defer func() {if err ! nil {_ m.Close()}}()// 如果是初始化epoll池此处的polls大小应该为0// 如果时扩容逻辑此处的polls大小为当前池中已有的多路复用器个数for idx : len(m.polls); idx m.NumLoops; idx {var poll Poll// 创建一个新的多路复用器poll, err openPoll()if err ! nil {return}// 新创建的多路复用器追加到polls集合 m.polls append(m.polls, poll)// 每个多路复用器绑定一个协程不断轮询注册到该epoll上的fd事件go poll.Wait()}// 更新多路复用池管理器的相关状态m.balance.Rebalance(m.polls)return nil
}初始化epoll池时首先是将池中每个epoll对象创建出来:
// poll_default_linux.go// 打开多路复用器
func openPoll() (Poll, error) {return openDefaultPoll()
}func openDefaultPoll() (*defaultPoll, error) {var poll new(defaultPoll) poll.buf make([]byte, 8)// 创建Epoll对象var p, err EpollCreate(0)...// 保存epoll的fdpoll.fd p// eventfd是一种进程/线程通信的机制他类似信号不过eventfd只是一种通知机制// 无法承载数据eventfd承载的数据是8个字节他的好处是简单并且只消耗一个fd// 进程间通信机制: https://zhuanlan.zhihu.com/p/383395277var r0, _, e0 syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)...// TODO: 这几个回调接口干啥的 ?poll.Reset poll.reset// 处理当前epoll fd上所发生的感兴趣的事件poll.Handler poll.handler// eventFd 通信机制poll.wop FDOperator{FD: int(r0)}// 在epoll上注册并监听eventFd的可读事件 -- 监听r0上的可读事件if err poll.Control(poll.wop, PollReadable); err ! nil {_ syscall.Close(poll.wop.FD)_ syscall.Close(poll.fd)return nil, err}// 初始化FDOperator缓存poll.opcache newOperatorCache()return poll, nil
}这里使用defaultPoll保存多路复用器上下文信息同时还为每个多路复用器创建出了一个eventFD用于实现进程间通信同时在当前epoll上注册监听eventFD的可读事件。
此处使用eventFD是为了epoll池关闭的时候通知那些阻塞在epoll_wait系统调用上的线程可以醒过来然后结束自己。
当创建出来多路复用器后下一步便是将其加入epoll池中最后为每个多路复用器绑定一个协程然后不断轮询注册到该epoll上的fd事件:
// poll_default_linux.go
func (p *defaultPoll) Wait() (err error) {// initvar caps, msec, n barriercap, -1, 0p.Reset(128, caps)// waitfor {if n p.size p.size 128*1024 {p.Reset(p.size1, caps)}// p.fd 就是 epoll fd// events 就是挂载到epoll tree上的epoll item// mesc 用于指定阻塞时间是永久阻塞还是阻塞一段时间还是非阻塞IO// 等待当前epoll上发生感兴趣的事件n, err EpollWait(p.fd, p.events, msec)if err ! nil err ! syscall.EINTR {return err}// 如果没有发生感兴趣的事件则将msec设置为-1表示下一次采用永久阻塞策略来等待感兴趣的事件发生// 然后调用Gosched完成协程调度if n 0 {msec -1runtime.Gosched()continue}msec 0// 处理感兴趣的事件if p.Handler(p.events[:n]) {return nil}// we can make sure that there is no op remaining if Handler finishedp.opcache.free()}
}defaultPoll的Handler回调接口是在openDefaultPoll函数中被赋值的实际调用的是poll_default_linux.go文件中的handler函数:
// poll_default_linux.go
// 当epoll上有感兴趣的事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {var triggerRead, triggerWrite, triggerHup, triggerError boolvar err error// 遍历所有感兴趣的事件for i : range events {// epollevent.data保存的是与之关联的FDOperator对象operator : p.getOperator(0, unsafe.Pointer(events[i].data))if operator nil || !operator.do() {continue}var totalRead int// 判断当前发生了什么事件evt : events[i].eventstriggerRead evtsyscall.EPOLLIN ! 0triggerWrite evtsyscall.EPOLLOUT ! 0triggerHup evt(syscall.EPOLLHUP|syscall.EPOLLRDHUP) ! 0triggerError evtsyscall.EPOLLERR ! 0// trigger or exit gracefully// 是否是eventFD可读事件发生了if operator.FD p.wop.FD {// must clean trigger first// 从eventFD中读取数据到buf中syscall.Read(p.wop.FD, p.buf)atomic.StoreUint32(p.trigger, 0)// if closed exit// 说明接收到了关闭信号那么就关闭当前epollif p.buf[0] 0 {// 关闭eventFDsyscall.Close(p.wop.FD)// 关闭epoll fdsyscall.Close(p.fd)operator.done()return true}operator.done()continue}// 发生了可读事件if triggerRead {// 如果FDOperator上的OnRead回调接口不为空说明发生的是客户端的accept事件if operator.OnRead ! nil {// 调用OnRead来接收并处理客户端连接operator.OnRead(p)// 否则说明发生的是某个客户端连接上的可读事件 } else if operator.Inputs ! nil {// 每个poll对象会关联一个barriers结构该结构用于实现分散读取与集中写入的系统调用// 每个poll对象还会关联一个LinkBuffer对象作为读写数据缓冲区// 此处是从LinkBuffer中分配出一块空闲内存var bs operator.Inputs(p.barriers[i].bs)if len(bs) 0 {// 读取数据到bs缓存区中var n, err ioread(operator.FD, bs, p.barriers[i].ivs)// 推动读指针让写入缓冲区的数据对消费者可见同时调用用户注册的OnRequest回调接口处理读数据operator.InputAck(n)...}} else {logger.Printf(NETPOLL: operator has critical problem! event%d operator%v, evt, operator)}}// 其他感兴趣事件的触发此处暂时不展开...
}Epoll相关API
本节介绍一下netpoll为Linux下的epoll系统调用封装的API接口:
Linux 底层的epoll系统调用由红黑树实现netpoll 给红黑树上每个节点都关联一个epollevent类型该类型由一个事件位图和用户数据指针组成:
// sys_epoll_linux_arm64.go
type epollevent struct {events uint32 // events表示要监听的事件类型如可读、可写等。这是一个位掩码可以设置多个事件类型例如 EPOLLIN 表示可读事件EPOLLOUT 表示可写事件。_ int32data [8]byte // 可以携带用户数据。这里的用户数据通常是一个指针指向与文件描述符关联的对象或其他相关数据。
}netpoll 还提供了对epoll对象创建感兴趣事件监听等待感兴趣事件发生等操作的API封装:
创建epoll对象
func EpollCreate(flag int) (fd int, err error) {var r0 uintptr// 执行epoll_create系统调用r0, _, err syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0)if err syscall.Errno(0) {err nil}return int(r0), err
}注册感兴趣事件
func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) {_, _, err syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0)if err syscall.Errno(0) {err nil}return err
}等待感兴趣事件发生
func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) {var r0 uintptrvar _p0 unsafe.Pointer(events[0])if msec 0 {r0, _, err syscall.RawSyscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0)} else {r0, _, err syscall.Syscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0)}if err syscall.Errno(0) {err nil}return int(r0), err
}关于注册感兴趣的事件netpoll在此基础之上又封装了一层
// Control implements Poll.
func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {var op int// TODO: evt.data operatorvar evt epollevent// 将epollevent对象的data指针指向传入的FDOperator对象p.setOperator(unsafe.Pointer(evt.data), operator)// 根据监听的事件类型,更新事件位图switch event {case PollReadable: // server accept a new connection and wait readoperator.inuse()op, evt.events syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERRcase PollWritable: // client create a new connection and wait connect finishedoperator.inuse()op, evt.events syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERRcase PollDetach: // deregisterp.delOperator(operator)op, evt.events syscall.EPOLL_CTL_DEL, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERRcase PollR2RW: // connection wait read/writeop, evt.events syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERRcase PollRW2R: // connection wait readop, evt.events syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR}// 完成监听事件信息注册return EpollCtl(p.fd, op, operator.FD, evt)
}从Control函数中可以看出来netpoll会在epollevent的data字段中保存监听的fd对象信息这里fd对象是netpoll经过封装后的FDOperator对象。
FDOperator对象中又保存了对当前fd对象的读写API封装。 可读事件处理
可读事件有两类一种是accept事件另一种是readable事件: accept事件针对的是server端 下面给出整个读事件处理的流程图大家可以时不时回看本图: server启动
服务提供方server在启动时会创建一个新的server端套接字然后在该套接字上打开并监听对应的端口随后向poll manager获取一个空闲poller对象 并在该对象上监听server端套接字的可读事件这里实际是客户端的accept事件:
netpoll server 方代码模版写法如下:
// 1. OnRequest: 有可读数据时回调该接口
var OnRequest func(ctx context.Context, connection netpoll.Connection) error { return nil }// 2. OnPrepare: 客户端连接建立完毕后,回调该接口
var OnPrepare func(connection netpoll.Connection) context.Context { return nil }func main() {// 1. 建立连接listen, err : net.Listen(tcp, :1234)if err ! nil {return}// 2. 创建eventLoopeventLoop, _ : netpoll.NewEventLoop(OnRequest, netpoll.WithOnPrepare(OnPrepare), netpoll.WithReadTimeout(time.Second))// 3. 启动服务eventLoop.Serve(listen)
}eventLoop的Serve方法会创建一个新的Server对象并启动netpoll服务端:
// Serve implements EventLoop.
func (evl *eventLoop) Serve(ln net.Listener) error {// 将原生的listener对象转换为netpoll包装后的Listener对象npln, err : ConvertListener(ln)if err ! nil {return err}evl.Lock()// 创建新的server对象evl.svr newServer(npln,// opts对象保存了相关事件回调evl.opts,// 退出事件回调监听函数evl.quit)// 启动回调evl.svr.Run()evl.Unlock()// 监听到停止信号后从此处返回err evl.waitQuit()// ensure evl will not be finalized until Serve returnsruntime.SetFinalizer(evl, nil)return err
}创建完server对象后会调用server对象的Run方法启动服务:
// Run this server.
func (s *server) Run() (err error) {// 当前FDOperator对象封装的是server socket套接字对象s.operator FDOperator{FD: s.ln.Fd(), // 服务端Socket监听器OnRead: s.OnRead, // 可读事件发生OnHup: s.OnHup, // 挂断事件发生}// 挑选一个空闲的多路复用器s.operator.poll pollmanager.Pick()// 监听服务端套接字上的可读事件err s.operator.Control(PollReadable)if err ! nil {// 错误退出时,回调该方法s.onQuit(err)}return err
}netpoll 在对server socket执行事件注册时会设置FDOperator的OnRead接口用于处理服务端套接字上的可读事件。
netpoll 也是通过FDOperator的OnRead接口是否为nil来判断当前发生的事件是accept还是readable事件。 只有server端启动时才会对服务端套接字设置OnRead回调接口client端是不会设置的。 accept 事件
在defaultPoll的handler函数中我们暂时只关心读事件是如何被处理的而关于可读事件本节我们来看看客户端accept事件是如何处理的:
// poll_default_linux.go
// 当epoll上有感兴趣的事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {var triggerRead, triggerWrite, triggerHup, triggerError boolvar err error// 遍历所有感兴趣的事件for i : range events {// epollevent.data保存的是与之关联的FDOperator对象operator : p.getOperator(0, unsafe.Pointer(events[i].data))...var totalRead int// 判断当前发生了什么事件evt : events[i].eventstriggerRead evtsyscall.EPOLLIN ! 0triggerWrite evtsyscall.EPOLLOUT ! 0triggerHup evt(syscall.EPOLLHUP|syscall.EPOLLRDHUP) ! 0triggerError evtsyscall.EPOLLERR ! 0...// 发生了可读事件if triggerRead {// 如果FDOperator上的OnRead回调接口不为空说明发生的是客户端的accept事件if operator.OnRead ! nil {// 调用OnRead来接收并处理客户端连接operator.OnRead(p)// 否则说明发生的是某个客户端连接上的可读事件 } else if operator.Inputs ! nil {// 每个poll对象会关联一个barriers结构该结构用于实现分散读取与集中写入的系统调用// 每个poll对象还会关联一个LinkBuffer对象作为读写数据缓冲区// 此处是从LinkBuffer中分配出一块空闲内存var bs operator.Inputs(p.barriers[i].bs)if len(bs) 0 {// 读取数据到bs缓存区中var n, err ioread(operator.FD, bs, p.barriers[i].ivs)// 推动读指针让写入缓冲区的数据对消费者可见同时调用用户注册的OnRequest回调接口处理读数据operator.InputAck(n)...}} else {logger.Printf(NETPOLL: operator has critical problem! event%d operator%v, evt, operator)}}// 其他感兴趣事件的触发此处暂时不展开...
}回顾上面给出的handler函数可知netpoll会依次遍历感兴趣的事件集合中每个事件然后获取与当前事件绑定的FDOperator对象首先判断当前发生的是否死可读事件再根据FDOperator的OnRead接口是否为空来判断发生的是accept事件还是readable事件。
在server启动一节我们已经知道了如果FDOperator的OnRead接口不为空那么说明发生的是客户端的accept事件此时会调用FDOperator的OnRead回调来处理客户端的连接事件此处实际调用的是server的OnRead的方法;
// OnRead implements FDOperator.
func (s *server) OnRead(p Poll) error {// 获取客户端连接conn, err : s.ln.Accept()...// 包装一下原生的conn连接var connection connection{}// 初始化一下连接connection.init(conn.(Conn),// 初始化完毕后,回调用户注册进来的prepare接口s.opts)// 连接不活跃,直接返回if !connection.IsActive() {return nil}// 返回客户端连接套接字对应的文件描述符var fd conn.(Conn).Fd()// 添加关闭回调接口 --- netpoll回调接口这里采用的是回调链的形式,可以添加多个回调接口connection.AddCloseCallback(func(connection Connection) error {// 当前连接关闭时,将自己从server连接集合中移除s.connections.Delete(fd)return nil})// 在server对象中保存 fd , 已打开连接 s.connections.Store(fd, connection)// 调用连接建立接口connection.onConnect()return nil
}处理客户端accept事件的过程主要分为三步:
获取原生conn连接对象对其进行包装然后为当前连接初始化相关数据结构和回调接口从poller池中挑选出一个poll对象与当前连接进行绑定并在该poll上注册对当前连接可读事件的监听为当前连接包装一个任务对象然后丢入协程池中之行该任务负责死循环轮询发现可读数据立马回调用户提供的OnRequest接口进行处理 客户端连接初始化
server.OnRead函数中调用的connection.init函数主要是用来为当前连接初始化相关数据结构回调接口以及在poll上注册对当前connection可读事件的监听
// init initialize the connection with options
func (c *connection) init(conn Conn, opts *options) (err error) {// init buffer, barrier, finalizerc.readTrigger make(chan error, 1)c.writeTrigger make(chan error, 1)// 初始化LinkBuffer相关数据结构c.bookSize, c.maxSize pagesize, pagesizec.inputBuffer, c.outputBuffer NewLinkBuffer(pagesize), NewLinkBuffer()c.outputBarrier barrierPool.Get().(*barrier) // 用于聚集读写的缓冲区// 初始化c.initNetFD(conn) // 确保conn是被netpoll包装后的netFD类型c.initFDOperator() // 初始化FDOperatorc.initFinalizer() // 添加close回调函数// 将客户端连接套接字设置为非阻塞模式syscall.SetNonblock(c.fd, true)// enable TCP_NODELAY by defaultswitch c.network {case tcp, tcp4, tcp6:// 禁用 Nagle 算法setTCPNoDelay(c.fd, true)}// 启用零拷贝传输的 TCP Socket 选项 和 阻塞超时时间if setZeroCopy(c.fd) nil setBlockZeroCopySend(c.fd, defaultZeroCopyTimeoutSec, 0) nil {c.supportZeroCopy true}// connection initialized and prepare options// 设置相关回调接口,在poll上注册对当前connection可读事件的监听return c.onPrepare(opts)
}netpoll 里面为原生的ListenerConnectionEpollFd等对象都进行了一层自己的封装initNetFD函数便是对原生客户端套接字文件描述符的封装:
func (c *connection) initNetFD(conn Conn) {if nfd, ok : conn.(*netFD); ok {c.netFD *nfdreturn}c.netFD netFD{fd: conn.Fd(),localAddr: conn.LocalAddr(),remoteAddr: conn.RemoteAddr(),}
}FDOperator 是对需要注册在epoll上进行监听的fd的封装其是netpoll中的一个核心对象内部持有被监听的fd和poll对象同时对外提供fd数据读写回调接口 , 当fd上发生可读可写事件时便会回调FDOperator上注册好的回调接口进行处理:
func (c *connection) initFDOperator() {// 通过负载均衡器挑选一个可用的pollpoll : pollmanager.Pick()// 从opcache中分配一个可用的poll对象op : poll.Alloc()// 拿到当前客户端连接对应的socket文件描述符op.FD c.fd// 回调接口初始化 -- 注意OnRead回调被设置为了nilop.OnRead, op.OnWrite, op.OnHup nil, nil, c.onHupop.Inputs, op.InputAck c.inputs, c.inputAckop.Outputs, op.OutputAck c.outputs, c.outputAckc.operator op
}connection 的 onPrepare 函数主要用来将用户提供的相关回调接口设置到当前connection对象上以及相关读写超时参数等如果用户提供了OnPrepare接口此处会进行回调通知。
该函数最后会在当前poll上注册对当前客户端connection的读事件监听
// onPrepare supports close connection, but not read/write data.
// connection will be registered by this call after preparing.
func (c *connection) onPrepare(opts *options) (err error) {if opts ! nil {// 将用户通过options设置的回调接口都赋值给当前accept得到的客户端连接c.SetOnConnect(opts.onConnect)c.SetOnRequest(opts.onRequest)c.SetReadTimeout(opts.readTimeout)c.SetWriteTimeout(opts.writeTimeout)c.SetIdleTimeout(opts.idleTimeout)// calling prepare first and then register.// 如果我们指定了onPrepare回调此处会执行回调if opts.onPrepare ! nil {c.ctx opts.onPrepare(c)}}// 初始化连接上下文if c.ctx nil {c.ctx context.Background()}// prepare may close the connection.if c.IsActive() {// 在当前poll上注册对当前客户端connection的读事件监听return c.register()}return nil
}connection的register函数负责在poll上注册对当前客户端connection的读事件监听:
// register only use for connection register into poll.
func (c *connection) register() (err error) {err c.operator.Control(PollReadable)... return nil
}客户端连接建立
当accept得到的客户端连接初始化完毕后会调用onConnect函数对客户端连接进行任务包装然后提交到协程池执行任务:
// onConnect is responsible for executing onRequest if there is new data coming after onConnect callback finished.
func (c *connection) onConnect() {// 获取用户设置的OnConnect回调和OnRequest回调接口 --- 如果没有设置OnConnect回调此处直接返回var onConnect, _ c.onConnectCallback.Load().(OnConnect)if onConnect nil {return}var onRequest, _ c.onRequestCallback.Load().(OnRequest)var connected int32c.onProcess(// 第一个回调函数用于判断当前是否连接此刻是否可被处理func(c *connection) bool {// 在当前客户端连接初始化完毕后,会在onConnect函数中回调一次客户端提供的OnConnect接口// 此处通过标记确保只会调用一次OnConnect函数if atomic.LoadInt32(connected) 0 {return true}// check for onRequestreturn onRequest ! nil // 存在可读数据c.Reader().Len() 0},// 第二个回调函数会在第一个回调函数返回true的前提下,进行处理func(c *connection) {// 回调OnConnect函数if atomic.CompareAndSwapInt32(connected, 0, 1) {c.ctx onConnect(c.ctx, c)return}// 处理可读数据,回调用户提供的回调函数if onRequest ! nil {_ onRequest(c.ctx, c)}},)
}onProcess 函数内部负责实现一套模版方法用于不断轮询连接状态如果可处理则调用执行处理直到接收到停止信号或连接不可处理时才会退出循环:
// onProcess is responsible for executing the process function serially,
// and make sure the connection has been closed correctly if user call c.Close() in process function.
func (c *connection) onProcess(isProcessable func(c *connection) bool, process func(c *connection)) (processed bool) {...// 准备任务var task func() {// 如果当前任务可执行,确保至少被执行过一次if isProcessable(c) {process(c)}// 死循环处理任务,直到接收到关闭信号或者任务不再可处理var closedBy whofor {closedBy c.status(closing)// close by user or no processableif closedBy user || !isProcessable(c) {break}process(c)}...return}// 异步跑这个任务 --- gopool.CtxGo 字节开源的协程池runTask(c.ctx, task)return true
}但是这里要注意的是如果连接上一段时间都没有可读数据那么与当前连接绑定的协程在发现无数据可读时会退出返回也就是说当前协程就与当前连接解绑并重新放回了协程池中。
大家要注意此处netpoll的实现思路
连接初始化完毕的最后会调用onConnect函数该函数主要作用是调用用户设置好的onConnect回调通知用户连接已经建立完毕了而还需要OnRequest回调只是为了顺道检查是否有可读数据准备就绪如果准备就绪了那么就顺道处理一波。onProcess函数主要做的事情就是不断轮询处理当前连接上的可读数据直到接送到停止信号或者当前连接此刻没有可读数据了则结束轮询释放当前协程。
netpoll 通过一个单独的协程来监听fd上的可读可写事件当监听到可读可写事件时不是在当前协程内进行同步处理而是将可读可写事件包装为一个任务然后从协程池中取出一个空闲协程进行处理这是典型的Reactor模式实现思路。 可读事件
当netpoll accept到一个连接后会从poller池中挑选一个空闲poll然后在当前poll上执行对当前conn可读事件的监听。
后续当conn上发生可读事件时便会被与该conn绑定的poll感知到然后通过判断FDOperator的OnRead接口为nil知道当前发生的是可读事件而非accept事件。
此时我们再来回看defaultPoll的handler看看当发生可读事件时netpoll是如何处理的:
// poll_default_linux.go
// 当epoll上有感兴趣的事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {var triggerRead, triggerWrite, triggerHup, triggerError boolvar err error// 遍历所有感兴趣的事件for i : range events {// epollevent.data保存的是与之关联的FDOperator对象operator : p.getOperator(0, unsafe.Pointer(events[i].data))...var totalRead int// 判断当前发生了什么事件evt : events[i].eventstriggerRead evtsyscall.EPOLLIN ! 0triggerWrite evtsyscall.EPOLLOUT ! 0triggerHup evt(syscall.EPOLLHUP|syscall.EPOLLRDHUP) ! 0triggerError evtsyscall.EPOLLERR ! 0...// 发生了可读事件if triggerRead {// 如果FDOperator上的OnRead回调接口不为空说明发生的是客户端的accept事件if operator.OnRead ! nil {// 调用OnRead来接收并处理客户端连接operator.OnRead(p)// 否则说明发生的是某个客户端连接上的可读事件 } else if operator.Inputs ! nil {// 每个poll对象会关联一个barriers结构该结构用于实现分散读取与集中写入的系统调用// 每个poll对象还会关联一个LinkBuffer对象作为读写数据缓冲区// 此处是从LinkBuffer中分配出一块空闲内存var bs operator.Inputs(p.barriers[i].bs)if len(bs) 0 {// 读取数据到bs缓存区中var n, err ioread(operator.FD, bs, p.barriers[i].ivs)// 推动读指针让写入缓冲区的数据对消费者可见同时调用用户注册的OnRequest回调接口处理读数据operator.InputAck(n)...}} else {logger.Printf(NETPOLL: operator has critical problem! event%d operator%v, evt, operator)}}// 其他感兴趣事件的触发此处暂时不展开...
}关于LinkBuffer的源码解析本文就不过多展开了感兴趣的小伙伴可以阅读我之前写的这篇文章:
字节开源的netPoll底层LinkBuffer设计与实现
FDOperator的Inputs和InputAck回调接口都是在客户端连接初始化时在initFDOperator方法中被设置的:
func (c *connection) initFDOperator() {...op.Inputs, op.InputAck c.inputs, c.inputAckop.Outputs, op.OutputAck c.outputs, c.outputAck...
}connection的inputs函数就是调用linkbuffer提供的book方法预定一块内存用于接收socket缓冲区中的可读数据:
// inputs implements FDOperator.
func (c *connection) inputs(vs [][]byte) (rs [][]byte) {vs[0] c.inputBuffer.book(c.bookSize, c.maxSize)return vs[:1]
}inputAck则复杂一些首先会调用linkbuffer的bookAck函数完成预留内存的提交这样已经从socket缓冲区写入linkbuffer的数据就对用户可见了:
// inputAck implements FDOperator.
func (c *connection) inputAck(n int) (err error) {...// 提交预留内存,提交后,用户便可以读取这部分内存数据了length, _ : c.inputBuffer.bookAck(n)...var needTrigger true// 从协程池中取出一个空闲协程来处理当前连接上的可读数据if length n { // first start onRequestneedTrigger c.onRequest() // 返回值表示是否读取完毕了所有需要的数据, 如果返回false说明读完了否则说明没有读完}// 单开协程处理客户端连接上的可读数据时,可能在回调用户OnRequest接口时,调用读数据接口从而阻塞等待数据准备就绪// 此处当数据就绪时会唤醒对应的协程if needTrigger length int(atomic.LoadInt64(c.waitReadSize)) {c.triggerRead(nil)}return nil
}此处调用connection的onRequest方法并非直接就是调用的用户提供的回调接口而是和OnConnect方法一样创建一个读数据任务去处理当前连接上的可读数据
// onRequest is responsible for executing the closeCallbacks after the connection has been closed.
func (c *connection) onRequest() (needTrigger bool) {// 加载用户设置的回调接口var onRequest, ok c.onRequestCallback.Load().(OnRequest)if !ok {return true}// 处理请求processed : c.onProcess(// 第一个回调函数用于判断当前连接是否活跃并且还有未读取数据func(c *connection) bool {return c.Reader().Len() 0},// 第二个回调才是真正将请求交给用户回调来处理func(c *connection) {_ onRequest(c.ctx, c)},)// if not processed, should trigger readreturn !processed
}connection的onProcess方法上文已经说过就是从协程池中捞取一个空闲协程来处理当前连接上的可读数据
如果当前连接上一直有数据可读便会一直处理如果当前协程上没有数据可读了协程便会被释放重新返回池中。 等待读取数据
上文说到当poll线程监听到可读可写数据的时候会单开一个线程去处理当前连接上的可读可写数据如果此时发生的是可读事件那么最终会回调到用户提供的OnRequest接口。
而用户可以在OnRequest接口中去调用connection相关读API去读取数据:
// ReadString implements Connection.
func (c *connection) ReadString(n int) (s string, err error) {if err c.waitRead(n); err ! nil {return s, err}return c.inputBuffer.ReadString(n)
}// ReadBinary implements Connection.
func (c *connection) ReadBinary(n int) (p []byte, err error) {if err c.waitRead(n); err ! nil {return p, err}return c.inputBuffer.ReadBinary(n)
}// Next implements Connection.
func (c *connection) Next(n int) (p []byte, err error) {if err c.waitRead(n); err ! nil {return p, err}return c.inputBuffer.Next(n)
}这些读API在方法开头都会调用waitRead等待所读数据量就绪后或者读超时后才会进行数据读取或者超时返回:
// waitRead will wait full n bytes.
func (c *connection) waitRead(n int) (err error) {// 如果当前可读数据大于需要的了,直接返回,无需等待if n c.inputBuffer.Len() {return nil}// 存储自己希望读取的数据量atomic.StoreInt64(c.waitReadSize, int64(n))// 返回时,清空变量defer atomic.StoreInt64(c.waitReadSize, 0)// 如果设置了读超时属性,就有限期等待,直到数据就绪if c.readTimeout 0 {return c.waitReadWithTimeout(n)}// wait full n// 否则无限期等待,直到数据准备就绪for c.inputBuffer.Len() n {switch c.status(closing) {case poller:return Exception(ErrEOF, wait read)case user:return Exception(ErrConnClosed, wait read)default:// 等待接收数据就绪的信号err -c.readTriggerif err ! nil {return err}}}return nil
}// waitReadWithTimeout will wait full n bytes or until timeout.
// 有限期等待
func (c *connection) waitReadWithTimeout(n int) (err error) {// set read timeoutif c.readTimer nil {c.readTimer time.NewTimer(c.readTimeout)} else {c.readTimer.Reset(c.readTimeout)}for c.inputBuffer.Len() n {switch c.status(closing) {case poller:// cannot return directly, stop timer first!err Exception(ErrEOF, wait read)goto RETcase user:// cannot return directly, stop timer first!err Exception(ErrConnClosed, wait read)goto RETdefault:select {case -c.readTimer.C:// double check if there is enough data to be readif c.inputBuffer.Len() n {return nil}return Exception(ErrReadTimeout, c.remoteAddr.String())case err -c.readTrigger:if err ! nil {return err}continue}}}
RET:// clean timer.Cif !c.readTimer.Stop() {-c.readTimer.C}return err
}可写事件处理
可写事件有两类一种是client端socket套接字可写事件另一种是server端socket套接字可写事件: 注意区分server socket和socket套接字的区别 , 前者是server端启动绑定并监听的套接字用于accept客户端连接后者是accept得到的客户端socket连接套接字 和 客户端connect 服务端成功后得到的 socket套接字。 下面还是给出一幅写数据流程图: 客户端启动
客户端代码典型写法如下:
func main() {// 1. 建立连接dialer : netpoll.NewDialer()conn, _ : dialer.DialConnection(tcp, :1234, time.Second)var reader, writer conn.Reader(), conn.Writer()// 2. 写数据write_data : []byte(hello world)alloc, _ : writer.Malloc(len(write_data))copy(alloc, write_data) // write datawriter.Flush()// 3. 读数据buf, _ : reader.Next(reader.Len())fmt.Println(服务端响应的数据: string(buf))reader.Release()
}我们下面来看一下客户端启动过程:
建立连接
func (d *dialer) DialConnection(network, address string, timeout time.Duration) (connection Connection, err error) {... switch network {case tcp, tcp4, tcp6:// 走tcp连接return d.dialTCP(ctx, network, address)...}
}func (d *dialer) dialTCP(ctx context.Context, network, address string) (connection *TCPConnection, err error) {...connection, err DialTCP(ctx, tcp, nil, tcpAddr)...return nil, firstErr
}func DialTCP(ctx context.Context, network string, laddr, raddr *TCPAddr) (*TCPConnection, error) {...sd : sysDialer{network: network, address: raddr.String()}c, err : sd.dialTCP(ctx, laddr, raddr)...return c, nil
}连接建立成功后返回对应的客户端连接socket
func (sd *sysDialer) dialTCP(ctx context.Context, laddr, raddr *TCPAddr) (*TCPConnection, error) {conn, err : internetSocket(ctx, sd.network, laddr, raddr, syscall.SOCK_STREAM, 0, dial)...return newTCPConnection(conn)
}func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string) (conn *netFD, err error) {... return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr)
}func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (netfd *netFD, err error) {// syscall.Socket set socket optionsvar fd int// 创建客户端socket对象,同时设置为非阻塞模式fd, err sysSocket(family, sotype, proto)if err ! nil {return nil, err}...// 包装客户端socket fd 为netFDnetfd newNetFD(fd, family, sotype, net)// 建立与server的连接err netfd.dial(ctx, laddr, raddr)...return netfd, nil
}func (c *netFD) dial(ctx context.Context, laddr, raddr sockaddr) (err error) {...// 连接serverif crsa, err c.connect(ctx, lsa, rsa); err ! nil {return err}...return nil
}通过connect系统调用真正完成连接建立然后为当前client socket创建一个FDOperator
func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, retErr error) {// 系统调用connect,连接serversyscall.Connect(c.fd, ra)...// 为当前client socket创建一个新的FDOperatorc.pd newPollDesc(c.fd)for {// 等待直到当前client socket可写为止if err : c.pd.WaitWrite(ctx); err ! nil {return nil, err}switch err : syscall.Errno(nerr); err {...// 如果没有错误发生,直接返回Sockaddrcase syscall.Errno(0):// The runtime poller can wake us up spuriously;// see issues 14548 and 19289. Check that we are// really connected; if not, wait again.if rsa, err : syscall.Getpeername(c.fd); err nil {return rsa, nil}...}}
}为当前client socket创建一个FDOperator同时设置其OnWrite回调接口
func newPollDesc(fd int) *pollDesc {pd : pollDesc{}poll : pollmanager.Pick()pd.operator FDOperator{poll: poll,FD: fd,OnWrite: pd.onwrite, // 设置OnWrite回调接口OnHup: pd.onhup,}pd.writeTrigger make(chan struct{})pd.closeTrigger make(chan struct{})return pd
}注册可写事件同时等待直到client socket可写
// WaitWrite .
func (pd *pollDesc) WaitWrite(ctx context.Context) (err error) {if pd.operator.isUnused() {// add ET|Write|Hup// 在当前连接绑定的poll上注册等待可写事件if err pd.operator.Control(PollWritable); err ! nil {logger.Printf(NETPOLL: pollDesc register operator failed: %v, err)return err}}// 等待直到接收到终止信号或者可写事件(当前client socket缓冲区为空) select {case -pd.closeTrigger: // triggered by poller// no need to detach, since poller has done it in OnHup.return Exception(ErrConnClosed, by peer)case -pd.writeTrigger: // triggered by pollererr nilcase -ctx.Done(): // triggered by ctxpd.detach()pd.operator.unused()err mapErr(ctx.Err())}// double check close trigger// 如果没有接收到停止信号,此处就直接返回select {case -pd.closeTrigger:return Exception(ErrConnClosed, by peer)default:return err}
}写数据
当我们需要写数据时通常都会先调用connection的malloc方法分配一块写缓冲区:
func (c *connection) Malloc(n int) (buf []byte, err error) {return c.outputBuffer.Malloc(n)
}然后调用connection的flush方法刷新写缓冲区中的数据:
func (c *connection) Flush() error {...// 刷新写缓冲区中的数据,让其对外可见 c.outputBuffer.Flush()// 将数据写入内核socket缓冲区return c.flush()
}最终调用connection的flush方法,将linkbuffer中的数据写入socket内核缓冲区中:
func (c *connection) flush() error {...// netpoll采用聚集写,所以第一步是将写缓冲区中的数据都读取到写缓冲区向量数组中var bs c.outputBuffer.GetBytes(c.outputBarrier.bs)// 将数据都写入内核socket缓冲区中var n, err sendmsg(c.fd, bs, c.outputBarrier.ivs, false c.supportZeroCopy)...// 如果写入了部分数据,则释放掉这部分内存中间if n 0 {err c.outputBuffer.Skip(n)c.outputBuffer.Release()if err ! nil {return Exception(err, when flush)}}// 如果所有数据都已经成功写入内核socket缓冲区中,则直接返回if c.outputBuffer.IsEmpty() {return nil}// 可能是因为socket缓冲区满了导致还有一部分数据没写完// 此处注册对可写事件的监听err c.operator.Control(PollR2RW)...// 等待直到可写才会返回return c.waitFlush()
}如果socket内核缓冲区被写满了则进行等待具体是进行无限期等待还是有限期等待取决于我们是否设置了写超时时间:
func (c *connection) waitFlush() (err error) {// 如果我们没有设置写超时事件则进行无限期等待if c.writeTimeout 0 {select {case err -c.writeTrigger:}return err}// 如果我们设置了写超时事件则执行有限期等待if c.writeTimer nil {c.writeTimer time.NewTimer(c.writeTimeout)} else {c.writeTimer.Reset(c.writeTimeout)}select {case err -c.writeTrigger:if !c.writeTimer.Stop() { // clean timer-c.writeTimer.C}return errcase -c.writeTimer.C:select {// try fetch writeTrigger if both cases firescase err -c.writeTrigger:return errdefault:}// if timeout, remove write event from poller// we cannot flush it again, since we dont if the poller is still process outputBufferc.operator.Control(PollRW2R)return Exception(ErrWriteTimeout, c.remoteAddr.String())}
}可写事件
可写事件分为两类一类是客户端socket可写一类是服务端socket可写本节我们来分别看看这两类可写事件都是如何处理的:
// 当感兴趣事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {var triggerRead, triggerWrite, triggerHup, triggerError boolvar err error// 遍历所有感兴趣的事件for i : range events {...// 触发写事件if triggerWrite {// 处理client socket可写事件if operator.OnWrite ! nil {operator.OnWrite(p)} else if operator.Outputs ! nil {// 处理服务端socket可写事件var bs, supportZeroCopy operator.Outputs(p.barriers[i].bs)if len(bs) 0 {// TODO: Let the upper layer pass in whether to use ZeroCopy.var n, err iosend(operator.FD, bs, p.barriers[i].ivs, false supportZeroCopy)operator.OutputAck(n)if err ! nil {p.appendHup(operator)continue}}} else {logger.Printf(NETPOLL: operator has critical problem! event%d operator%v, evt, operator)}}operator.done()}...
}客户端socket可写事件
当客户端socket可写事件发生时也就是客户端socket内核缓冲区有空闲空间可写时会调用FDOperator的onwrite回调方法进行处理。
onwrite回调中会向writeTrigger通道写入消息唤醒阻塞等待可写事件的线程:
func (pd *pollDesc) onwrite(p Poll) error {select {case -pd.writeTrigger:default:pd.detach()close(pd.writeTrigger)}return nil
}服务端socket可写事件
当服务端socket可写事件发生时也就是在server accept到客户端连接后发现客户端连接对应的socket可写时会经历下面三步:
如果写缓冲区数据为空那么就移除对当前fd上可写事件的监听否则读取数据到传入的vs缓冲区中
// outputs implements FDOperator.
func (c *connection) outputs(vs [][]byte) (rs [][]byte, supportZeroCopy bool) {// 如果写缓冲区为空if c.outputBuffer.IsEmpty() {// 移除对当前fd上可写事件的监听c.rw2r()return rs, c.supportZeroCopy}// 读取数据到rs中rs c.outputBuffer.GetBytes(vs)return rs, c.supportZeroCopy
}// 不再监听当前FD上的可写事件
func (c *connection) rw2r() {c.operator.Control(PollRW2R)c.triggerWrite(nil) // 唤醒等到可写事件的线程
}采用分散写技术将bs向量中所有数据写入socket内核缓冲区中 var n, err iosend(operator.FD, bs, p.barriers[i].ivs, false supportZeroCopy)3释放掉已经用完的写缓冲区空间同时移除对当前fd上可写事件的监听
func (c *connection) outputAck(n int) (err error) {// 将已经用完的部分内存回收掉 if n 0 {c.outputBuffer.Skip(n)c.outputBuffer.Release()}// 如果此时发现所有待写入数据都写入完毕了,那么就移除对当前fd上可写事件的监听if c.outputBuffer.IsEmpty() {c.rw2r()}return nil
}