Python使用signal定时结束AsyncIOScheduler任务
2021/7/26 12:05:39
本文主要是介绍Python使用signal定时结束AsyncIOScheduler任务,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
在使用aiohttp结合apscheduler的AsyncIOScheduler模拟定点并发的时候遇到两个问题
- 在调度器scheduler.start()后,程序直接退出(在Jupiter中任务可以正常启动)
- 如何在指定时间调用scheduler.shutdown()? (因为程序直接退出了)
原调试代码如下:
from datetime import datetime, timedelta import aiohttp from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore async def get(session): url = 'https://httpbin.org/get?a=1' async with session.get(url) as res: print('get', res.status) return await res.text() async def post(session): url = 'https://httpbin.org/post?b=2' async with session.post(url) as res: print('post', res.status) return await res.text() async def main(): async with aiohttp.ClientSession() as session: await get(session) await post(session) if __name__ == '__main__': jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')} scheduler = AsyncIOScheduler(jobstores=jobstores) for i in range(10): # 添加10个任务 job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10)) scheduler.start()
Google后发现AsyncIOScheduler的使用需要在scheduler启动后,需要自己调用asyncio.get_event_loop().run_forever()
来启动协程任务。
但是一旦run_forever()则就会阻塞至死。除非有KeyboardInterrupt, SystemExit等异常或者强杀来停止其运行。
此时想到使用Python的signal来定时发送信号,修改后程序如下,可以正常延迟停止(感觉有点像模拟Go的defer)。
# -*- coding: utf-8 -*- """ @Time : 2021/7/23 @Auth : hanzhichao @Desc: """ from datetime import datetime, timedelta import signal import asyncio import aiohttp from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore async def get(session): url = 'https://httpbin.org/get?a=1' async with session.get(url) as res: print('get', res.status) return await res.text() async def post(session): url = 'https://httpbin.org/post?b=2' async with session.post(url) as res: print('post', res.status) return await res.text() async def main(): async with aiohttp.ClientSession() as session: await get(session) await post(session) if __name__ == '__main__': jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')} scheduler = AsyncIOScheduler(jobstores=jobstores) for i in range(10): # 添加10个任务 job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10)) scheduler.start() signal.alarm(20) # 20秒后终止程序 asyncio.get_event_loop().run_forever() # 永远运行
这篇关于Python使用signal定时结束AsyncIOScheduler任务的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-08有遇到过吗?同样的规则 Excel 中 比Python 结果大
- 2024-03-30开始python成长之路
- 2024-03-29python optparse
- 2024-03-29python map 函数
- 2024-03-20invalid format specifier python
- 2024-03-18pool.map python
- 2024-03-18threads in python
- 2024-03-14python Ai 应用开发基础训练,字符串,字典,文件
- 2024-03-13id3 algorithm python
- 2024-03-13sum array elements python