40. 在线程间进行事件通知
2021/4/25 10:29:15
本文主要是介绍40. 在线程间进行事件通知,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
在之前通过使用多个DownloadThread线程进行下载(I/O)及使用一个ConvertThread线程进行转换(CPU),我们达到了多线程下载csv数据并转换为xml文件的目的。但现在有额外的要求:
实现一个打包线程TarThread,将转换出的xml文件压缩打包。例如转换线程每生产出5个xml文件,就通知打包线程将它们打包成一个xxx.tgz文件,并删除xml文件。打包完成后,打包线程反过来通知转换进程,转换进程继续转换。
解决方案:
线程间的事件通知,可以使用标准库中Threading.Event
类:
等待事件,一端调用
wait()
方法;通知事件,一端调用
set()
方法。
- 对于
threading.Event
类:
class threading.Event
实现事件对象的类。事件对象管理一个内部标志,调用set()
方法可将其设置为true。调用clear()
方法可将其设置为false。调用wait()
方法将进入阻塞直到标志为true。这个标志初始时为false。
threading.Event
类有以下方法:
is_set()
当且仅当内部标志为True时返回True。
set()
将内部标志设置为True。所有正在等待这个事件的线程将被唤醒。当标志为True时,调用wait()方法的线程不会被被阻塞。
clear()
将内部标志设置为Talse。之后调用wait()方法的线程将会被阻塞,直到调用set()方法将内部标志再次设置为true。
wait(timeout=None)
阻塞线程直到内部变量为True。如果调用时内部标志为True,将立即返回。否则将阻塞线程,直到调用set()方法将标志设置为True或者发生可选的超时。 当提供了timeout参数且不是None时,它应该是一个浮点数,代表操作的超时时间,以秒为单位(可以为小数)。 当内部标志在调用wait进入阻塞后被设置为True,或者调用wait时已经被设置为True时,方法返回True。也就是说,除非设定了超时且发生了超时的情况下将会返回False,其他情况该方法都将返回True。
- 方案示例:
import requestsimport base64import csvimport timeimport osimport tarfilefrom io import StringIOfrom xml.etree.ElementTree import ElementTree, Element, SubElementfrom threading import Thread,Eventfrom queue import Queue USERNAME = b'7f304a2df40829cd4f1b17d10cda0304'PASSWORD = b'aff978c42479491f9541ace709081b99'class DownloadThread(Thread): def __init__(self, page_number, queue): super().__init__() self.page_number = page_number self.queue = queue def run(self): # IO csv_file = None while not csv_file: csv_file = self.download_csv(self.page_number) self.queue.put((self.page_number, csv_file)) #存数据到队列中 def download_csv(self, page_number): print('download csv data [page=%s]' % page_number) url = "https://api.intrinio.com/price.csv?ticker=AAPL&hide_paging=true&page_size=200&page_number=%s" % page_number auth = b'Basic' + base64.b64encode(b'%s:%s' % (USERNAME, PASSWORD)) headers = {'Authorization' : auth} response = requests.get(url, headers=headers) if response.ok: return StringIO(response.text)class ConvertThread(Thread): def __init__(self, queue, c_event, t_event): super().__init__() self.queue = queue self.c_event = c_event self.t_event = t_event def run(self): count = 0 while True: page_number, csv_file = self.queue.get() #从队列中取出数据 if page_number == -1: self.c_event.set() self.c_event.wait() #当最后不足5个时也打包 break count += 1 self.csv_to_xml(csv_file, 'data%s.xml' % page_number) if count == 5: # 通知转换完成 self.c_event.set() #等待打包完成 self.c_event.wait() self.c_event.clear() count = 0 def csv_to_xml(self, csv_file, xml_path): print('Convert csv data to %s' % xml_path) reader = csv.reader(csv_file) headers = next(reader) root = Element('Data') root.text = '\n\t' root.tail = '\n' for row in reader: book = SubElement(root, 'Row') book.text = '\n\t\t' book.tail = '\n\t' for tag, text in zip(headers, row): e = SubElement(book, tag) e.text = text e.tail = '\n\t\t' e.tail = '\n\t' book.tail = '\n' ElementTree(root).write(xml_path, encoding='utf8')class TarThread(Thread): def __init__(self, c_event, t_event): super().__init__(daemon=True) #守护线程 self.count = 0 self.c_event = c_event self.t_event = t_event def run(self): while True: # 等待转换完成 self.c_event.wait() self.c_event.clear() # 打包 self.tar_xml() # 通知打包完成 self.c_event.set() def tar_xml(self): self.count += 1 tfname = 'data%s.tgz' % self.count print('tar %s...' % tfname) tf = tarfile.open(tfname, 'w:gz') #tar压缩 for fname in os.listdir('.'): if fname.endswith('.xml'): tf.add(fname) os.remove(fname) tf.close() if not tf.members: os.remove(tfname)if __name__ == '__main__': queue = Queue() c_event = Event() t_event = Event() t0 = time.time() thread_list = [] for i in range(1, 11): t = DownloadThread(i, queue) t.start() #启动下载线程 thread_list.append(t) convert_thread = ConvertThread(queue, c_event, t_event) convert_thread.start() #启动转换线程 tar_thread = TarThread(c_event, t_event) tar_thread.start() #启动打包线程 for t in thread_list: t.join() #阻塞线程,主线程等待所有子线程结束 # 通知Convert线程退出 queue.put((-1, None)) #将page_number置为-1 # 等待转换线程结束 convert_thread.join() print(time.time() - t0) print('main thread end.')
在线程间进行事件通知,目的就是线程间同步。
这篇关于40. 在线程间进行事件通知的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-22项目:远程温湿度检测系统
- 2024-12-21《鸿蒙HarmonyOS应用开发从入门到精通(第2版)》简介
- 2024-12-21后台管理系统开发教程:新手入门全指南
- 2024-12-21后台开发教程:新手入门及实战指南
- 2024-12-21后台综合解决方案教程:新手入门指南
- 2024-12-21接口模块封装教程:新手必备指南
- 2024-12-21请求动作封装教程:新手必看指南
- 2024-12-21RBAC的权限教程:从入门到实践
- 2024-12-21登录鉴权实战:新手入门教程
- 2024-12-21动态权限实战入门指南