积累--Python常用工具函数
2022/4/11 20:12:40
本文主要是介绍积累--Python常用工具函数,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
1. 随机生成6位验证码
import random def get_verify_code(n=6,alpha=False): ''' n: 验证码位数 alpha:是否需要字母验证码 ''' s = '' # 创建字符串变量,存储生成的验证码 for i in range(n): # 通过for循环控制验证码位数 num = random.randint(0,9) # 生成随机数字0-9 if alpha: # 需要字母验证码,关键字alpha=True upper_alpha = chr(random.randint(65,90)) lower_alpha = chr(random.randint(97,122)) num = random.choice([num,upper_alpha,lower_alpha]) s = s + str(num) return s
2. 获取上个月最后一天的日期
# 1 import datetime def last_day_of_month(any_day): next_month = any_day.replace(day=28) + datetime.timedelta(days=4) # this will never fail return next_month - datetime.timedelta(days=next_month.day) print last_day_of_month(datetime.date.today()) # 2 import calendar # 导入库 import datetime date = datetime.datetime.now() print(date.year, date.month) # print(year, month) lastDay = calendar.monthrange(date.year, date.month - 1)[1] # 指定年月的最后一天,即指定年月的整月总天数 print(lastDay)
3. 代理IP获取
import requests import time headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36' } def ip_text(li, valid_IP): try: url = "https://www.baidu.com//" for info in li: ip = info['ip'] print(f'开始校验IP地址:{ip}') try: port = info['port'] protocol = info['protocol'] rep = requests.get(url, proxies={protocol: f'{protocol}://{ip}:{port}'}, headers=headers, timeout=0.5) if rep.status_code == 200: # 如果放回的状态码是200,那么说明该IP地址可用 valid_IP.append(f'{protocol}://{ip}:{port}') print(f"该代理IP有效:{ip}, 响应时间为:{rep.elapsed.total_seconds()}") else: print("该代理IP无效:" + ip) except: print("该代理IP无效:" + ip) except: print("IP测试失败") def get_proxy(): url = 'https://ip.jiangxianli.com/api/proxy_ips' params = { 'page': 1 } validIp = [] while 1: data = requests.get(url=url, params=params, headers=headers).json() if not data['data']['data']: break info = data['data']['data'] ip_text(info, validIp) params['page'] += 1 time.sleep(2) break print(validIp) if __name__ == '__main__': get_proxy()
4. Redis延迟队列的实现
class RedisDelayQueue(object): """Simple Queue with Redis Backend dq = RedisDelayQueue('delay:commtrans') dq.put( 5 ,{'info':'测试 5555','time': timestamp_to_datetime_str(t + 5)}) print(dq.get()) """ def __init__(self, name, namespace='queue'): """The default connection parameters are: host='localhost', port=6379, db=0""" self.__db = get_redis_engine(database_name='spdb') self.key = '%s:%s' % (namespace, name) def qsize(self): """Return the approximate size of the queue.""" return self.__db.zcard(self.key) def empty(self): """Return True if the queue is empty, False otherwise.""" return self.qsize() == 0 def rem(self, value): return self.__db.zrem(self.key, value) def get(self): # 获取任务,以0和当前时间为区间,返回一条记录 items = self.__db.zrangebyscore(self.key, 0, int(time.time()), 0, 1) if items: item = items[0] if self.rem(item): # 解决并发问题 如能删就让谁取走 return json.loads(item) return None def put(self, interval, item): """:param interval 延时秒数""" # 以时间作为score,对任务队列按时间戳从小到大排序 """Put item into the queue.""" d = json.dumps(item) return self.__db.zadd(self.key, {d: int(time.time()) + int(interval)})
5. 后端代码sh文件部署
#!/bin/sh cd /root/www/assist_api_new echo "进行 git pull 更新,时间可能会比较长,请稍等一会,不要退出或者停止当前脚本" git pull echo "restart api engine " supervisorctl restart assist_api_new
6. 前端Vue代码sh文件部署
#!/bin/sh cd /root/sources/assist/front_new/ echo "进行 git pull 更新,时间可能会比较长,请稍等一会,不要退出或者停止当前脚本" git pull echo 'front_new ====================> npm build' /usr/local/node/bin/npm run build >> service_deploy.log && rm -rf /root/www/assist_front_new/* && \cp -rf ./dist/* /root/www/assist_front_new/
7. 自动安装对应Chrome版本的Webdriver
from selenium import webdriver from webdriver_manager.chrome import ChromeDriverManager driver = webdriver.Chrome(ChromeDriverManager().install())
8. sqlAchemy 实现分页器
from math import ceil class Pagination: def __init__(self, query, per_page: int = 5, page: int = 1): """ 初始化分页参数 :param query: 查询对象 :param per_page: 一页多少内容 :param page: 第几页 1起 """ self.query = query self.per_page = per_page self.page = page @property def items(self): """ 得到分页后的内容 :return: [model row / Model] """ if self.page > self.pages: return [] offset_num = (self.page - 1) * self.per_page return self.query.limit(self.per_page).offset(offset_num).all() @property def counts(self): """ 总数据量 :return: int """ return self.query.count() @property def pages(self): """ 总页数 :return: int """ return ceil(self.counts / self.per_page) @property def next_num(self): """下一页""" next_num = self.page + 1 if self.pages < next_num: return None return next_num @property def prev_num(self): """上一页""" prev_num = self.page - 1 if prev_num < 1: return None return prev_num def iter_pages(self, left=2, right=2): length = left + right + 1 # 页数大于 if self.page > self.pages: range_start = self.pages - length if range_start <= 0: range_start = 1 return range(range_start, self.pages + 1) # 页数小于最少分页数 if self.pages < length: return range(1, self.pages + 1) # 页数正常的情况下,至少大于 length 长度 l_boundary, r_boundary = left + 1, self.pages - right + 1 if l_boundary < self.page < r_boundary: return range(self.page - left, self.page + right + 1) if self.page <= left: return range(1, length + 1) return range(self.pages - length, self.pages + 1) if __name__ == '__main__': per_page = 1 pages = 4 paginate = Pagination(query=[1, 2, 3, 4, 5, 6, 7, 8], per_page=per_page, page=pages) ret = paginate.iter_pages() # 显示在前端的页码列表 print(ret) # paginate.items # 分页后的数据 [] # paginate.pages # 共xxx页 # paginate.page # 当前页码 从1开始 # paginate.per_page # 一页几行 # paginate.prev_num # 上一页页码 # paginate.next_num # 下一页页码
9. 企业微信机器人
from copy import copy import os import requests import json import time import datetime from urllib3 import encode_multipart_formdata class Robot: def __init__(self, wx_url, key): self.wx_url = wx_url self.key = key self.wx_upload_url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={key}&type=file' def upload_file(self, file_path): file_name = file_path.split("/")[-1] with open(file_path, 'rb') as f: length = os.path.getsize(file_path) data = f.read() headers = {"Content-Type": "application/octet-stream"} params = { "filename": file_name, "filelength": length, } file_data = copy(params) file_data['file'] = (file_path.split('/')[-1:][0], data) encode_data = encode_multipart_formdata(file_data) file_data = encode_data[0] headers['Content-Type'] = encode_data[1] r = requests.post(self.wx_upload_url, data=file_data, headers=headers) print(r.text) media_id = r.json()['media_id'] return media_id def qi_ye_wei_xin_file(self, media_id): headers = {"Content-Type": "text/plain"} data = { "msgtype": "file", "file": { "media_id": media_id } } r = requests.post( url=self.wx_url, headers=headers, json=data) print(r.text) def qi_ye_wei_xin_text(self, content): content = str(content, encoding='utf-8') headers = {"Content-Type": "application/json"} data = { "msgtype": 'text', "text": { "content": content, "mentioned_mobile_list": '' }, } r = requests.post(self.wx_url, headers=headers, data=json.dumps(data, ensure_ascii=False)) print(r.text) if __name__ == '__main__': url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=8f064ec6-b9b9-4a37-a2de-b251ea81207b' key = '8f064ec6-b9b9-4a37-a2de-b251ea81207b' robot = Robot(url, key)
10. sha256withRSA验签
from base64 import decodebytes, encodebytes from Crypto.PublicKey import RSA from Crypto.Signature import PKCS1_v1_5 from Crypto.Hash import MD5, SHA1, SHA256 import logging import json sha_rsa_log = logging.getLogger('sha_rsa_log') str2 = "{\"测试字段\":\"test\"}" str1 = "NpA0IaNbGXe4BlTX4VPfGqBPlJYRkXHGR6D7MWYdlOfC5UI35KGfBW6eSVVh619nW0UFAsAz9XDYGRjPFFctTlbb+yuNkQOLFqcjlUI9X/AOSyNWhNXRUeByPkNbzixtXPPlAg6mr4DBGVb42H6QzhCb2lfS7U3xchk1TL1r15A=" publicKey = """MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDr+PnAY5TwEDl4cCay3J/IPH74 rlUpn0zSB7yFfx1lvo21wEr5F8psnv6eDfthbGVbGhmDcW7HvCja9hEQ+0t4n6vf mzOhFMOBCkXFeWH5GjCDgLqIUQn3YXUHGsG0A11sO0L1cGVIyIJixZThHrB8XKvR Be6mAJHgwuv1P8ZwVwIDAQAB""" PUBLICKEY_TEST = """MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDYVSjr6IU8u7W6PuRUb05ONZJj sJ1lNI9ofcujYehjAO4mOOrWBIMZkadhej0HdtErj3b3c11HHbshPiXcbKoHbfeF qvsLCo7Cg6ZjhMsY1/nkR1+x+LZU2MBygW9h/hnKK8hGluIAGiJVbiQGuFxa9V48 z74KTXFbZhGkfAxl+QIDAQAB""" def verify(data, signature, public_key): """ Sha256 with RSA 验签 """ key = RSA.importKey(decodebytes(public_key.encode())) hash_value = SHA256.new(data.encode()) verifier = PKCS1_v1_5.new(key) if verifier.verify(hash_value, decodebytes(signature.encode())): sha_rsa_log.info('The signature is authentic.') print(True) return True else: sha_rsa_log.info('The signature is not authentic.') return False if __name__ == '__main__': verify(str2, str1, publicKey)
这篇关于积累--Python常用工具函数的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-03用FastAPI掌握Python异步IO:轻松实现高并发网络请求处理
- 2025-01-02封装学习:Python面向对象编程基础教程
- 2024-12-28Python编程基础教程
- 2024-12-27Python编程入门指南
- 2024-12-27Python编程基础
- 2024-12-27Python编程基础教程
- 2024-12-27Python编程基础指南
- 2024-12-24Python编程入门指南
- 2024-12-24Python编程基础入门
- 2024-12-24Python编程基础:变量与数据类型