40. 在线程间进行事件通知

2021/4/25 10:29:15

本文主要是介绍40. 在线程间进行事件通知,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

在之前通过使用多个DownloadThread线程进行下载(I/O)及使用一个ConvertThread线程进行转换(CPU),我们达到了多线程下载csv数据并转换为xml文件的目的。但现在有额外的要求:

实现一个打包线程TarThread,将转换出的xml文件压缩打包。例如转换线程每生产出5个xml文件,就通知打包线程将它们打包成一个xxx.tgz文件,并删除xml文件。打包完成后,打包线程反过来通知转换进程,转换进程继续转换。

解决方案:

线程间的事件通知,可以使用标准库中Threading.Event类:

  1. 等待事件,一端调用wait()方法;

  2. 通知事件,一端调用set()方法。


  • 对于threading.Event类:
class threading.Event

实现事件对象的类。事件对象管理一个内部标志,调用set()方法可将其设置为true。调用clear()方法可将其设置为false。调用wait()方法将进入阻塞直到标志为true。这个标志初始时为false。

threading.Event类有以下方法:

is_set()

当且仅当内部标志为True时返回True。

set()

将内部标志设置为True。所有正在等待这个事件的线程将被唤醒。当标志为True时,调用wait()方法的线程不会被被阻塞。

clear()

将内部标志设置为Talse。之后调用wait()方法的线程将会被阻塞,直到调用set()方法将内部标志再次设置为true。

wait(timeout=None)

阻塞线程直到内部变量为True。如果调用时内部标志为True,将立即返回。否则将阻塞线程,直到调用set()方法将标志设置为True或者发生可选的超时。

当提供了timeout参数且不是None时,它应该是一个浮点数,代表操作的超时时间,以秒为单位(可以为小数)。

当内部标志在调用wait进入阻塞后被设置为True,或者调用wait时已经被设置为True时,方法返回True。也就是说,除非设定了超时且发生了超时的情况下将会返回False,其他情况该方法都将返回True。


  • 方案示例:
import requestsimport base64import csvimport timeimport osimport tarfilefrom io import StringIOfrom xml.etree.ElementTree import ElementTree, Element, SubElementfrom threading import Thread,Eventfrom queue import Queue

USERNAME = b'7f304a2df40829cd4f1b17d10cda0304'PASSWORD = b'aff978c42479491f9541ace709081b99'class DownloadThread(Thread):
    def __init__(self, page_number, queue):
        super().__init__()
        self.page_number = page_number
        self.queue = queue    
    def run(self):
        # IO
        csv_file = None
        while not csv_file:
            csv_file = self.download_csv(self.page_number)
        self.queue.put((self.page_number, csv_file))                #存数据到队列中

    def download_csv(self, page_number):
        print('download csv data [page=%s]' % page_number)
        url = "https://api.intrinio.com/price.csv?ticker=AAPL&hide_paging=true&page_size=200&page_number=%s" % page_number
        auth = b'Basic' + base64.b64encode(b'%s:%s' % (USERNAME, PASSWORD))
        headers = {'Authorization' : auth}
        response = requests.get(url, headers=headers)

        if response.ok:
            return StringIO(response.text)class ConvertThread(Thread):
    def __init__(self, queue, c_event, t_event):
        super().__init__()
        self.queue = queue
        self.c_event = c_event
        self.t_event = t_event    def run(self):
        count = 0
        while True:
            page_number, csv_file = self.queue.get()                #从队列中取出数据
            if page_number == -1:
                self.c_event.set()
                self.c_event.wait()             #当最后不足5个时也打包
                break
            count += 1
            self.csv_to_xml(csv_file, 'data%s.xml' % page_number)

            if count == 5:
                # 通知转换完成
                self.c_event.set()

                #等待打包完成
                self.c_event.wait()
                self.c_event.clear()
                count = 0

    def csv_to_xml(self, csv_file, xml_path):
        print('Convert csv data to %s' % xml_path)
        reader = csv.reader(csv_file)
        headers = next(reader)

        root = Element('Data')
        root.text = '\n\t'
        root.tail = '\n'

        for row in reader:
            book = SubElement(root, 'Row')
            book.text = '\n\t\t'
            book.tail = '\n\t'

            for tag, text in zip(headers, row):
                e = SubElement(book, tag)
                e.text = text
                e.tail = '\n\t\t'
            e.tail = '\n\t'
        book.tail = '\n'

        ElementTree(root).write(xml_path, encoding='utf8')class TarThread(Thread):
    def __init__(self, c_event, t_event):
        super().__init__(daemon=True)               #守护线程
        self.count = 0
        self.c_event = c_event
        self.t_event = t_event    def run(self):
        while True:
            # 等待转换完成
            self.c_event.wait()
            self.c_event.clear()

            # 打包
            self.tar_xml()

            # 通知打包完成
            self.c_event.set()

    def tar_xml(self):
        self.count += 1
        tfname = 'data%s.tgz' % self.count        print('tar %s...' % tfname)
        tf = tarfile.open(tfname, 'w:gz')               #tar压缩
        for fname in os.listdir('.'):
            if fname.endswith('.xml'):
                tf.add(fname)
                os.remove(fname)
        tf.close()

        if not tf.members:
            os.remove(tfname)if __name__ == '__main__':
    queue = Queue()
    c_event = Event()
    t_event = Event()
    t0 = time.time()
    thread_list = []
    for i in range(1, 11):
        t = DownloadThread(i, queue)
        t.start()               #启动下载线程
        thread_list.append(t)

    convert_thread = ConvertThread(queue, c_event, t_event)
    convert_thread.start()              #启动转换线程

    tar_thread = TarThread(c_event, t_event)
    tar_thread.start()              #启动打包线程

    for t in thread_list:
        t.join()                #阻塞线程,主线程等待所有子线程结束

    # 通知Convert线程退出
    queue.put((-1, None))               #将page_number置为-1

    # 等待转换线程结束
    convert_thread.join()

    print(time.time() - t0)
    print('main thread end.')

在线程间进行事件通知,目的就是线程间同步。




这篇关于40. 在线程间进行事件通知的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程