Reading a 2.4 TB MySQL Table with Multiple Threads and Dumping It to Files
2021/9/17 19:05:50
This article walks through a Python script that reads a 2.4 TB MySQL table with multiple threads and dumps it to CSV files; the full script and its caveats are covered below.
Recently one of our MySQL clusters was being decommissioned and the data in one table had to be dumped first. The cluster has eight nodes with roughly 300 GB of data per node, about 2.4 TB in total. Since we did not have the servers' login passwords, tools that run on the host itself were not an option, so the data had to be pulled out through a client-side script.
from multiprocessing.dummy import Pool as TPool  # thread pool, despite the module name
import traceback

import MySQLdb
import pandas as pd
from dbutils.pooled_db import PooledDB


class LoadFromMysql:
    def __init__(self):
        # Connection pool; host/port/credentials are masked here.
        self.mysql_pool = PooledDB(
            creator=MySQLdb,
            mincached=5,
            maxcached=10,
            host='******',
            port=****,
            user='*****',
            passwd='*****',
            charset='utf8'
        )

    # Run a query and turn the returned tuples into a DataFrame.
    # A connection is taken from the pool per call so that worker threads
    # never share a single MySQLdb connection (which is not thread-safe).
    def get_df_from_db(self, sql):
        conn = self.mysql_pool.connection()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            data = cursor.fetchall()
            # cursor.description has one entry per result column; item 0 is the column name
            column_names = [desc[0] for desc in cursor.description]
            return pd.DataFrame([list(row) for row in data], columns=column_names)
        finally:
            conn.close()  # hand the connection back to the pool

    def extract_data(self, wtid, protocolid):
        # Column descriptions (wind speed / active power / reactive power statistics)
        # defined for this protocol, ordered by their offsets.
        sql_pro = (
            'select descrcn from config.propaths where transtype = 2 '
            'and descrcn regexp "平均风速|最小风速|最大风速|平均有功功率|最小有功功率|最大有功功率|平均无功功率|最小无功功率|最大无功功率" '
            'and protocolid = {protocolid} order by offsets ASC'.format(protocolid=protocolid)
        )
        pro_df = self.get_df_from_db(sql_pro)
        if pro_df.empty:
            return None

        # Target column names: three fixed columns plus the protocol-specific ones.
        col_names = ['wfid', 'wtid', 'rectime'] + list(pro_df['descrcn'].values)
        col_count = len(col_names)

        statisticdata_sql = (
            'select wfid,wtid,rectime,id0,id1,id2,id3,id4,id5,id6,id7,id8,id9,id10,id11,id12 '
            'from dbo.statisticdata where wtid = {wtid}'.format(wtid=wtid)
        )
        sta_df = self.get_df_from_db(statisticdata_sql)

        # Keep only the columns covered by the protocol and rename them.
        extract_df = sta_df.iloc[:, 0:col_count].copy()
        extract_df.columns = col_names
        return extract_df, wtid

    def write_data(self, df, wtid):
        # Write one turbine's data out to its own CSV file.
        # Earlier output dirs: /data/wff_algo/data_file/  /home/lijiapeng/projects/mysql_cluster_load/data/
        df.to_csv(path_or_buf='E:\\{wtid}.csv'.format(wtid=wtid), sep=',',
                  index=False, na_rep='', header=True, encoding='utf_8_sig')

    def load_from_mysql(self):
        # Get every wtid together with its protocol id.
        wtinfo_df = self.get_df_from_db('select wtid,protocolid from config.wtinfo')
        pool = TPool(6)
        for wtid, protocolid in wtinfo_df.values:
            try:
                res = pool.apply_async(self.extract_data, (wtid, protocolid))
                # res.get() blocks until this extraction finishes, so only the
                # CSV writes overlap with the next turbine's extraction.
                kw_tup = res.get()
                if kw_tup:
                    pool.apply_async(self.write_data, (kw_tup[0], kw_tup[1]))
                # print('finished {wtid}'.format(wtid=wtid))
            except Exception:
                print('Exception while dumping wtid {wtid}'.format(wtid=wtid))
                traceback.print_exc()
        pool.close()
        pool.join()


if __name__ == '__main__':
    load = LoadFromMysql()
    load.load_from_mysql()
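Two things are worth calling out in the script above. First, res.get() inside the loop waits for each extraction before the next one is submitted, so the six worker threads are mostly idle. Second, each turbine's full result set is loaded into a DataFrame before anything is written, which can be expensive when a single node holds around 300 GB. Below is a minimal sketch, not the original script, of how the same dump could submit all turbines up front and stream each result set to CSV in chunks. It assumes the same dbo.statisticdata table and a PooledDB pool like the one above, that DBUtils forwards the cursor class argument through to MySQLdb, and the names dump_wtid, dump_all and CHUNK_ROWS are illustrative rather than taken from the original.

import csv
from multiprocessing.dummy import Pool as TPool

import MySQLdb
import MySQLdb.cursors

CHUNK_ROWS = 50000  # rows fetched per round trip; tune to available memory


def dump_wtid(mysql_pool, wtid, out_dir='E:\\'):
    # Stream one turbine's rows straight to CSV instead of building a DataFrame.
    conn = mysql_pool.connection()
    try:
        # SSCursor keeps the result set on the server and streams rows to the
        # client, so a turbine's data never has to fit in memory all at once.
        cursor = conn.cursor(MySQLdb.cursors.SSCursor)
        cursor.execute(
            'select wfid,wtid,rectime,id0,id1,id2,id3,id4,id5,id6,id7,'
            'id8,id9,id10,id11,id12 from dbo.statisticdata where wtid = %s',
            (wtid,))  # parameterised instead of string formatting
        with open('{dir}{wtid}.csv'.format(dir=out_dir, wtid=wtid), 'w',
                  newline='', encoding='utf_8_sig') as f:
            writer = csv.writer(f)
            writer.writerow([d[0] for d in cursor.description])  # header row
            while True:
                rows = cursor.fetchmany(CHUNK_ROWS)
                if not rows:
                    break
                writer.writerows(rows)
    finally:
        conn.close()


def dump_all(mysql_pool, wtids, workers=6):
    # Submit every turbine first, then wait; the pool stays busy instead of
    # the main thread blocking on each extraction in turn.
    pool = TPool(workers)
    results = [pool.apply_async(dump_wtid, (mysql_pool, wtid)) for wtid in wtids]
    pool.close()
    for r in results:
        r.get()  # re-raises any exception from the worker thread
    pool.join()

With this layout, load_from_mysql would reduce to reading config.wtinfo and calling dump_all(self.mysql_pool, wtinfo_df['wtid'].values). The per-protocol column renaming done in extract_data is left out to keep the sketch short; it could be applied to the header row in the same way.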
That wraps up this walkthrough of reading a 2.4 TB MySQL table with multiple threads and dumping it to files; hopefully it serves as a useful starting point for similar one-off migrations.