如何实现一条sql查询返回多个数据库结果

2021/9/18 19:08:50

本文主要是介绍如何实现一条sql查询返回多个数据库结果,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

有很多从事海外业务的公司,都不止上架一两个马甲包,而是会同时上架十几个甚至几十个产品,当我们在提取数据时,明明需要用到查询的sql是一样的,但每次都要一个一个数据库的查,这样十分耗费时间,这时可以选用python,写个通用查询脚本,一次性查询出所有内容并且整合到一起。

基本思路:1、将数据库连接封装到同一个类下;

                   2、查询时依次调用相应的数据库,返回结果合并到一个DataFrame中。

类的封装可以参考下面的代码(有些数据库是通过ssh连的,这里也给出案例):

from sqlalchemy import create_engine
import pandas as pd
import pymysql
import time
from sshtunnel import SSHTunnelForwarder

class MySQLTool(object):
    def __init__(self,con):
        self.con = con
        self.con_post = create_engine('mysql+pymysql://read:post001@post.cshgrltcl1da.ap-south-1.rds.amazonaws.com:3306/post?charset=utf8')
        self.con_zest = create_engine('mysql+pymysql://read:zest001@zest.c8g4xu1qjzu8.ap-south-1.rds.amazonaws.com:3306/zest?charset=utf8')
        self.con_kin = create_engine('mysql+pymysql://read:kin001@kin.ccvv6shbauub.ap-south-1.rds.amazonaws.com:3306/kin?charset=utf8')

    def read(self,sql):
        if self.con == 'post':
            df = pd.read_sql(con=self.con_post,sql=sql)
        elif self.con == 'zest':
            df = pd.read_sql(con=self.con_zest,sql=sql)
        elif self.con == 'kin':
            df = pd.read_sql(con=self.con_kin,sql=sql) 
        elif self.con == 'iDomp':
            server =  SSHTunnelForwarder(
                ("106.00.100.30", 22), #ssh IP和port
                ssh_password = "ssh12346",#ssh 密码
                ssh_username = "sshroot",#ssh账号
                remote_bind_address = ("172.00.00.132", 3306)) #数据库所在的IP和端口
            
            server.start()
        
            conn = pymysql.connect(host = "127.0.0.1", #固定写法
                           port = server.local_bind_port,
                           user = "approot", #数据库账号
                           passwd = "********",#数据库密码
            #                db = "order"     # 可以限定,只访问特定的数据库,否则需要在mysql的查询或者操作语句中,指定好表名
                            )
            df = pd.read_sql(sql, conn)
            server.close()  
        else:
            print('请输入正确信息')
        return df 
  

读sql的代码:

def run_data(db,sql):
    
    tt = time.time()
    
    aa = MySQLTool(db)

    ab = aa.read(sql)
#    ab.insert(1,'app',db)
    print('Time from {1} used: {0} sec'.format(time.time()-tt,db))
    return ab 

def read_sql(db,sql,n=1):
    if n<=10:
        try:            
            return run_data(db,sql)
        except Exception as e:
            print('错误,重跑')
            print(e)
            n+=1
            return read_sql(db,sql,n)  

再写个list:

def dbname():
    return ['post','kin','iDomp','zest']

这样准备工作就做好了,接下来就是实际应用

sql = """select * from order_info limit 10;"""

data = pd.DataFrame()
for db in dbname():
    locals()[db] = read_sql(db,sql)
    data = pd.concat([data,locals()[db]],ignore_index=True)

学会了就加到自己的武器库里吧~



这篇关于如何实现一条sql查询返回多个数据库结果的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程