python连接mysql与redis(ssh方式)
2022/8/17 2:55:52
本文主要是介绍python连接mysql与redis(ssh方式),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
python如何连接数据库(SSH方式)
性能测试时,有个支付订单的场景,需要用到已生成的订单code,如何获取订单code?
一,通过Jmeter连接数据库获取。二,直接mysql导出数据
我这里是使用python导出mysql数据,使用的SSH方式:
import pymysql import csv import pandas as pd from sshtunnel import SSHTunnelForwarder # ssh连接库 import base64 orderCode_path = r"D:/t_order.csv" # 保存地址 def mysql_ssh(sql,args=None): with SSHTunnelForwarder( ('SSH_IP', SSH_端口号), ssh_password='SSH密码', ssh_username='SSH用户名', local_bind_address=('127.0.0.1', 22), # 本地绑定端口 remote_bind_address=('远程数据库地址', 3306)) as server: print('SSH连接成功') conn = pymysql.connect(host='127.0.0.1', port=22, user='数据库名', password='数据库登录密码', database='数据库登录账号', charset='utf8') print('mysql数据库连接成功') cursor = conn.cursor() print('游标获取成功') try: print(f'执行查询语句:{sql} 参数:{args}') cursor.execute(sql,args) print('数据查询成功') conn.commit() print('事务提交成功') datas = cursor.fetchall() success = True except: print('数据查询失败') datas = None success = False print('正在关闭数据库连接') cursor.close() conn.close() return datas, success def execute_sql(sql): try: result = [] datas = mysql_ssh(sql) if datas[-1]: print(f"共找到{len(datas[0])}个数据") result = list(datas[0]) try: datas_write(result) except Exception as error: print(error) else: result.clear() else: print('数据查询失败') except Exception as e: print(e) def datas_write(a): dataframe = pd.DataFrame(a) dataframe.to_csv(orderCode_path, mode= 'w', index=False, sep=',',header=["orderCode", "userId"]) print("写入成功!") if __name__ == "__main__": sql = r"SELECT order_code,user_id FROM `t_order`;" # 导出所有的订单code execute_sql(sql)
还遇到一个需要用户token的场景,用户token是存在redis的,于是通过python获取redis的数据,SSH方式如下:
import redis import pandas as pd from sshtunnel import SSHTunnelForwarder # ssh连接库 token_path = r".\login_token.csv" # redis数据保存路径 server = SSHTunnelForwarder( ssh_address_or_host= ('SSH地址',22), # ssh地址 ssh_username= "SSH用户", # ssh连接的用户名 ssh_password= "SSH用户密码" , remote_bind_address=('远程redis地址', 6379)) def execute_sql(): try: r = redis.Redis(host='127.0.0.1', port=server.local_bind_port, decode_responses=True, password='redis密码', db=0, encoding='gb18030') #encoding可以不加 print("连接成功") keys = r.keys(pattern="*userId*") print(f"共找到{len(keys)}个数据") result = [] for i in keys: try: result.append(r.get(i)) except Exception as e: print("{}不存在".format(i)) result = None except Exception as e: print(e) print(f"共保存{len(result)}个数据") return result def token_write(a): dataframe = pd.DataFrame({'token': a}) dataframe.to_csv(token_path, mode= 'a', index=False, sep=',',header=False) print("写入成功!") if __name__ == "__main__": server.start() token = execute_sql() token_write(token) server.close()
这篇关于python连接mysql与redis(ssh方式)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-04部署MySQL集群项目实战:新手入门教程
- 2024-11-04如何部署MySQL集群资料:新手入门指南
- 2024-11-02MySQL集群项目实战:新手入门指南
- 2024-11-02初学者指南:部署MySQL集群资料
- 2024-11-01部署MySQL集群教程:新手入门指南
- 2024-11-01如何部署MySQL集群:新手入门教程
- 2024-11-01部署MySQL集群学习:新手入门教程
- 2024-11-01部署MySQL集群入门:新手必读指南
- 2024-10-23BinLog入门:新手必读的MySQL二进制日志指南
- 2024-10-23Binlog入门:MySQL数据库的日志管理指南