多进程的通信方式
2021/10/21 7:09:35
本文主要是介绍多进程的通信方式,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
多进程的通信
一、队列(Queue)
''' 一、队列(Queue) Queue.qsize() 返回当前队列包含的消息数量 Queue.empty() 如果队列为空,返回 True,否则返回 False Queue.full() 如果队列满了,返回 True,否则返回 False Queue.get([block[, timeout]]) 获取队列中的一条消息,然后将其从队列中移除,block 默认值为 True。如果 block 使用默认值,且没有设置 timeout(单位:秒),消息队列为空,此时程序将被阻塞(停在读取状态),直到从消息队列读到消息为止,如果设置了 timeout,则会等待 timeout 秒,若还没有读取到任何消息,则抛出 Queue.Empty 异常 Queue.get_nowait() 相当于 Queue.get(False) Queue.put(item, [block[, timeout]]) 将 item 消息写入队列,block 默认值为 True。如果 block 使用默认值,且没有设置 timeout(单位:秒),消息队列如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息队列腾出空间为止,如果设置了 timeout,则会等待 timeout 秒,若还没有空间,则抛出 Queue.full 异常 Queue.put_nowait(item) 相当于 Queue.put(item, False) ''' from multiprocessing import Queue if __name__ == '__main__': q = Queue(3) # 初始化一个 Queue 对象,最多可接收 3 条 put 消息 q.put("消息1") q.put("消息2") print(q.full()) q.put("消息3") print(q.full()) # 因为消息队列已满,再 put 会报异常,第一个 try 等待 2 秒后再抛出异常,第二个 try 立刻抛出 try: q.put("消息4", True, 2) except: print("消息队列已满,现有消息数量: %s" % q.qsize()) try: q.put_nowait("消息4") except: print("消息队列已满,现有消息数量: %s" % q.qsize()) # 读取消息时,先判断消息队列是否为空,再读取 if not q.empty(): print("----从消息队列中获取消息--") for i in range(q.qsize()): print(q.get_nowait()) # False # True # 消息队列已满,现有消息数量: 3 # 消息队列已满,现有消息数量: 3 # ----从消息队列中获取消息-- # 消息1 # 消息2 # 消息3 ''' 下面通过一个实例结合 Process 和 Queue 实现进程间通信。创建两个子进程,一个子进程负责向队列中写入数据,另一个子进程负责从队列中读取数据。 生产者--消费者模型 ''' from multiprocessing import Process, Queue import time # 向对列中写入数据 def write_task(q): if not q.full(): for i in range(5): message = "消息" + str(i) q.put(message) print("写入: %s" % message) # 从队列读取数据 def read_task(q): time.sleep(1) while not q.empty(): print("读取: %s" % q.get(True, 2)) # 等待 2 秒,如果还没有读取到任何消息,则抛出异常 if __name__ == '__main__': print("---父进程开始---") q = Queue() # 父进程创建 Queue,并传递给子进程 pw = Process(target=write_task, args=(q,)) pr = Process(target=read_task, args=(q,)) pw.start() pr.start() print("---等待子进程结束---") pw.join() pr.join() print("---父进程结束---") # ---父进程开始--- # ---等待子进程结束--- # 写入: 消息0 # 写入: 消息1 # 写入: 消息2 # 写入: 消息3 # 写入: 消息4 # 读取: 消息0 # 读取: 消息1 # 读取: 消息2 # 读取: 消息3 # 读取: 消息4 # ---父进程结束---
二、可连接的共享队列(multiprocessing.JoinableQueue())
#encoding=utf-8 ''' multiprocessing.JoinableQueue() 创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 q = JoinableQueue() q.join() q.task_done ''' from multiprocessing import Process, JoinableQueue import time, random, os def consumer(q): while True: res = q.get() time.sleep(random.randint(1, 3)) print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res)) q.task_done() # 向q.join()发送一次信号,证明一个数据已经被取走了 def producer(name, q): for i in range(10): time.sleep(random.randint(1, 3)) res = '%s%s' % (name, i) q.put(res) print('\033[44m%s 生产了 %s\033[0m' % (os.getpid(), res)) q.join() # 生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。 if __name__ == '__main__': q = JoinableQueue() # 生产者们:即厨师们 p1 = Process(target=producer, args=('包子', q)) p2 = Process(target=producer, args=('骨头', q)) p3 = Process(target=producer, args=('泔水', q)) # 消费者们:即吃货们 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) c1.daemon = True c2.daemon = True # 开始 p_l = [p1, p2, p3, c1, c2] for p in p_l: p.start() p1.join() p2.join() p3.join() print('主') # 主进程等--->p1,p2,p3等---->c1,c2 # p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据 # 因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。
二、管道(multiprocess.Pipe)
持续更新。。。
这篇关于多进程的通信方式的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-02Java管理系统项目实战入门教程
- 2024-11-02Java监控系统项目实战教程
- 2024-11-02Java就业项目项目实战:从入门到初级工程师的必备技能
- 2024-11-02Java全端项目实战入门教程
- 2024-11-02Java全栈项目实战:从入门到初级应用
- 2024-11-02Java日志系统项目实战:初学者完全指南
- 2024-11-02Java微服务系统项目实战入门教程
- 2024-11-02Java微服务项目实战:新手入门指南
- 2024-11-02Java项目实战:新手入门教程
- 2024-11-02Java小程序项目实战:从入门到简单应用