多进程
2022/7/7 5:20:15
本文主要是介绍多进程,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
multiprocessing通过使用子进程而非线程有效的绕过了全局解释器锁。multiprocessing可以利用cpu的多核性能。multiprocessing的Api与threading类似
Process类
开启子进程的方法
- spawn
- 启动一个全新的python解释器进程,子进程不继承父进程的文件描述符或其它资源,只继承和run相关的资源。windows默认
- fork
- 父进程使用os.fork()来开启一个子进程。子进程继承父进程的所有资源。Unix默认。
- forkserver
- 使用forkserver时,会启动一个服务器进程来调用os.fork()来创建子进程。
- 通过上下文对象创建子进程
使用multiprocessing.get_context()方法来获取上下文对象,上下文对象有和multiprocessing相似的Api。
对象在不同上下文创建的进程可能不兼容,fork上下文创建的锁不能传递给spawn或forkserver启动方法启动的进程。
multiprocessing.get_context(method=None):- 返回一个Context对象,具有和multiprocessing 模块相同的API。
- 如果method为None,则返回默认上下文对象
- method为fork、spawn或forkserver
import multiprocessing import time def fun(i): print(f"process{i} start at {time.strftime('%X')}") if __name__ == "__main__": ctx = multiprocessing.get_context() p1 = ctx.Process(target=fun, args=(1,)) p2 = ctx.Process(target=fun, args=(2,)) p1.start() p2.start() p1.join() p2.join()
Process
*class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, , daemon=None)
与threading.Thread的api类似。run()、start()、join()、name、is_alive()
- daemon
- 守护进程的标志,当进程退出时,会尝试终止所有守护进程子进程。不允许在守护进程中创建子进程
- pid: 进程ID,start之前为None
- exitcode: 子进程退出代码。
- 如果该进程尚未终止为None如果子进程run方法正常返回,退出代码将是0.如果它通过sys.exit()终止将返回一个N。
- 如果时因为run内未捕获异常终止返回1
- 如果由信号N终止,返回-N
- authkey: 进程的身份验证密钥(字节字符串)
- multiprocessing初始化时,主进程使用os.urandom()分配一个随机字符串。
- 创建Process对象时,它将继承父进程的身份验证密钥
- sentinel
- 系统对象的数字句柄,当进程结束时变为ready
- 如果要使用 multiprocessing.connection.wait() 一次等待多个事件,可以使用此值。否则调用 join() 更简单。
- 在Windows上,这是一个操作系统句柄,可以与 WaitForSingleObject 和 WaitForMultipleObjects 系列API调用一起使用。在Unix上,这是一个文件描述符,可以使用来自 select 模块的原语。
- terminate():终止进程
- Unix上这是使用SIGTERM 信号完成的;在Windows上使用 TerminateProcess() 。 请注意,不会执行退出处理程序和finally子句等。
- 子进程的子进程不会被终止,它们会变成孤儿进程
- kill(): 与terminate相同,在Unix上使用SIGKILL信号
- close(): 关闭Process对象,释放与之关联的所有资源,如果底层进程仍在运行将会引发ValueError。
进程间交换对象
队列
multiprocessing的Queue类时queue.Queue的克隆,是一个线程安全的队列。put方法添加元素时如果队满会一直阻塞直到有空间放入元素。get方法获取元素时如果队空也会一直阻塞。
multiprocessing.Queue([maxsize])
- qsize():返回队列长度,但是由于多线程或多进程的上下文,数字不可靠。Unix平台会引起NotImplementedError
- empty():队列是否为空。因为多线程或多进程环境状态不可靠。
- put(obj, block=Ture, timeout=None):
- 添加元素,block为True和timeout为None时会阻塞当前进程。直到有空的缓冲槽。
- 如果timeout为正数,则会在超时后抛出queue.Full异常。如果block为False时,不会阻塞,会抛出queue.Full异常。
- put_nowait(obj): 等同于put(obj, block=False)
- get(block=True, timeout=None):
- 获取元素。如果超时或者block为False会抛出queue.Empty异常。
- get_nowait(obj): 相当于get(False)
get和put方法在队列关闭后会抛出ValueError(3.8) - close():指示当前进程将不会再往队列中放入对象。一旦所有缓冲区的数据被写入管道之后,后台的线程会退出。
- join_thread():等待后台线程,再close方法之后调用,阻塞当前进程直到后台线程退出,确保缓冲区数据被写入管道。
- cancel_join_thread():防止join_thread方法阻塞当前进程。
multiprocessing.SimpleQueue
简化版的Queue - close():关闭队列,释放内部资源。队列在被关闭后就不再被使用。不能再用get,put,empty方法
- empty()
- get()
- put(item)
multiprocessing.JoinableQueue([maxsize])
Queue子类额外添加了task_done和join方法 - task_done():
- 支出之前进入队列的任务已经完成,由队列的消费者进程使用。每次调用get获取的任务,执行完成后调用task_done告诉队列该任务已经处理完成。
- 如果join方法正在阻塞,则在所有对象都被处理完后返回。
- join(): 阻塞队列直到所有元素都被接受和处理完毕。
- 当条目添加到队列的时候,未完成任务的计数就会增加。每当消费者进程调用 task_done() 表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候, join() 阻塞被解除。
import multiprocessing import random import time import random class Producer(multiprocessing.Process): def __init__(self, queue): super().__init__() self.queue = queue def run(self): for i in range(10): item = random.randint(0,256) self.queue.put(item) print(f"producer append {item} to queue") time.sleep(1) print(f"the size of queue is {self.queue.qsize()}") class Consumer(multiprocessing.Process): def __init__(self, queue): super().__init__() self.queue = queue def run(self): while True: if self.queue.empty(): print("queue is empty") break else: time.sleep(2) item = self.queue.get() print(f"Consumer get {item}") time.sleep(1) if __name__ =="__main__": queue = multiprocessing.Queue() producer = Producer(queue) consumer = Consumer(queue) producer.start() consumer.start() producer.join() consumer.join()
管道
multiprocessing.Pipe([duplex])
返回一对Connection对象,(con1,con2),分别表示管道两端。duplex默认为True,表示可以双向通信。如果False为单向的con1只能接收消息,con2只能发送。
multiprocessing.connection.Connection
连接对象,允许发送可序列化对象。
- send(obj):发生一个可序列化的对象
- recv():返回另一端使用send发送的对象,该方法会一直阻塞直到接收到对象,如果对端关闭了连接或没有东西可接收返回EOFerror
- fileno():返回由连接对象使用的描述符或者句柄
- close():关闭对象
5.poll([timeout]):返回连接对象中释放有可以读取的数据。如果timeout是None那么将一直等待不会超时。
import multiprocessing import time def send(left, right): left.send(['left', time.strftime("%X")]) print(left.recv()) def recv(left,right): right.send(['right', time.strftime("%X")]) print(right.recv()) if __name__ == '__main__': left,right = multiprocessing.Pipe() s_p = multiprocessing.Process(target=send, args=(left,right)) s_p.start() r_p = multiprocessing.Process(target=recv, args=(left,right)) r_p.start() s_p.join() r_p.join()
共享内存
multiprocessing.Value(typecode_or_type, *args, lock=True)
返回从共享内存上创建的ctypes对象,默认情况下返回的对象实际上是经过了同步器包装过的,可以通过value属性访问对象本身。
- typecode_or_type指明了返回的对象类型。可能是ctype类型或array模块中每个类型对应的单字符长度的字符串。
- *args会传递给这个类的构造函数
- lock默认为True,将会新建一个递归式用于同步此值的访问操作。如果是Lock或RLock对象,那么这个传入的锁将会用于同步这个值的访问操作。如果为False,则这个对象的访问将没有锁保护,这个变量不是进程安全的。
from multiprocessing import Process, Value def f(v): with v.get_lock(): # += 类操作不具有原子性,使用对象内部关联锁 v.value+=1 if __name__ == "__main__": v = Value('i',0) p1 = Process(target=f, args=(v,)) p2 = Process(target=f, args=(v,)) p1.start() p2.start() p1.join() p2.join() print(v.value)
multiprocessing.Array(typecode_or_type, size_or_initializer,*,lock=True)
从共享内存申请并返回一个具有ctypes类型的数组对象,默认情况下返回值实际上是被同步器包装过的数组对象。
- size_or_initializer如果是整数,则表示数组长度,并且每个元素都会初始化为0,如果是一个序列,则会使用这个序列初始化数组中的每一元素,并且根据元素个数自动判断数组长度。
from multiprocessing import Process, Array def f(arr, i): arr[i]=i if __name__ =="__main__": arr = Array('i', 10) processes = [] for i in range(10): process = Process(target=f,args=(arr,i)) processes.append(process) for p in processes: p.start() for p in processes: p.join() print(arr[:])
管理器multiprocessing.Manager
管理器维护一个用于管理共享对象的服务,其他进程可以通过代理访问这些共享对象。
multiprocessing.Manager()返回一个已启动的SyncManager管理器对象,可以用于在不同进程中共享数据。
支持 list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。
from multiprocessing import Manager , Process def f(mylist, i): mylist.append(i) if __name__ =="__main__": manager = Manager() mylist = manager.list() processes = [] for i in range(10): p = Process(target=f, args=(mylist, i)) processes.append(p) for p in processes: p.start() for p in processes: p.join() print(mylist)
进程池
multiprocessing.Pool([processes[,initalizer[,initargs[,maxtaskperchild[,[context]]]]])
返回一个进程池对象,它控制可以提交作业的工作进程池,支持超时和回调的异步结果以及并行的map。
- processes进程数,如果为None,则使用os.cup_count()返回的值
- 如果initalizer不为None,则每个工作进程将会在启动时调用initalizer(*initargs)
- apply(func[,args[,kwds]])
- 使用args参数以及kwds命名参数调用func,在返回结果前阻塞
- apply_async(func[,args[,kwds[,callback[,error_callback]]]])
- appyly的变种返回AsyncResult对象
- callback和error_callback是一个接受单格参数的可调用对象,执行成功调用callback,否则调用error_callback
- 回调函数应该立即执行完成,否则会阻塞负责处理结果的线程
- map(func, iterable[,chunksize])
- 内置map()函数的并行版本,会保持阻塞到获得结果,该方法会将可迭代对象分割为许多块,提交给进程池,可以将chunksize设置为一个正整数从而近似指定块的大小
- map_async(func,iterable[,chunksize[,callback[,error_callback]]])
- map的变种,返回AsyncResult对象
- close(): 阻止后续任务提交到进程池,当所有任务执行完成后,工作进程会退出。
- terminate():不等待未完成任务,立即停止工作进程,进程池对象被垃圾回收时,会立即调用termainate
- join(): 必须在close或terminate后调用
multiprocessing.pool.AsyncResult
Pool.apply_async和pool.map_async()返回的对象所属的类。
- get([timeout]):获取执行结果
- wait([timeout]): 阻塞直到返回结果
- ready(): 返回执行状态,是否已经完成
- successful():判断是否已经完成并且未引发异常。如果还未获得结果将引发ValueError
from multiprocessing import Pool import time def f(x): time.sleep(1) return x**x def mf(x): time.sleep(0.5) return x*2 def initializer(*args): print(args, time.strftime("%X")) if __name__ =="__main__": with Pool(processes=4, initializer=initializer,initargs=("init-",)) as pool: print(f"apply - start {time.strftime('%X')}") print(pool.apply(f,(10,))) # 阻塞直到运行完成 print(f"apply - end{time.strftime('%X')}") print(f"apply_async - start {time.strftime('%X')}") result = pool.apply_async(f,(10,)) # 异步执行不阻塞当前进程 print(f"apply_async - end{time.strftime('%X')}") print(result.get()) print(f"map - start {time.strftime('%X')}") print(pool.map(mf,[i for i in range(10)])) # 阻塞直到运行完成 print(f"map- end{time.strftime('%X')}") print(f"map_async - start {time.strftime('%X')}") result = pool.map_async(mf,[i for i in range(10)]) # 异步执行不阻塞当前进程 print(f"mapy_async - end{time.strftime('%X')}") print(result.get())
这篇关于多进程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-27OpenFeign服务间调用学习入门
- 2024-12-27OpenFeign服务间调用学习入门
- 2024-12-27OpenFeign学习入门:轻松掌握微服务通信
- 2024-12-27OpenFeign学习入门:轻松掌握微服务间的HTTP请求
- 2024-12-27JDK17新特性学习入门:简洁教程带你轻松上手
- 2024-12-27JMeter传递token学习入门教程
- 2024-12-27JMeter压测学习入门指南
- 2024-12-27JWT单点登录学习入门指南
- 2024-12-27JWT单点登录原理学习入门
- 2024-12-27JWT单点登录原理学习入门