Python schedule 库定时任务
2021/11/3 17:39:50
本文主要是介绍Python schedule 库定时任务,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
Python schedule 库定时任务
schedule的使用
# Scheduling examples for the `schedule` library (used to time Scrapy jobs).
# These calls only register jobs; they fire when schedule.run_pending() is
# called periodically by the host program.
import schedule
import time


def job():
    """Print a marker together with the current Unix timestamp."""
    print("Do Jod", time.time())


# Every 10 minutes.
schedule.every(10).minutes.do(job)
# Every hour.
schedule.every().hour.do(job)
# Every day at 10:30.
schedule.every().day.at("10:30").do(job)
# At a random interval of 5 to 10 days. The plural unit is required whenever
# the interval is not 1: the singular `.day` raises IntervalError here.
schedule.every(5).to(10).days.do(job)
# Every Monday.
schedule.every().monday.do(job)
# Every Wednesday at 13:15.
schedule.every().wednesday.at("13:15").do(job)
schedule在scrapy的应用
import subprocess
import schedule
import time
import datetime
import logging
from multiprocessing import Process
from scrapy import cmdline


def crawl_work():
    """Run the ``it`` spider once in a child process and log the elapsed time.

    The crawl is started through ``scrapy.cmdline.execute`` inside a
    ``multiprocessing.Process`` and this function blocks until that child
    exits. It performs exactly one crawl per call so that control returns to
    the scheduler loop; an endless loop here would keep the first scheduled
    invocation from ever finishing and the schedule would never fire again.
    """
    print("I'm working...")
    print('-'*100)
    args = ["scrapy", "crawl", 'it']
    start = time.time()
    # NOTE(review): the crawl is isolated in its own process, presumably
    # because cmdline.execute does not return normally to its caller --
    # confirm against the Scrapy documentation.
    p = Process(target=cmdline.execute, args=(args,))
    p.start()
    p.join()
    logging.debug("### use time: %s", time.time() - start)


if __name__=='__main__':
    print('*'*10+'开始执行定时爬虫'+'*'*10)
    # Register the crawl to run once per minute.
    schedule.every(1).minutes.do(crawl_work)
    print('当前时间为{}'.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('*' * 10 + '定时爬虫开始运行' + '*' * 10)
    # Poll the scheduler every 10 seconds and run whatever is due.
    while True:
        schedule.run_pending()
        time.sleep(10)
SQLAlchemy 连接 PostgreSQL
import os
from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

# URL format: postgresql+psycopg2://username:password@host/dbname
# DATABASE_URL, when set, overrides the built-in default so that real
# credentials do not have to live in the source file.
engine = create_engine(
    os.getenv(
        "DATABASE_URL",
        "postgresql+psycopg2://postgres:88595073@localhost/lecture3",
    )
)
db = scoped_session(sessionmaker(bind=engine))


def main():
    """Fetch every row of ``table_name`` and print each one.

    The statement is wrapped in ``text()`` because passing a plain string to
    ``Session.execute`` is deprecated in SQLAlchemy 1.4 and rejected in 2.0.
    The scoped session is released in ``finally`` so the connection is given
    back even when the query fails.
    """
    try:
        result = db.execute(text("SELECT * FROM table_name")).fetchall()
        for r in result:
            print(r)
    finally:
        db.remove()


if __name__ == "__main__":
    main()
Scrapy 通过连接池连接 MySQL 的工具类
import pymysql
import traceback

try:
    from DBUtils.PooledDB import PooledDB
except ImportError:
    # DBUtils 2.x renamed its modules to lower case.
    from dbutils.pooled_db import PooledDB
from scrapy.utils.project import get_project_settings


class MySqlUtil(object):
    """MySQL helper that hands out connections from one shared pool.

    Connection parameters are read from the Scrapy project settings
    (MYSQL_HOST, MYSQL_PORT, MYSQL_DATABASE, MYSQL_USER, MYSQL_PASSWORD,
    MYSQL_CHARSET). Each instance holds one pooled connection and one cursor.

    Query and write helpers never raise: on any exception they print the
    traceback and return ``None`` (the insert helpers additionally roll back).
    """

    # Read the connection parameters from the Scrapy settings.
    settings = get_project_settings()
    config = {
        "host": settings.get("MYSQL_HOST"),
        "port": settings.get("MYSQL_PORT"),
        "database": settings.get("MYSQL_DATABASE"),
        "user": settings.get("MYSQL_USER"),
        "password": settings.get("MYSQL_PASSWORD"),
        "charset": settings.get("MYSQL_CHARSET")
    }

    # Shared pool, created lazily by get_conn().
    __pool = None

    def __init__(self):
        self._conn = MySqlUtil.get_conn()
        self._cursor = self._conn.cursor()

    @staticmethod
    def get_conn():
        """Return a connection taken from the shared pool.

        The pool is created on first use and stored on the class so that
        later calls reuse it instead of building a new pool every time.
        """
        if MySqlUtil.__pool is None:
            MySqlUtil.__pool = PooledDB(
                creator=pymysql,
                mincached=1,
                maxcached=20,
                host=MySqlUtil.config['host'],
                port=MySqlUtil.config['port'],
                user=MySqlUtil.config['user'],
                passwd=MySqlUtil.config['password'],
                db=MySqlUtil.config['database'],
                charset=MySqlUtil.config['charset'],
            )
        return MySqlUtil.__pool.connection()

    def _execute(self, sql, param=None):
        """Run ``sql`` on the cursor, with ``param`` when given; return execute()'s result."""
        if param is None:
            return self._cursor.execute(sql)
        return self._cursor.execute(sql, param)

    def get_all(self, sql, param=None):
        """Run a query and return all rows.

        @param sql: query text; pass condition values through ``param``
        @param param: optional condition values (tuple/list/dict)
        @return: result of ``cursor.fetchall()``, ``False`` when the query
                 matched no rows, ``None`` when an exception occurred
        """
        try:
            if self._execute(sql, param) > 0:
                return self._cursor.fetchall()
            return False
        except Exception:
            traceback.print_exc()
            return None

    def get_one(self, sql, param=None):
        """Run a query and return the first row.

        @param sql: query text; pass condition values through ``param``
        @param param: optional condition values (tuple/list/dict)
        @return: result of ``cursor.fetchone()``, ``False`` when the query
                 matched no rows, ``None`` when an exception occurred
        """
        try:
            if self._execute(sql, param) > 0:
                return self._cursor.fetchone()
            return False
        except Exception:
            traceback.print_exc()
            return None

    def get_count(self, sql, param=None):
        """Run a query and return the row count reported by ``cursor.execute``.

        Note that this is the number of rows in the result set, not a value
        selected by the query: ``select count(1) ...`` always yields 1 here.

        @param sql: query text; pass condition values through ``param``
        @param param: optional condition values (tuple/list/dict)
        @return: row count, or ``None`` when an exception occurred
        """
        return self.__query(sql, param)

    def get_many(self, sql, num, param=None):
        """Run a query and return up to ``num`` rows.

        @param sql: query text; pass condition values through ``param``
        @param num: maximum number of rows to fetch
        @param param: optional condition values (tuple/list/dict)
        @return: result of ``cursor.fetchmany(num)``, ``False`` when the query
                 matched no rows, ``None`` when an exception occurred
        """
        try:
            if self._execute(sql, param) > 0:
                return self._cursor.fetchmany(num)
            return False
        except Exception:
            traceback.print_exc()
            return None

    def insert_one(self, sql, value):
        """Insert a single record.

        @param sql: insert statement with placeholders
        @param value: values for the record (tuple/list/dict)
        @return: value returned by ``cursor.execute``; ``None`` after a
                 failure, in which case the transaction is rolled back
        """
        try:
            return self._cursor.execute(sql, value)
        except Exception:
            traceback.print_exc()
            self.end("rollback")
            return None

    def insert_many(self, sql, values):
        """Insert several records with one ``executemany`` call.

        @param sql: insert statement with placeholders
        @param values: sequence of records (tuple of tuples / list of lists)
        @return: value returned by ``cursor.executemany``; ``None`` after a
                 failure, in which case the transaction is rolled back
        """
        try:
            return self._cursor.executemany(sql, values)
        except Exception:
            traceback.print_exc()
            self.end("rollback")
            return None

    def __query(self, sql, param=None):
        """Execute a statement and return the row count, or ``None`` on error."""
        try:
            return self._execute(sql, param)
        except Exception:
            traceback.print_exc()
            return None

    def update(self, sql, param=None):
        """Update records.

        @param sql: update statement with ``%s`` placeholders
        @param param: values for the placeholders (tuple/list)
        @return: row count, or ``None`` when an exception occurred
        """
        return self.__query(sql, param)

    def delete(self, sql, param=None):
        """Delete records.

        @param sql: delete statement with ``%s`` placeholders
        @param param: values for the placeholders (tuple/list)
        @return: row count, or ``None`` when an exception occurred
        """
        return self.__query(sql, param)

    def begin(self):
        """Start a transaction by switching autocommit off on the connection."""
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """Finish the transaction: commit when ``option`` is 'commit', otherwise roll back."""
        if option == 'commit':
            self._conn.commit()
        else:
            self._conn.rollback()

    def dispose(self, is_end=1):
        """Finish the transaction and close the cursor and the connection.

        @param is_end: 1 commits pending work; any other value rolls it back
        """
        if is_end == 1:
            self.end('commit')
        else:
            self.end('rollback')
        self._cursor.close()
        self._conn.close()


# The pipeline imports this helper under the spelling ``MysqlUtil``; keep both
# names importable.
MysqlUtil = MySqlUtil
# Pipeline that stores items through the pooled MySQL helper.
from torrentSpider.utils.db_util import MysqlUtil
import traceback
import logging


class MySqlPipeline(object):
    """Scrapy item pipeline that writes torrent items into ``torrent_ye``.

    An item whose ``torrent_url`` is already stored is skipped. Every item is
    returned from ``process_item`` so that later pipeline stages still
    receive it.
    """

    # MysqlUtil instance; created in open_spider and released in close_spider.
    pool = None

    # Item fields copied into the insert parameters.
    _FIELDS = (
        'torrent_title', 'torrent_name', 'torrent_director', 'torrent_actor',
        'torrent_language', 'torrent_type', 'torrent_region',
        'torrent_update_time', 'torrent_status', 'torrent_show_time',
        'torrent_introduction', 'torrent_url',
    )

    def __init__(self):
        pass

    def open_spider(self, spider):
        """Acquire the database helper when the spider starts."""
        self.pool = MysqlUtil()

    def process_item(self, item, spider):
        """Insert ``item`` unless a row with the same ``torrent_url`` exists.

        Any exception is logged, its traceback printed and the transaction
        rolled back; the item is returned in every case.
        """
        try:
            # Look the URL up first. get_count reports the number of rows in
            # the result set, so the query must return one row per match;
            # "select count(1)" would always produce exactly one row and make
            # every item look like a duplicate.
            sql_select = """select 1 from torrent_ye where torrent_url = %(torrent_url)s limit 1"""
            params_select = {'torrent_url': item['torrent_url']}
            flag = self.pool.get_count(sql_select, params_select)
            # get_count yields None when the lookup itself failed; the
            # comparison then raises and is handled by the except branch.
            if flag > 0:
                logging.info('记录已经存在:[%s][%s]', item['torrent_title'], item['torrent_url'])
                return item

            sql_insert = """insert into torrent_ye(torrent_title, torrent_name, torrent_director, torrent_actor,
                torrent_language, torrent_type, torrent_region, torrent_update_time, torrent_status,
                torrent_show_time, torrent_introduction, torrent_url)
                values (%(torrent_title)s,%(torrent_name)s,%(torrent_director)s,%(torrent_actor)s,%(torrent_language)s,
                %(torrent_type)s,%(torrent_region)s,%(torrent_update_time)s,%(torrent_status)s,%(torrent_show_time)s,%(torrent_introduction)s,%(torrent_url)s)"""
            params = {name: item[name] for name in self._FIELDS}
            self.pool.insert_one(sql_insert, params)
            self.pool.end("commit")
        except Exception as e:
            logging.error('发生异常:[%s]', e)
            # print_exc takes a traceback depth limit, not the exception.
            traceback.print_exc()
            self.pool.end("rollback")
        return item

    def close_spider(self, spider):
        """Commit pending work and release the cursor and connection."""
        if self.pool is not None:
            self.pool.dispose()
            self.pool = None
这篇关于Python schedule 库定时任务的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-03用FastAPI掌握Python异步IO:轻松实现高并发网络请求处理
- 2025-01-02封装学习:Python面向对象编程基础教程
- 2024-12-28Python编程基础教程
- 2024-12-27Python编程入门指南
- 2024-12-27Python编程基础
- 2024-12-27Python编程基础教程
- 2024-12-27Python编程基础指南
- 2024-12-24Python编程入门指南
- 2024-12-24Python编程基础入门
- 2024-12-24Python编程基础:变量与数据类型