多进程的通信方式

2021/10/21 7:09:35

本文主要是介绍多进程的通信方式,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

多进程的通信

  

一、队列(Queue)

  

'''
一、队列(Queue)
    Queue.qsize()	返回当前队列包含的消息数量
    Queue.empty()	如果队列为空,返回 True,否则返回 False
    Queue.full()	如果队列满了,返回 True,否则返回 False
    Queue.get([block[, timeout]])	获取队列中的一条消息,然后将其从队列中移除,block 默认值为 True。如果 block 使用默认值,且没有设置 timeout(单位:秒),消息队列为空,此时程序将被阻塞(停在读取状态),直到从消息队列读到消息为止,如果设置了 timeout,则会等待 timeout 秒,若还没有读取到任何消息,则抛出 Queue.Empty 异常
    Queue.get_nowait()	相当于 Queue.get(False)
    Queue.put(item, [block[, timeout]])	将 item 消息写入队列,block 默认值为 True。如果 block 使用默认值,且没有设置 timeout(单位:秒),消息队列如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息队列腾出空间为止,如果设置了 timeout,则会等待 timeout 秒,若还没有空间,则抛出 Queue.full 异常
    Queue.put_nowait(item)	相当于 Queue.put(item, False)
'''
from multiprocessing import Queue

if __name__ == '__main__':
    q = Queue(3)    # 初始化一个 Queue 对象,最多可接收 3 条 put 消息
    q.put("消息1")
    q.put("消息2")
    print(q.full())
    q.put("消息3")
    print(q.full())

    # 因为消息队列已满,再 put 会报异常,第一个 try 等待 2 秒后再抛出异常,第二个 try 立刻抛出
    try:
        q.put("消息4", True, 2)
    except:
        print("消息队列已满,现有消息数量: %s" % q.qsize())

    try:
        q.put_nowait("消息4")
    except:
        print("消息队列已满,现有消息数量: %s" % q.qsize())

    # 读取消息时,先判断消息队列是否为空,再读取
    if not q.empty():
        print("----从消息队列中获取消息--")
        for i in range(q.qsize()):
            print(q.get_nowait())


# False
# True
# 消息队列已满,现有消息数量: 3
# 消息队列已满,现有消息数量: 3
# ----从消息队列中获取消息--
# 消息1
# 消息2
# 消息3

'''
下面通过一个实例结合 Process 和 Queue 实现进程间通信。创建两个子进程,一个子进程负责向队列中写入数据,另一个子进程负责从队列中读取数据。
生产者--消费者模型
'''
from multiprocessing import Process, Queue
import time

# 向对列中写入数据
def write_task(q):
    if not q.full():
        for i in range(5):
            message = "消息" + str(i)
            q.put(message)
            print("写入: %s" % message)

# 从队列读取数据
def read_task(q):
    time.sleep(1)
    while not q.empty():
        print("读取: %s" % q.get(True, 2))    # 等待 2 秒,如果还没有读取到任何消息,则抛出异常

if __name__ == '__main__':
    print("---父进程开始---")
    q = Queue()     # 父进程创建 Queue,并传递给子进程
    pw = Process(target=write_task, args=(q,))
    pr = Process(target=read_task, args=(q,))
    pw.start()
    pr.start()

    print("---等待子进程结束---")
    pw.join()
    pr.join()
    print("---父进程结束---")


# ---父进程开始---
# ---等待子进程结束---
# 写入: 消息0
# 写入: 消息1
# 写入: 消息2
# 写入: 消息3
# 写入: 消息4
# 读取: 消息0
# 读取: 消息1
# 读取: 消息2
# 读取: 消息3
# 读取: 消息4
# ---父进程结束---

  二、可连接的共享队列(multiprocessing.JoinableQueue())

 

  

#encoding=utf-8


'''
multiprocessing.JoinableQueue()
创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
q = JoinableQueue()
q.join()
q.task_done
'''
from multiprocessing import Process, JoinableQueue
import time, random, os


def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))
        q.task_done()  # 向q.join()发送一次信号,证明一个数据已经被取走了


def producer(name, q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '%s%s' % (name, i)
        q.put(res)
        print('\033[44m%s 生产了 %s\033[0m' % (os.getpid(), res))
    q.join()  # 生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。


if __name__ == '__main__':
    q = JoinableQueue()
    # 生产者们:即厨师们
    p1 = Process(target=producer, args=('包子', q))
    p2 = Process(target=producer, args=('骨头', q))
    p3 = Process(target=producer, args=('泔水', q))

    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True
    c2.daemon = True

    # 开始
    p_l = [p1, p2, p3, c1, c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print('主')

    # 主进程等--->p1,p2,p3等---->c1,c2
    # p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    # 因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。

  

 

 

 

 

 

 

二、管道(multiprocess.Pipe)

    

 

  

 

 

 

 持续更新。。。

 



这篇关于多进程的通信方式的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程