python ctrl-c 无法终止 multiprocessing pool
2021/9/19 17:08:29
本文主要是介绍python ctrl-c 无法终止 multiprocessing pool,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
python ctrl-c 无法终止 multiprocessing pool
github pages: perfectnewer.gitub.io
gitee pages: perfectnewer.gitee.io
文章目录
- 前言
- 问题1:在multiprocessing中ctrl-c 无法终止运行
- 先说解决方案:
- 参考链接
- 此处贴上stack overflow的代码
- 此处放上我的代码逻辑
- 根本原因:`Condition.wait`
- 问题2:pool中放入了过多的待处理任务
- 问题3:内存泄漏
前言
最近对一个线上脚本进行优化,因为时间紧逻辑比较多,后面还有一堆事情,所以决定先改成多进程的方式。此处记录一下遇到的问题。
问题1:在multiprocessing中ctrl-c 无法终止运行
一开始我是质疑这些答案的,毕竟我的代码可以正常接收ctrl-c,但是最后发现本质原因是一样的,方案也是通用。那么对根因的探索祥见另外一篇python2 multiprocess pool源码简单解读
先说解决方案:
参考链接
- multiprocessing - Catch Ctrl+C / SIGINT and exit multiprocesses gracefully in python - Stack Overflow
- Keyboard Interrupts with python’s multiprocessing Pool - Stack Overflow
- Python 中 Ctrl+C 不能终止 Multiprocessing Pool 的解决方案
此处贴上stack overflow的代码
注意:
- 除了resul .get会block以外,pool.join也会block信号
- 我的代码和他的不一样,我的脚本是长期运行的。我是能收到信号的。但是整个进程也没有退出,原因是worker异常退出导致。虽然原因不同,但是这个方案是可行的。因为worker不会因为ctrl-c异常退出了
#!/bin/env python from __future__ import print_function import multiprocessing import os import signal import time def run_worker(delay): print("In a worker process", os.getpid()) time.sleep(delay) def main(): print("Initializng 2 workers") original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) pool = multiprocessing.Pool(2) signal.signal(signal.SIGINT, original_sigint_handler) try: print("Starting 2 jobs of 5 seconds each") res = pool.map_async(run_worker, [5, 5]) print("Waiting for results") res.get(60) # Without the timeout this blocking call ignores all signals. except KeyboardInterrupt: print("Caught KeyboardInterrupt, terminating workers") pool.terminate() else: print("Normal termination") pool.close() pool.join() if __name__ == "__main__": main()
此处放上我的代码逻辑
def mgr(): import multiprocessing multiprocessing.log_to_stderr() logger.info("start") origin_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) pool = Pool(3) stopper = Stopper() signal.signal(signal.SIGINT, origin_handler) signal.signal(signal.SIGTERM, functools.partial(stop, stopper)) idx = 1 working_process = [] while not stopper.is_stop(): try: v = r.blpop("test", 120) ... logger.info(v) rst = pool.apply_async(worker, args=(idx, )) working_process.append(rst) except KeyboardInterrupt: logger.info("bk error ======== parent {}".format(os.getpid())) break except Exception: pass while len(working_process) >= 3: logger.info("wait exit") readys = [] for idx, rst in enumerate(working_process): rdy = rst.ready() print('1: ', idx, ' ', rdy) if rdy: readys.append(idx) if not readys: time.sleep(10) else: for idx in readys: working_process[idx] = None working_process = filter(lambda x: x is not None, working_process) idx += 1 logger.info("exit") pool.close() pool.join()
根本原因:Condition.wait
根本原因其实取决于你的具体代码逻辑。目前我遇到的有两个
- 包
threading.Condition
执行condition.wait()
的时候,会阻止信号的处理。这个是python2.7的bug,并且不会改变这个行为。在python3中已经修复。详见:threading.Condition.wait() is not interruptible in Python 2.7 - pool worker异常退出(使用Exception无法捕捉的异常),导致result没有被设置。pool一直等待worker的任务完成
问题2:pool中放入了过多的待处理任务
这个问题的原因是对进程池的理解不到位。multiprocessing.Pool
只是限制了执行任务的进程的个数,并没有限制用户放入的任务数量。虽然它保证只有规定个数的进程运行。但是你可以一直往pool里添加task。
解决方案就是自己需要对处理的任务数量统计计数。code
问题3:内存泄漏
这个一开始就能预料到了。这个主要是因为我们业务代码有进程缓存,只要定期清理就可以了。那么方案也有两个
- 自己定期清理
- multiprocessing有个maxtasksperchild参数,表示一个进程多少次任务后就销毁,重新创建新的进程
这篇关于python ctrl-c 无法终止 multiprocessing pool的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-09-27使用python 将ETH账户的资产打散
- 2024-09-26Python编程基础
- 2024-09-2610 种方法写出更好的 Python 代码
- 2024-09-25Python编程基础详解
- 2024-09-25Python编程入门教程
- 2024-09-25从零开始使用Python构建LLaMA 3
- 2024-09-23Python中理解和使用树形结构的简单教程
- 2024-09-23Python 编程基础入门
- 2024-09-18初探Python股票自动化交易:入门指南
- 2024-09-18Python量化入门:轻松掌握量化分析基础与实战