网站建设好公司,做外贸的网站有哪些,自媒体横行还有做网站,男科医院网站建设策略上次说了很多Linux下进程相关知识#xff0c;这边不再复述#xff0c;下面来说说Python的并发编程#xff0c;如有错误欢迎提出#xff5e; 如果遇到听不懂的可以看上一次的文章#xff1a;https://www.cnblogs.com/dotnetcrazy/p/9363810.html 官方文档#xff1a;https… 上次说了很多Linux下进程相关知识这边不再复述下面来说说Python的并发编程如有错误欢迎提出 如果遇到听不懂的可以看上一次的文章https://www.cnblogs.com/dotnetcrazy/p/9363810.html 官方文档https://docs.python.org/3/library/concurrency.html 在线预览http://github.lesschina.com/python/base/concurrency/2.并发编程-进程篇.html 1.进程篇¶ 官方文档https://docs.python.org/3/library/multiprocessing.html Codehttps://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/PythonProcess 1.1.进程Process¶ Python的进程创建非常方便看个案例(这种方法通用fork只适用于Linux系) import os
# 注意一下导入的是Process不是processClass是大写开头
from multiprocessing import Processdef test(name):print([子进程-%s]PID%dPPID%d % (name, os.getpid(), os.getppid()))def main():print([父进程]PID%dPPID%d % (os.getpid(), os.getppid()))p Process(targettest, args(萌萌哒, )) # 单个元素的元组表达别忘了(x,)p.start()p.join() # 父进程回收子进程资源内部调用了wait系列方法if __name__ __main__:main()运行结果 [父进程]PID25729PPID23434
[子进程-萌萌哒]PID25730PPID25729 创建子进程时传入一个执行函数和参数用start()方法来启动进程即可 join()方法是父进程回收子进程的封装主要是回收僵尸子进程(点我) 其他参数可以参考源码 or 文档贴一下源码的init方法 def __init__(self,groupNone,targetNone,nameNone,args(),kwargs{},*,daemonNone) 扩展name为当前进程实例的别名 p.is_alive() 判断进程实例p是否还在执行p.terminate() 终止进程发SIGTERM信号上面的案例如果用OOP来实现就是这样(如果不指定方法默认调Run方法) import os
from multiprocessing import Processclass My_Process(Process):# 重写了Proce类的Init方法def __init__(self, name):self.__name nameProcess.__init__(self) # 调用父类方法# 重写了Process类的run()方法def run(self):print([子进程-%s]PID%dPPID%d % (self.__name, os.getpid(),os.getppid()))def main():print([父进程]PID%dPPID%d % (os.getpid(), os.getppid()))p My_Process(萌萌哒) # 如果不指定方法默认调Run方法p.start()p.join() # 父进程回收子进程资源内部调用了wait系列方法if __name__ __main__:main()PSmultiprocessing.Process自行处理僵死进程不用像os.fork那样自己建立信号处理程序、安装信号处理程序 1.1.源码拓展¶ 现在说说里面的一些门道只想用的可以忽略 新版本的封装可能多层这时候可以看看Python3.3.X系列这个算是Python3早期版本了很多代码都暴露出来比较明了直观 multiprocessing.process.py # 3.4.x开始Process有了一个BaseProcess
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/process.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/process.py
def join(self, timeoutNone):一直等到子进程overself._check_closed()# 断言False就触发异常提示就是后面的内容# 开发中用的比较多部署的时候可以python3 -O xxx 去除所以断言assert self._parent_pid os.getpid(), 只能 join 一个子进程assert self._popen is not None, 只能加入一个已启动的进程res self._popen.wait(timeout) # 本质就是用了我们之前讲的wait系列if res is not None:_children.discard(self) # 销毁子进程multiprocessing.popen_fork.py # 3.4.x开始在popen_fork文件中以前是multiprocessing.forking.py
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/popen_fork.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/popen_fork.py
def wait(self, timeoutNone):if self.returncode is None:# 设置超时的一系列处理if timeout is not None:from multiprocessing.connection import waitif not wait([self.sentinel], timeout):return None# 核心操作return self.poll(os.WNOHANG if timeout 0.0 else 0)return self.returncode# 回顾一下上次说的os.WNOHANG - 如果没有子进程退出则不阻塞waitpid()调用
def poll(self, flagos.WNOHANG):if self.returncode is None:try:# 他的内部调用了waitpidpid, sts os.waitpid(self.pid, flag)except OSError as e:# 子进程尚未创建# e.errno errno.ECHILD 10return Noneif pid self.pid:if os.WIFSIGNALED(sts):self.returncode -os.WTERMSIG(sts)else:assert os.WIFEXITED(sts), Status is {:n}.format(sts)self.returncode os.WEXITSTATUS(sts)return self.returncode关于断言的简单说明别泛滥 如果条件为真它什么都不做反之它触发一个带可选错误信息的AssertionError def test(a, b):assert b ! 0, 哥哥分母不能为0啊return a / bdef main():test(1, 0)if __name__ __main__:main()结果 Traceback (most recent call last):File 0.assert.py, line 11, in modulemain()File 0.assert.py, line 7, in maintest(1, 0)File 0.assert.py, line 2, in testassert b ! 0, 哥哥分母不能为0啊
AssertionError: 哥哥分母不能为0啊 运行的时候可以指定-O参数来忽略asserteg python3 -O 0.assert.py Traceback (most recent call last):File 0.assert.py, line 11, in modulemain()File 0.assert.py, line 7, in maintest(1, 0)File 0.assert.py, line 3, in testreturn a / b
ZeroDivisionError: division by zero 扩展 https://docs.python.org/3/library/unittest.html https://www.cnblogs.com/shangren/p/8038935.html 1.2.进程池¶ 多个进程就不需要自己手动去管理了有Pool来帮你完成先看个案例 import os
import time
from multiprocessing import Pool # 首字母大写def test(name):print([子进程-%s]PID%dPPID%d % (name, os.getpid(), os.getppid()))time.sleep(1)def main():print([父进程]PID%dPPID%d % (os.getpid(), os.getppid()))p Pool(5) # 设置最多5个进程不设置就默认为CPU核数for i in range(10):# 异步执行p.apply_async(test, args(i, )) # 同步用apply如非必要不建议用p.close() # 关闭池不再加入新任务p.join() # 等待所有子进程执行完毕回收资源join可以指定超时时间egp.join(1)print(over)if __name__ __main__:main()图示join可以指定超时时间egp.join(1) 调用join()之前必须先调用close()调用close()之后就不能继续添加新的Process了(下面会说为什么) 1.3.源码拓展¶ 验证一下Pool的默认大小是CPU的核数看源码 multiprocessing.pool.py # https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/pool.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/pool.py
class Pool(object):def __init__(self, processes指定的进程数,...):if processes is None:processes os.cpu_count() or 1 # os.cpu_count() ~ CPU的核数源码里面apply_async方法是有回调函数callback的 def apply_async(self,func,args(),kwds{},callbackNone,error_callbackNone):if self._state ! RUN:raise ValueError(Pool not running)result ApplyResult(self._cache, callback, error_callback)self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))return result来看个例子(和JQ很像) import os
import time
from multiprocessing import Pool # 首字母大写def test(name):print([子进程%s]PID%dPPID%d % (name, os.getpid(), os.getppid()))time.sleep(1)return namedef error_test(name):print([子进程%s]PID%dPPID%d % (name, os.getpid(), os.getppid()))raise Exception([子进程%s]啊我挂了 % name)def callback(result):成功之后的回调函数print([子进程%s]执行完毕 % result) # 没有返回值就为Nonedef error_callback(msg):错误之后的回调函数print(msg)def main():print([父进程]PID%dPPID%d % (os.getpid(), os.getppid()))p Pool() # CPU默认核数for i in range(5):# 搞2个出错的看看if i 2:p.apply_async(error_test,args(i, ),callbackcallback,error_callbackerror_callback) # 异步执行else:# 异步执行成功后执行callback函数有点像jqp.apply_async(test, args(i, ), callbackcallback)p.close() # 关闭池不再加入新任务p.join() # 等待所有子进程执行完毕回收资源print(over)if __name__ __main__:main()输出 [父进程]PID12348PPID10999
[子进程0]PID12349PPID12348
[子进程2]PID12351PPID12348
[子进程1]PID12350PPID12348
[子进程3]PID12352PPID12348
[子进程4]PID12352PPID12348
[子进程3]啊我挂了
[子进程4]啊我挂了
[子进程0]执行完毕
[子进程2]执行完毕
[子进程1]执行完毕
over 接着上面继续拓展补充说说获取函数返回值。上面是通过成功后的回调函数来获取返回值这次说说自带的方法 import time
from multiprocessing import Pool, TimeoutErrordef test(x):开平方time.sleep(1)return x * xdef main():pool Pool()task pool.apply_async(test, (10, ))print(task)try:print(task.get(timeout1))except TimeoutError as ex:print(超时了, ex)if __name__ __main__:main()输出apply_async返回一个ApplyResult类里面有个get方法可以获取返回值 multiprocessing.pool.ApplyResult object at 0x7fbc354f50b8
超时了 再举个例子顺便把Pool里面的map和imap方法搞个案例类比jq import time
from multiprocessing import Pooldef test(x):return x * xif __name__ __main__:with Pool(processes4) as pool:task pool.apply_async(test, (10, ))print(task.get(timeout1))obj_list pool.map(test, range(10))print(obj_list)# 返回一个可迭代类的实例对象obj_iter pool.imap(test, range(10))print(obj_iter)next(obj_iter)for i in obj_iter:print(i, end )输出 100
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
multiprocessing.pool.IMapIterator object at 0x7ff7f9734198
1 4 9 16 25 36 49 64 81 微微看一眼源码(基础忘了可以查看 点我 ) class IMapIterator(object):def __init__(self, cache):self._cond threading.Condition(threading.Lock())self._job next(job_counter)self._cache cacheself._items collections.deque()self._index 0self._length Noneself._unsorted {}cache[self._job] selfdef __iter__(self):return self # 返回一个迭代器# 实现next方法def next(self, timeoutNone):with self._cond:try:item self._items.popleft()except IndexError:if self._index self._length:raise StopIteration from Noneself._cond.wait(timeout)try:item self._items.popleft()except IndexError:if self._index self._length:raise StopIteration from Noneraise TimeoutError from Nonesuccess, value itemif success:return valueraise value
......扩展优雅杀死子进程的探讨 https://segmentfault.com/q/1010000005077517 1.4.拓展之subprocess¶ 官方文档https://docs.python.org/3/library/subprocess.html 还记得之前李代桃僵的execlxxx系列吗 这不subprocess就是它的一层封装当然了要强大的多先看个例子以os.execlp的例子为引 import subprocessdef main():# os.execlp(ls, ls, -al) # 执行Path环境变量可以搜索到的命令result subprocess.run([ls, -al])print(result)if __name__ __main__:main()输出 总用量 44
drwxrwxr-x 2 dnt dnt 4096 8月 7 17:32 .
drwxrwxr-x 4 dnt dnt 4096 8月 6 08:01 ..
-rw-rw-r-- 1 dnt dnt 151 8月 3 10:49 0.assert.py
-rw-rw-r-- 1 dnt dnt 723 8月 5 18:00 1.process2.py
-rw-rw-r-- 1 dnt dnt 501 8月 3 10:20 1.process.py
-rw-rw-r-- 1 dnt dnt 1286 8月 6 08:16 2.pool1.py
-rw-rw-r-- 1 dnt dnt 340 8月 7 16:38 2.pool2.py
-rw-rw-r-- 1 dnt dnt 481 8月 7 16:50 2.pool3.py
-rw-rw-r-- 1 dnt dnt 652 8月 5 17:01 2.pool.py
-rw-rw-r-- 1 dnt dnt 191 8月 7 17:33 3.subprocess.py
CompletedProcess(args[ls, -al], returncode0) 文档¶ 现在看下官方的文档描述来理解一下 r
具有可访问I / O流的子进程
Subprocesses with accessible I/O streams此模块允许您生成进程连接到它们输入/输出/错误管道并获取其返回代码。
This module allows you to spawn processes, connect to their
input/output/error pipes, and obtain their return codes.完整文档可以查看https://docs.python.org/3/library/subprocess.html
For a complete description of this module see the Python documentation.Main APIrun(...): 运行命令等待它完成然后返回CompletedProcess实例。
Runs a command, waits for it to complete,
then returns a CompletedProcess instance.Popen(...): 用于在新进程中灵活执行命令的类
A class for flexibly executing a command in a new processConstants常量
---------
DEVNULL: 特殊值表示应该使用os.devnull
Special value that indicates that os.devnull should be usedPIPE: 表示应创建PIPE管道的特殊值
Special value that indicates a pipe should be createdSTDOUT: 特殊值表示stderr应该转到stdout
Special value that indicates that stderr should go to stdoutOlder API尽量不用说不定以后就淘汰了call(...): 运行命令等待它完成然后返回返回码。
Runs a command, waits for it to complete, then returns the return code.check_call(...): Same as call() but raises CalledProcessError()if return code is not 0返回值不是0就引发异常check_output(...): 与check_call相同,但返回stdout的内容,而不是返回代码
Same as check_call but returns the contents of stdout instead of a return codegetoutput(...): 在shell中运行命令等待它完成然后返回输出
Runs a command in the shell, waits for it to complete,then returns the outputgetstatusoutput(...): 在shell中运行命令等待它完成然后返回一个exitcodeoutput元组
Runs a command in the shell, waits for it to complete,
then returns a (exitcode, output) tuple其实看看源码很有意思内部其实就是调用的os.popen【进程先导篇讲进程守护的时候用过】 def run(*popenargs, inputNone, capture_outputFalse,timeoutNone, checkFalse, **kwargs):if input is not None:if stdin in kwargs:raise ValueError(stdin和输入参数可能都不会被使用。)kwargs[stdin] PIPEif capture_output:if (stdout in kwargs) or (stderr in kwargs):raise ValueError(不能和capture_outpu一起使用stdout 或 stderr)kwargs[stdout] PIPEkwargs[stderr] PIPEwith Popen(*popenargs, **kwargs) as process:try:stdout, stderr process.communicate(input, timeouttimeout)except TimeoutExpired:process.kill()stdout, stderr process.communicate()raise TimeoutExpired(process.args, timeout, outputstdout, stderrstderr)except: # 包括KeyboardInterrupt的通信处理。process.kill()# 不用使用process.wait.__ exit__为我们做了这件事。raiseretcode process.poll()if check and retcode:raise CalledProcessError(retcode, process.args, outputstdout, stderrstderr)return CompletedProcess(process.args, retcode, stdout, stderr)返回值类型CompletedProcess # https://github.com/lotapp/cpython3/blob/master/Lib/subprocess.py
class CompletedProcess(object):def __init__(self, args, returncode, stdoutNone, stderrNone):self.args argsself.returncode returncodeself.stdout stdoutself.stderr stderrdef __repr__(self):对象按指定的格式显示args [args{!r}.format(self.args),returncode{!r}.format(self.returncode)]if self.stdout is not None:args.append(stdout{!r}.format(self.stdout))if self.stderr is not None:args.append(stderr{!r}.format(self.stderr))return {}({}).format(type(self).__name__, , .join(args))def check_returncode(self):如果退出代码非零则引发CalledProcessErrorif self.returncode:raise CalledProcessError(self.returncode, self.args, self.stdout,self.stderr)简单demo¶ 再来个案例体会一下方便之处 import subprocessdef main():result subprocess.run([ping, www.baidu.com])print(result.stdout)if __name__ __main__:main()图示 交互demo¶ 再来个强大的案例交互的程序都可以比如 ftpnslookup 等等popen1.communicate import subprocessdef main():process subprocess.Popen([ipython3],stdinsubprocess.PIPE,stdoutsubprocess.PIPE,stderrsubprocess.PIPE)try:# 对pstree进行交互out, err process.communicate(inputbprint(hello), timeout3)print(Out:%s\nErr:%s % (out.decode(), err.decode()))except TimeoutError:# 如果超时到期则子进程不会被终止需要自己处理一下process.kill()out, err process.communicate()print(Out:%s\nErr:%s % (out.decode(), err.decode()))if __name__ __main__:main()输出 IPython 6.4.0 -- An enhanced Interactive Python. Type ? for help.In [1]: helloIn [2]: Do you really want to exit ([y]/n)?Err: 注意点如果超时到期则子进程不会被终止需要自己处理一下官方提醒 通信demo¶ 这个等会说进程间通信还会说所以简单举个例子老规矩拿ps aux | grep bash说事 import subprocessdef main():# ps aux | grep bash# 进程1获取结果p1 subprocess.Popen([ps, -aux], stdoutsubprocess.PIPE)# 得到进程1的结果再进行筛选p2 subprocess.Popen([grep, bash], stdinp1.stdout, stdoutsubprocess.PIPE)# 关闭写段结果已经获取到进程2中了防止干扰显示p1.stdout.close()# 与流程交互将数据发送到stdin并关闭它。msg_tuple p2.communicate()# 输出结果print(msg_tuple[0].decode())if __name__ __main__:main()输出以前案例进程间通信PIPE匿名管道 dnt 2470 0.0 0.1 24612 5236 pts/0 Ss 06:01 0:00 bash
dnt 2512 0.0 0.1 24744 5760 pts/1 Ss 06:02 0:00 bash
dnt 20784 0.0 0.1 24692 5588 pts/2 Ss 06:21 0:00 /bin/bash
dnt 22377 0.0 0.0 16180 1052 pts/1 S 06:30 0:00 grep bash 其他扩展可以看看这篇文章subprocess与Popen() 1.5.进程间通信PIPE管道通信¶ 这个比较有意思看个案例 from multiprocessing import Process, Pipedef test(w):w.send([子进程]老爸老妈回来记得喊我一下)msg w.recv()print(msg)def main():r, w Pipe()p1 Process(targettest, args(w, ))p1.start()msg r.recv()print(msg)r.send([父进程]滚犊子赶紧写作业不然我得跪方便面)p1.join()if __name__ __main__:main()结果 老爸老妈回来记得喊我一下
滚犊子赶紧写作业不然我得跪方便面 multiprocessing.Pipe源码分析¶ 按照道理应该子进程自己写完自己读了和上次讲得不一样啊不急先看看源码 # https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/context.py
def Pipe(self, duplexTrue):返回由管道连接的两个连接对象from .connection import Pipereturn Pipe(duplex)看看connection.Pipe方法的定义部分是不是双向通信就看你是否设置duplexTrue # https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/connection.py
if sys.platform ! win32:def Pipe(duplexTrue):返回管道两端的一对连接对象if duplex:# 双工内部其实是socket系列下次讲s1, s2 socket.socketpair()s1.setblocking(True)s2.setblocking(True)c1 Connection(s1.detach())c2 Connection(s2.detach())else:# 这部分就是我们上次讲的pipe管道fd1, fd2 os.pipe()c1 Connection(fd1, writableFalse)c2 Connection(fd2, readableFalse)return c1, c2
else: def Pipe(duplexTrue):# win平台的一系列处理......c1 PipeConnection(h1, writableduplex)c2 PipeConnection(h2, readableduplex)return c1, c2通过源码知道了原来双工是通过socket搞的啊 再看个和原来一样效果的案例不用关来关去的了方便 from multiprocessing import Process, Pipedef test(w):# 只能写w.send([子进程]老爸咱们完了老妈一直在门口)def main():r, w Pipe(duplexFalse)p1 Process(targettest, args(w, ))p1.start() # 你把这个放在join前面就直接死锁了msg r.recv() # 只能读print(msg)p1.join()if __name__ __main__:main()输出可以思考下为什么start换个位置就死锁提示阻塞读写 [子进程]老爸咱们完了老妈一直在门口 再举个Pool的例子咱们就进入今天的重点了 from multiprocessing import Pipe, Pooldef proc_test1(conn):conn.send([小明]小张今天哥们要见一女孩你陪我呗我24h等你回复哦)msg conn.recv()print(msg)def proc_test2(conn):msg conn.recv()print(msg)conn.send([小张]不去万一被我帅气的外表迷倒就坑了)def main():conn1, conn2 Pipe()p Pool()p.apply_async(proc_test1, (conn1, ))p.apply_async(proc_test2, (conn2, ))p.close() # 关闭池不再接收新任务p.join() # 等待回收必须先关才能join不然会异常if __name__ __main__:main()输出 [小明]小张今天哥们要见一女孩你陪我呗我24h等你回复哦
[小张]不去万一被我帅气的外表迷倒就坑了pool.join源码分析¶ 看看源码就理解了看看Pool的join是啥情况看源码 # https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/pool.py
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/pool.py
def join(self):util.debug(joining pool)if self._state RUN:# 没关闭就join这边就会抛出一个异常raise ValueError(Pool is still running)elif self._state not in (CLOSE, TERMINATE):raise ValueError(In unknown state)self._worker_handler.join()self._task_handler.join()self._result_handler.join()for p in self._pool:p.join() # 循环join回收在pool的__init__的方法中这几个属性 self._processes processes # 指定的进程数
self._pool [] # 列表
self._repopulate_pool() # 给列表append内容的方法将池进程的数量增加到指定的数量join的时候会使用这个列表 def _repopulate_pool(self):# 指定进程数-当前进程数差几个补几个for i in range(self._processes - len(self._pool)):w self.Process(targetworker,args(self._inqueue, self._outqueue,self._initializer,self._initargs, self._maxtasksperchild,self._wrap_exception))self._pool.append(w) # 重点来了w.name w.name.replace(Process, PoolWorker)w.daemon True # pool退出后通过pool创建的进程都会退出w.start()util.debug(added worker)注意池的方法只能由创建它的进程使用 1.5.进程间通信Queue管道通信常用¶ 一步步的设局从底层的的pipe()-os.pipe-PIPE现在终于到Queue了心酸啊明知道上面两个项目 里面基本上不会用但为了你们能看懂源码说了这么久%_%其实以后当我们从Queue说到MQ和RPC之后现在 讲得这些进程间通信(IPC)也基本上不会用了但本质你得清楚我尽量多分析点源码这样你们以后看开源项目压力会很小 欢迎批评指正 引入案例¶ from multiprocessing import Process, Queuedef test(q):q.put([子进程]老爸我出去嗨了)print(q.get())def main():q Queue()p Process(targettest, args(q, ))p.start()msg q.get()print(msg)q.put([父进程]去吧比卡丘)p.join()if __name__ __main__:main()输出get和put默认是阻塞等待的 [子进程]老爸我出去嗨了
[父进程]去吧比卡丘 源码拓展¶ 先看看Queue的初始化方法不指定大小就是最大队列数 # 队列类型使用PIPE缓存线程
class Queue(object):# ctx multiprocessing.get_context(xxx)# 上下文总共3种spawn、fork、forkserver扩展部分会提一下def __init__(self, maxsize0, *, ctx):# 默认使用最大容量if maxsize 0:from .synchronize import SEM_VALUE_MAX as maxsizeself._maxsize maxsize # 指定队列大小# 创建了一个PIPE匿名管道单向self._reader, self._writer connection.Pipe(duplexFalse)# multiprocessing/synchronize.py Lockself._rlock ctx.Lock() # 进程锁读【非递归】self._opid os.getpid() # 获取PIDif sys.platform win32:self._wlock Noneelse:self._wlock ctx.Lock() # 进程锁写【非递归】# Semaphore信号量通常用于保护容量有限的资源# 控制信号量,超了就异常self._sem ctx.BoundedSemaphore(maxsize)# 不忽略PIPE管道破裂的错误self._ignore_epipe False # 线程相关操作self._after_fork()# 向_afterfork_registry字典中注册if sys.platform ! win32:register_after_fork(self, Queue._after_fork)关于get和put是阻塞的问题看下源码探探究竟 q.get()收消息 def get(self, blockTrue, timeoutNone):# 默认情况是阻塞lock加锁if block and timeout is None:with self._rlock:res self._recv_bytes()self._sem.release() # 信号量1else:if block:deadline time.monotonic() timeout# 超时抛异常if not self._rlock.acquire(block, timeout):raise Emptytry:if block:timeout deadline - time.monotonic()# 不管有没有内容都去读超时就抛异常if not self._poll(timeout):raise Emptyelif not self._poll():raise Empty# 接收字节数据作为字节对象res self._recv_bytes()self._sem.release() # 信号量1finally:# 释放锁self._rlock.release()# 释放锁后重新序列化数据return _ForkingPickler.loads(res)queue.put():发消息 def put(self, obj, blockTrue, timeoutNone):# 如果Queue已经关闭就抛异常assert not self._closed, Queue {0!r} has been closed.format(self)# 记录信号量的锁if not self._sem.acquire(block, timeout):raise Full # 超过数量抛个异常# 条件变量允许一个或多个线程等待直到另一个线程通知它们with self._notempty:if self._thread is None:self._start_thread()self._buffer.append(obj)self._notempty.notify()非阻塞get_nowait和put_nowait本质其实也是调用了get和put方法 def get_nowait(self):return self.get(False)def put_nowait(self, obj):return self.put(obj, False)进程间通信1¶ 说这么多不如来个例子看看 from multiprocessing import Queuedef main():q Queue(3) # 只能 put 3条消息q.put([1, 2, 3, 4]) # put一个List类型的消息q.put({a: 1, b: 2}) # put一个Dict类型的消息q.put({1, 2, 3, 4}) # put一个Set类型的消息try:# 不加timeout就一直阻塞等消息队列有空位才能发出去q.put(再加条消息呗, timeout2)# Full(Exception)是空实现你可以直接用Exceptionexcept Exception:print(消息队列已满队列数%s当前存在%s条消息 % (q._maxsize, q.qsize()))try:# 非阻塞不能put就抛异常q.put_nowait(再加条消息呗) # 相当于q.put(obj,False)except Exception:print(消息队列已满队列数%s当前存在%s条消息 % (q._maxsize, q.qsize()))while not q.empty():print(队列数%s当前存在%s条消息 内容%s % (q._maxsize, q.qsize(), q.get_nowait()))print(队列数%s当前存在%s条消息 % (q._maxsize, q.qsize()))if __name__ __main__:main()输出 消息队列已满队列数3当前存在3条消息
消息队列已满队列数3当前存在3条消息
队列数3当前存在3条消息 内容[1, 2, 3, 4]
队列数3当前存在2条消息 内容{a: 1, b: 2}
队列数3当前存在1条消息 内容{1, 2, 3, 4}
队列数3当前存在0条消息 补充说明一下 q._maxsize 队列数(尽量不用_开头的属性和方法q.qsize()查看当前队列中存在几条消息q.full()查看是否满了q.empty()查看是否为空再看个简单点的子进程间通信(铺垫demo) import os
import time
from multiprocessing import Process, Queuedef pro_test1(q):print([子进程1]PPID%d,PID%d,GID%d%(os.getppid(), os.getpid(), os.getgid()))q.put([子进程1]小明今晚撸串不)# 设置一个简版的重试机制三次重试for i in range(3):if not q.empty():print(q.get())breakelse:time.sleep((i 1) * 2) # 第一次1s第二次4s第三次6sdef pro_test2(q):print([子进程2]PPID%d,PID%d,GID%d%(os.getppid(), os.getpid(), os.getgid()))print(q.get())time.sleep(4) # 模拟一下网络延迟q.put([子进程2]不去我今天约了妹子)def main():queue Queue()p1 Process(targetpro_test1, args(queue, ))p2 Process(targetpro_test2, args(queue, ))p1.start()p2.start()p1.join()p2.join()if __name__ __main__:main()输出time python3 5.queue2.py [子进程1]PPID15220,PID15221,GID1000
[子进程2]PPID15220,PID15222,GID1000
[子进程1]小明今晚撸串不
[子进程2]不去我今天约了妹子real 0m6.087s
user 0m0.053s
sys 0m0.035s 进程间通信2¶ 多进程基本上都是用pool可用上面说的Queue方法怎么报错了 import os
import time
from multiprocessing import Pool, Queuedef error_callback(msg):print(msg)def pro_test1(q):print([子进程1]PPID%d,PID%d,GID%d % (os.getppid(), os.getpid(),os.getgid()))q.put([子进程1]小明今晚撸串不)# 设置一个简版的重试机制三次重试for i in range(3):if not q.empty():print(q.get())breakelse:time.sleep((i 1) * 2) # 第一次1s第二次4s第三次6sdef pro_test2(q):print([子进程2]PPID%d,PID%d,GID%d % (os.getppid(), os.getpid(),os.getgid()))print(q.get())time.sleep(4) # 模拟一下网络延迟q.put([子进程2]不去我今天约了妹子)def main():print([父进程]PPID%d,PID%d,GID%d % (os.getppid(), os.getpid(),os.getgid()))queue Queue()p Pool()p.apply_async(pro_test1, args(queue, ), error_callbackerror_callback)p.apply_async(pro_test2, args(queue, ), error_callbackerror_callback)p.close()p.join()if __name__ __main__:main()输出(无法将multiprocessing.Queue对象传递给Pool方法) [父进程]PPID4223,PID32170,GID1000
Queue objects should only be shared between processes through inheritance
Queue objects should only be shared between processes through inheritancereal 0m0.183s
user 0m0.083s
sys 0m0.012s 下面会详说先看一下正确方式队列换了一下其他都一样Manager().Queue() import os
import time
from multiprocessing import Pool, Managerdef error_callback(msg):print(msg)def pro_test1(q):print([子进程1]PPID%d,PID%d,GID%d % (os.getppid(), os.getpid(),os.getgid()))q.put([子进程1]小明今晚撸串不)# 设置一个简版的重试机制三次重试for i in range(3):if not q.empty():print(q.get())breakelse:time.sleep((i 1) * 2) # 第一次1s第二次4s第三次6sdef pro_test2(q):print([子进程2]PPID%d,PID%d,GID%d % (os.getppid(), os.getpid(),os.getgid()))print(q.get())time.sleep(4) # 模拟一下网络延迟q.put([子进程2]不去我今天约了妹子)def main():print([父进程]PPID%d,PID%d,GID%d % (os.getppid(), os.getpid(),os.getgid()))queue Manager().Queue()p Pool()p.apply_async(pro_test1, args(queue, ), error_callbackerror_callback)p.apply_async(pro_test2, args(queue, ), error_callbackerror_callback)p.close()p.join()if __name__ __main__:main()输出 [父进程]PPID4223,PID31329,GID1000
[子进程1]PPID31329,PID31335,GID1000
[子进程2]PPID31329,PID31336,GID1000
[子进程1]小明今晚撸串不
[子进程2]不去我今天约了妹子real 0m6.134s
user 0m0.133s
sys 0m0.035s 再抛个思考题Linux import os
import time
from multiprocessing import Pool, Queuedef error_callback(msg):print(msg)q Queue()def pro_test1():global qprint([子进程1]PPID%d,PID%d,GID%d % (os.getppid(), os.getpid(),os.getgid()))q.put([子进程1]小明今晚撸串不)# 设置一个简版的重试机制三次重试for i in range(3):if not q.empty():print(q.get())breakelse:time.sleep((i 1) * 2) # 第一次1s第二次4s第三次6sdef pro_test2():global qprint([子进程2]PPID%d,PID%d,GID%d % (os.getppid(), os.getpid(),os.getgid()))print(q.get())time.sleep(4) # 模拟一下网络延迟q.put([子进程2]不去我今天约了妹子)def main():print([父进程]PPID%d,PID%d,GID%d % (os.getppid(), os.getpid(),os.getgid()))q Queue()p Pool()p.apply_async(pro_test1, error_callbackerror_callback)p.apply_async(pro_test2, error_callbackerror_callback)p.close()p.join()if __name__ __main__:main()输出为啥这样也可以【提示fork】 [父进程]PPID12855,PID16879,GID1000
[子进程1]PPID16879,PID16880,GID1000
[子进程2]PPID16879,PID16881,GID1000
[子进程1]小明今晚撸串不
[子进程2]不去我今天约了妹子real 0m6.120s
user 0m0.105s
sys 0m0.024s 进程拓展¶ 官方参考https://docs.python.org/3/library/multiprocessing.html 1.上下文系¶ spawnWin默认Linux下也可以用【3.4】父进程启动一个新的python解释器进程。子进程只会继承运行进程对象run()方法所需的那些资源。不会继承父进程中不必要的文件描述符和句柄。与使用fork或forkserver相比使用此方法启动进程相当慢。可在Unix和Windows上使用。Windows上的默认设置。fork:Linux下默认父进程用于os.fork()分叉Python解释器。子进程在开始时与父进程相同这时候内部变量之类的还没有被修改父进程的所有资源都由子进程继承用到多线程的时候可能有些问题仅适用于Unix。Unix上的默认值。forkserver常用当程序启动并选择forkserver start方法时将启动服务器进程。从那时起每当需要一个新进程时父进程就会连接到服务器并请求它分叉一个新进程。fork服务器进程是单线程的因此它可以安全使用os.fork()。没有不必要的资源被继承。可在Unix平台上使用支持通过Unix管道传递文件描述符。这块官方文档很详细贴下官方的2个案例 通过multiprocessing.set_start_method(xxx)来设置启动的上下文类型 import multiprocessing as mpdef foo(q):q.put(hello)if __name__ __main__:mp.set_start_method(spawn) # 不要过多使用q mp.Queue()p mp.Process(targetfoo, args(q,))p.start()print(q.get())p.join()输出set_start_method不要过多使用 helloreal 0m0.407s
user 0m0.134s
sys 0m0.012s 如果你把设置启动上下文注释掉消耗的总时间少了很多 real 0m0.072s
user 0m0.057s
sys 0m0.016s 也可以通过multiprocessing.get_context(xxx)获取指定类型的上下文 import multiprocessing as mpdef foo(q):q.put(hello)if __name__ __main__:ctx mp.get_context(spawn)q ctx.Queue()p ctx.Process(targetfoo, args(q,))p.start()print(q.get())p.join()输出get_context在Python源码里用的比较多so也建议大家这么用 helloreal 0m0.169s
user 0m0.146s
sys 0m0.024s 从结果来看总耗时也少了很多 2.日记系列¶ 说下日记相关的事情 先看下multiprocessing里面的日记记录 # https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/context.py
def log_to_stderr(self, levelNone):打开日志记录并添加一个打印到stderr的处理程序from .util import log_to_stderrreturn log_to_stderr(level)更多Loging模块内容可以看官方文档https://docs.python.org/3/library/logging.html 这个是内部代码看看即可 # https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/util.py
def log_to_stderr(levelNone):打开日志记录并添加一个打印到stderr的处理程序# 全局变量默认是Falseglobal _log_to_stderrimport logging# 日记记录转换成文本formatter logging.Formatter(DEFAULT_LOGGING_FORMAT)# 一个处理程序类它将已适当格式化的日志记录写入流handler logging.StreamHandler() # 此类不会关闭流因为用到了sys.stdout|sys.stderr# 设置格式[%(levelname)s/%(processName)s] %(message)shandler.setFormatter(formatter)# 返回multiprocessing专用的记录器logger get_logger()# 添加处理程序logger.addHandler(handler)if level:# 设置日记级别logger.setLevel(level)# 现在log是输出到stderr的_log_to_stderr Truereturn _loggerLogging之前也有提过可以看看https://www.cnblogs.com/dotnetcrazy/p/9333792.html#2.装饰器传参的扩展可传可不传 来个案例 import logging
from multiprocessing import Process, log_to_stderrdef test():print(test)def start_log():# 把日记输出定向到sys.stderr中logger log_to_stderr()# 设置日记记录级别# 敏感程度DEBUG、INFO、WARN、ERROR、CRITICALprint(logging.WARN logging.WARNING) # 这两个是一样的level logging.INFOlogger.setLevel(level) # 设置日记级别(一般都是WARN)# 自定义输出# def log(self, level, msg, *args, **kwargs):logger.log(level, 我是通用格式) # 通用下面的内部也是调用的这个logger.info(info 测试)logger.warning(warning 测试)logger.error(error 测试)def main():start_log()# 做的操作都会被记录下来p Process(targettest)p.start()p.join()if __name__ __main__:main()输出 True
[INFO/MainProcess] 我是通用格式
[INFO/MainProcess] info 测试
[WARNING/MainProcess] warning 测试
[ERROR/MainProcess] error 测试
[INFO/Process-1] child process calling self.run()
test
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down 3.进程5态¶ 之前忘记说了现在快结尾了补充一下进程5态(来个草图) 1.6.进程间状态共享¶ 应该尽量避免进程间状态共享但需求在那所以还是得研究官方推荐了两种方式 1.共享内存Value or Array¶ 之前说过Queue在Process之间使用没问题用到Pool就使用Manager().xxxValue和Array就不太一样了 看看源码Manager里面的Array和Process共享的Array不是一个概念而且也没有同步机制 # https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/managers.py
class Value(object):def __init__(self, typecode, value, lockTrue):self._typecode typecodeself._value valuedef get(self):return self._valuedef set(self, value):self._value valuedef __repr__(self):return %s(%r, %r) % (type(self).__name__, self._typecode, self._value)value property(get, set) # 给value设置get和set方法和value的属性装饰器一样效果def Array(typecode, sequence, lockTrue):return array.array(typecode, sequence)以Process为例看看怎么用 from multiprocessing import Process, Value, Arraydef proc_test1(value, array):print(子进程1, value.value)array[0] 10print(子进程1, array[:])def proc_test2(value, array):print(子进程2, value.value)array[1] 10print(子进程2, array[:])def main():try:value Value(d, 3.14) # d 类型相当于C里面的doublearray Array(i, range(10)) # i 类型相当于C里面的intprint(type(value))print(type(array))p1 Process(targetproc_test1, args(value, array))p2 Process(targetproc_test2, args(value, array))p1.start()p2.start()p1.join()p2.join()print(父进程, value.value) # 获取值print(父进程, array[:]) # 获取值except Exception as ex:print(ex)else:print(No Except)if __name__ __main__:main()输出Value和Array是进程|线程安全的 class multiprocessing.sharedctypes.Synchronized
class multiprocessing.sharedctypes.SynchronizedArray
子进程1 3.14
子进程1 [10, 1, 2, 3, 4, 5, 6, 7, 8, 9]
子进程2 3.14
子进程2 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]
父进程 3.14
父进程 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]
No Except类型方面的对应关系 typecode_to_type {c: ctypes.c_char,u: ctypes.c_wchar,b: ctypes.c_byte,B: ctypes.c_ubyte,h: ctypes.c_short,H: ctypes.c_ushort,i: ctypes.c_int,I: ctypes.c_uint,l: ctypes.c_long,L: ctypes.c_ulong,q: ctypes.c_longlong,Q: ctypes.c_ulonglong,f: ctypes.c_float,d: ctypes.c_double
}这两个类型其实是ctypes类型更多的类型可以去 multiprocessing.sharedctypes查看来张图 回头解决GIL的时候会用到C系列或者Go系列的共享库讲线程的时候会说 关于进程安全的补充说明对于原子性操作就不用说铁定安全但注意一下i1并不是原子性操作 from multiprocessing import Process, Valuedef proc_test1(value):for i in range(1000):value.value 1def main():value Value(i, 0)p_list [Process(targetproc_test1, args(value, )) for i in range(5)]# 批量启动for i in p_list:i.start()# 批量资源回收for i in p_list:i.join()print(value.value)if __name__ __main__:main()输出理论上应该是:5×10005000 2153 稍微改一下才行进程安全只是提供了安全的方法并不是什么都不用你操心了 # 通用方法
def proc_test1(value):for i in range(1000):if value.acquire():value.value 1value.release()# 官方案例(Lock可以使用with托管)
def proc_test1(value):for i in range(1000):with value.get_lock():value.value 1# 更多可以查看sharedctypes.SynchronizedBase 源码输出关于锁这块后面讲线程的时候会详说看看就好【语法的确比C#麻烦点】 5000 看看源码之前探讨如何优雅的杀死子进程其中就有一种方法使用了Value def Value(typecode_or_type, *args, lockTrue, ctxNone):返回Value的同步包装器obj RawValue(typecode_or_type, *args)if lock is False:return obj# 默认支持Lockif lock in (True, None):ctx ctx or get_context() # 获取上下文lock ctx.RLock() # 获取递归锁if not hasattr(lock, acquire): raise AttributeError(%r has no method acquire % lock)# 一系列处理return synchronized(obj, lock, ctxctx)def Array(typecode_or_type, size_or_initializer, *, lockTrue, ctxNone):返回RawArray的同步包装器obj RawArray(typecode_or_type, size_or_initializer)if lock is False:return obj# 默认是支持Lock的if lock in (True, None):ctx ctx or get_context() # 获取上下文lock ctx.RLock() # 递归锁属性# 查看是否有acquire属性if not hasattr(lock, acquire):raise AttributeError(%r has no method acquire % lock)return synchronized(obj, lock, ctxctx)扩展部分可以查看这篇文章http://blog.51cto.com/11026142/1874807 2.服务器进程Manager¶ 官方文档https://docs.python.org/3/library/multiprocessing.html#managers 有一个服务器进程负责维护所有的对象而其他进程连接到该进程通过代理对象操作服务器进程当中的对象 通过返回的经理Manager()将支持类型list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue 举个简单例子后面还会再说(本质其实就是多个进程通过代理共同操作服务端内容) from multiprocessing import Pool, Managerdef test1(d, l):d[1] 1d[2] 2d[0.25] Nonel.reverse()def test2(d, l):print(d)print(l)def main():with Manager() as manager:dict_test manager.dict()list_test manager.list(range(10))pool Pool()pool.apply_async(test1, args(dict_test, list_test))pool.apply_async(test2, args(dict_test, list_test))pool.close()pool.join()if __name__ __main__:main()输出 {1: 1, 2: 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0] 服务器进程管理器比使用共享内存对象更灵活因为它们可以支持任意对象类型。此外单个管理器可以通过网络在不同计算机上的进程共享。但是它们比使用共享内存慢毕竟有了“中介” 同步问题依然需要注意一下举个例子体会一下 from multiprocessing import Manager, Process, Lockdef test(dict1, lock):for i in range(100):with lock: # 你可以把这句话注释掉然后就知道为什么加了dict1[year] 1def main():with Manager() as m:lock Lock()dict1 m.dict({year: 2000})p_list [Process(targettest, args(dict1, lock)) for i in range(5)]for i in p_list:i.start()for i in p_list:i.join()print(dict1)if __name__ __main__:main()扩展补充 multiprocessing.Lock是一个进程安全对象因此您可以将其直接传递给子进程并在所有进程中安全地使用它。大多数可变Python对象如listdict,大多数类不能保证进程中安全所以它们在进程间共享时需要使用Manager多进程模式的缺点是创建进程的代价大在Unix/Linux系统下用fork调用还行在Windows下创建进程开销巨大。Manager这块官方文档很详细可以看看https://docs.python.org/3/library/multiprocessing.html#managers WinServer的可以参考这篇 or 这篇埋坑记Manager一般都是部署在Linux的Win的客户端不影响 扩展补充¶ 还记得之前的无法将multiprocessing.Queue对象传递给Pool方法吗其实一般都是这两种方式解决的 使用Manager需要生成另一个进程来托管Manager服务器。 并且所有获取/释放锁的调用都必须通过IPC发送到该服务器。使用初始化程序在池创建时传递常规multiprocessing.Queue()这将使Queue实例在所有子进程中全局共享再看一下Pool的__init__方法 # processes进程数
# initializer,initargs 初始化进行的操作
# maxtaskperchild每个进程执行task的最大数目
# contex上下文对象
def __init__(self, processesNone, initializerNone, initargs(),maxtasksperchildNone, contextNone):第一种方法不够轻量级在讲案例前稍微说下第二种方法(也算把上面留下的悬念解了) import os
import time
from multiprocessing import Pool, Queuedef error_callback(msg):print(msg)def pro_test1():print([子进程1]PPID%d,PID%d % (os.getppid(), os.getpid()))q.put([子进程1]小明今晚撸串不)# 设置一个简版的重试机制三次重试for i in range(3):if not q.empty():print(q.get())breakelse:time.sleep((i 1) * 2) # 第一次1s第二次4s第三次6sdef pro_test2():print([子进程2]PPID%d,PID%d % (os.getppid(), os.getpid()))print(q.get())time.sleep(4) # 模拟一下网络延迟q.put([子进程2]不去我今天约了妹子)def init(queue):global qq queuedef main():print([父进程]PPID%d,PID%d % (os.getppid(), os.getpid()))queue Queue()p Pool(initializerinit, initargs(queue, ))p.apply_async(pro_test1, error_callbackerror_callback)p.apply_async(pro_test2, error_callbackerror_callback)p.close()p.join()if __name__ __main__:main()输出就是在初始化Pool的时候传了初始化执行的方法并传了参数alizerinit, initargs(queue, )) [父进程]PPID13157,PID24864
[子进程1]PPID24864,PID24865
[子进程2]PPID24864,PID24866
[子进程1]小明今晚撸串不
[子进程2]不去我今天约了妹子real 0m6.105s
user 0m0.071s
sys 0m0.042s Win下亦通用win下没有os.getgid 1.7.分布式进程的案例¶ 有了1.6的基础咱们来个例子练练 BaseManager的缩略图 服务器端代码 from multiprocessing import Queue
from multiprocessing.managers import BaseManagerdef main():# 用来身份验证的key b8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92get_zhang_queue Queue() # 小张消息队列get_ming_queue Queue() # 小明消息队列# 把Queue注册到网络上, callable参数关联了Queue对象BaseManager.register(get_zhang_queue, callablelambda: get_zhang_queue)BaseManager.register(get_ming_queue, callablelambda: get_ming_queue)# 实例化一个Manager对象。绑定ip端口, 设置验证秘钥manager BaseManager(address(192.168.36.235, 5438), authkeykey)# 运行servemanager.get_server().serve_forever()if __name__ __main__:main()客户端代码1 from multiprocessing.managers import BaseManagerdef main():客户端1key b8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92# 注册对应方法的名字从网络上获取QueueBaseManager.register(get_ming_queue)BaseManager.register(get_zhang_queue)# 实例化一个Manager对象。绑定ip端口, 设置验证秘钥m BaseManager(address(192.168.36.235, 5438), authkeykey)# 连接到服务器m.connect()q1 m.get_zhang_queue() # 在自己队列里面留言q1.put([小张]小明老大明天是不是去外地办事啊)q2 m.get_ming_queue() # 获取小明说的话print(q2.get())if __name__ __main__:main()客户端代码2 from multiprocessing.managers import BaseManagerdef main():客户端2key b8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92# 注册对应方法的名字从网络上获取QueueBaseManager.register(get_ming_queue)BaseManager.register(get_zhang_queue)# 实例化一个Manager对象。绑定ip端口, 设置验证秘钥m BaseManager(address(192.168.36.235, 5438), authkeykey)# 连接到服务器m.connect()q1 m.get_zhang_queue() # 获取小张说的话print(q1.get())q2 m.get_ming_queue() # 在自己队列里面留言q2.put([小明]这几天咱们终于可以不加班了(_))if __name__ __main__:main() 输出图示 服务器运行在Linux的测试 其实还有一部分内容没说明天得出去办点事先到这吧后面找机会继续带一下 参考文章 进程共享的探讨python-sharing-a-lock-between-processes 多进程锁的探讨trouble-using-a-lock-with-multiprocessing-pool-pickling-error JoinableQueue扩展https://www.cnblogs.com/smallmars/p/7093603.html Python多进程编程https://www.cnblogs.com/kaituorensheng/p/4445418.html 有深度但需要辩证看的两篇文章 跨进程对象共享http://blog.ftofficer.com/2009/12/python-multiprocessing-3-about-queue 关于Queuehttp://blog.ftofficer.com/2009/12/python-multiprocessing-2-object-sharing-across-process NetCore并发编程¶ Python的线程、并行、协程下次说 示例代码https://github.com/lotapp/BaseCode/tree/master/netcore/4_Concurrency 先简单说下概念其实之前也有说所以简说下 并发同时做多件事情多线程并发的一种形式并行处理多线程的一种线程池产生的一种并发类型eg异步编程响应式编程一种编程模式对事件进行响应有点类似于JQ的事件Net里面很少用进程在以前基本上都是线程池异步并行协程 我这边简单引入一下毕竟主要是写Python的教程Net只是帮你们回顾一下如果你发现还没听过这些概念或者你的项目中还充斥着各种Thread和ThreadPool的话真的得系统的学习一下了现在官网的文档已经很完善了记得早几年啥都没有也只能挖那些外国开源项目 https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-processing-and-concurrency 1.异步编程Task¶ Task的目的其实就是为了简化Thread和ThreadPool的代码下面一起看看吧 异步用起来比较简单一般IODBNet用的比较多很多时候都会采用重试机制举个简单的例子 /// summary
/// 模拟一个网络操作别忘了重试机制
/// /summary
/// param nameurlurl/param
/// returns/returns
private async static Taskstring DownloadStringAsync(string url)
{using (var client new HttpClient()){// 设置第一次重试时间var nextDelay TimeSpan.FromSeconds(1);for (int i 0; i 3; i){try{return await client.GetStringAsync(url);}catch { }await Task.Delay(nextDelay); // 用异步阻塞的方式防止服务器被太多重试给阻塞了nextDelay * 2; // 3次重试机会第一次1s第二次2s第三次4s}// 最后一次尝试错误就抛出return await client.GetStringAsync(url);}
}然后补充说下Task异常的问题当你await的时候如果有异常会抛出在第一个await处捕获处理即可 如果async和await就是理解不了的可以这样想async就是为了让await生效为了向后兼容 对了如果返回的是void你设置成Task就行了触发是类似于事件之类的方法才使用void不然没有返回值都是使用Task 项目里经常有这么一个场景等待一组任务完成后再执行某个操作,看个引入案例 /// summary
/// 1.批量任务
/// /summary
/// param namelist/param
/// returns/returns
private async static Taskstring[] DownloadStringAsync(IEnumerablestring list)
{using (var client new HttpClient()){var tasks list.Select(url client.GetStringAsync(url)).ToArray();return await Task.WhenAll(tasks);}
}再举一个场景同时调用多个同效果的API有一个返回就好了其他的忽略 /// summary
/// 2.返回首先完成的Task
/// /summary
/// param namelist/param
/// returns/returns
private static async Taskstring GetIPAsync(IEnumerablestring list)
{using (var client new HttpClient()){var tasks list.Select(url client.GetStringAsync(url)).ToArray();var task await Task.WhenAny(tasks); // 返回第一个完成的Taskreturn await task;}
}一个async方法被await调用后当它恢复运行时就会回到原来的上下文中运行。 如果你的Task不再需要上下文了可以使用task.ConfigureAwait(false)eg写个日记还要啥上下文 逆天的建议是在核心代码里面一种使用ConfigureAwait用户页面相关代码不需要上下文的加上 其实如果有太多await在上下文里恢复那也是比较卡的使用ConfigureAwait之后被暂停后会在线程池里面继续运行 再看一个场景比如一个耗时操作我需要指定它的超时时间 /// summary
/// 3.超时取消
/// /summary
/// returns/returns
private static async Taskstring CancellMethod()
{//实例化取消任务var cts new CancellationTokenSource();cts.CancelAfter(TimeSpan.FromSeconds(3)); // 设置失效时间为3stry{return await DoSomethingAsync(cts.Token);}// 任务已经取消会引发TaskCanceledExceptioncatch (TaskCanceledException ex){return false;}
}
/// summary
/// 模仿一个耗时操作
/// /summary
/// returns/returns
private static async Taskstring DoSomethingAsync(CancellationToken token)
{await Task.Delay(TimeSpan.FromSeconds(5), token);return ok;
}异步这块简单回顾就不说了留两个扩展你们自行探讨 进度方面的可以使用IProgressT就当留个作业自己摸索下吧使用了异步之后尽量避免使用task.Wait or task.Result这样可以避免死锁Task其他新特征去官网看看吧引入到此为止了。 2.并行编程Parallel¶ 这个其实出来很久了现在基本上都是用PLinq比较多点主要就是 数据并行重点在处理数据eg聚合任务并行重点在执行任务每个任务块尽可能独立越独立效率越高数据并行¶ 以前都是Parallel.ForEach这么用现在和Linq结合之后非常方便.AsParallel()就OK了 说很抽象看个简单案例 static void Main(string[] args)
{IEnumerableint list new Listint() { 1, 2, 3, 4, 5, 7, 8, 9 };foreach (var item in ParallelMethod(list)){Console.WriteLine(item);}
}
/// summary
/// 举个例子
/// /summary
private static IEnumerableint ParallelMethod(IEnumerableint list)
{return list.AsParallel().Select(x x * x);
}正常执行的结果应该是 1
4
9
25
64
16
49
81 并行之后就是这样了不管顺序了 25
64
1
9
49
81
4
16 当然了如果你就是对顺序有要求可以使用.AsOrdered() /// summary
/// 举个例子
/// /summary
private static IEnumerableint ParallelMethod(IEnumerableint list)
{return list.AsParallel().AsOrdered().Select(x x * x);
}其实实际项目中使用并行的时候任务时间适中太长不适合太短也不适合 记得大家在项目里经常会用到如SumCount等聚合函数其实这时候使用并行就很合适 var list new Listlong();
for (long i 0; i 1000000; i)
{list.Add(i);
}
Console.WriteLine(GetSumParallel(list));private static long GetSumParallel(IEnumerablelong list)
{return list.AsParallel().Sum();
}time dotnet PLINQ.dll 499999500000real 0m0.096s
user 0m0.081s
sys 0m0.025s 不使用并行稍微多了点CPU越密集差距越大 499999500000real 0m0.103s
user 0m0.092s
sys 0m0.021s 其实聚合有一个通用方法可以支持复杂的聚合(以上面sum为例) .Aggregate(seed:0,func:(sum,item)sumitem); 稍微扩展一下PLinq也是支持取消的.WithCancellation(CancellationToken) Token的用法和上面一样就不复述了如果需要和异步结合一个Task.Run就可以把并行任务交给线程池了 也可以使用Task的异步方法设置超时时间这样PLinq超时了也就终止了 PLinq这么方便其实也是有一些小弊端的比如它会直接最大程度的占用系统资源可能会影响其他的任务而传统的Parallel则会动态调整 任务并行并行调用¶ 这个PLinq好像没有对应的方法有新语法你可以说下来举个例子 await Task.Run(() Parallel.Invoke(() Task.Delay(TimeSpan.FromSeconds(3)),() Task.Delay(TimeSpan.FromSeconds(2))));取消也支持 Parallel.Invoke(new ParallelOptions() { CancellationToken token }, actions);扩充说明¶ 其实还有一些比如数据流和响应编程没说这个之前都是用第三方库刚才看官网文档好像已经支持了所以就不卖弄了感兴趣的可以去看看其实项目里面有流数据相关的框架egSpark都是比较成熟的解决方案了基本上也不太使用这些了。 然后还有一些没说比如NetCore里面不可变类型列表、字典、集合、队列、栈、线程安全字典等等以及限流、任务调度等这些关键词我提一下也方便你去搜索自己学习拓展 先到这吧其他的自己探索一下吧最后贴一些Nuget库你可以针对性的使用 数据流Microsoft.Tpl.Dataflow响应编程(Linq的Rx操作)Rx-Main不可变类型Microsoft.Bcl.Immutable不得不感慨一句微软妈妈真的花了很多功夫Net的并发编程比Python省心多了完