python操作pgsql备份还原

2022/6/8 2:21:35

本文主要是介绍python操作pgsql备份还原,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

python操作pgsql备份还原

common.py

#!/usr/bin/env python
# encoding: utf-8

import argparse

dbs = [
"test1",
"test2"
]

#  ct: pgsql容器的名称
ct = "pg"

# 环境主机地址
host = {
    "dev": "ip地址1",
    "test": "ip地址2"
}
# 环境pg端口
port = {
    "dev": 5432,
    "test": 5432
}

# 数据库账号密码
password = {
    "user1": "passwd1",
    "user2": "passwd2"
}

def get_parser():
    parser = argparse.ArgumentParser(description='Krum')
    parser.add_argument('-e', required=True, choices=['dev', 'test'])
    return parser

database_back.py

#!/usr/bin/env python
# encoding: utf-8

import os
import time
from datetime import datetime
from common import *

def backup_postgresql(env="test", backup_dir="."):
    cmd = f'mkdir -p {backup_dir}'
    print(cmd)
    os.system(cmd)

    for db in dbs:
        cmd = f'docker exec -i {ct} bash -c '
        cmd += f'"export PGPASSWORD={password[env]} && pg_dump -h {host[env]} -Uroot {db} " > {backup_dir}/{db}.sql;'
        print(cmd)
        os.system(cmd)


def backup_mysql(env="test", backup_dir="."):
    cmd = f'mkdir -p {backup_dir}'
    print(cmd)
    os.system(cmd)

    db = "lts"
    ct = "mysql"

    cmd = f'docker exec -i {ct} '
    cmd += f'mysqldump -h {host[env]} -uroot -p{password[env]} {db} > {backup_dir}/{db}.sql;'
    print(cmd)
    os.system(cmd)

if __name__ == '__main__':
    parser = get_parser()
    args = parser.parse_args()
    env = args.e

    now = datetime.now().strftime("%Y%m%d")
    now_more = datetime.now().strftime("%Y%m%d%H%M%S")
    backup_dir = f'./backup/{env}/{env}_{now}'
    backup_dir_more = f'./backup/{env}/{env}_{now_more}'

    backup_postgresql(env=env, backup_dir=backup_dir)
    backup_mysql(env=env, backup_dir=backup_dir)
    cmd = f'tar -zcvf {backup_dir_more}.tar.gz {backup_dir}'
    #cmd = f'cp -r {backup_dir} {backup_dir_more}'
    print(cmd)
    os.system(cmd)

备份用法:

python3 backup_db.py -e prod

import_db.py

#!/usr/bin/env python
# encoding: utf-8

from email.policy import default
import os
from pydoc import describe
import time
from datetime import datetime
import argparse

# 清理旧数据库并创建新的数据库
def clear_db(db, ct="postgres"):
    if not db:
        print("db=None")
        exit()

    cmd = f'docker exec {ct} bash -c '
    cmd += f'"export PGPASSWORD={pgpd}; psql -Uroot -w -h {tghost} -p {tgport} -c '
    cmd += f"\\\"SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE datname='{db}' AND pid<>pg_backend_pid();\\\"\""
    os.system(cmd)

    print(f"[+] DATABASE {db}")
    cmd = f'docker exec {ct} bash -c '
    cmd += f'"export PGPASSWORD={pgpd}; psql -Uroot -w -h {tghost} -p {tgport} -c '
    cmd += f'\'DROP DATABASE {db};\'"'
    os.system(cmd)

    cmd = f'docker exec {ct} bash -c '
    cmd += f'"export PGPASSWORD={pgpd}; psql -Uroot -w -h {tghost} -p {tgport} -c '
    cmd += f'\'CREATE DATABASE {db};\'"'
    os.system(cmd)
# 导入数据
def import_pgsql(db, ct='postgres'):
    print(f"DBNAME: {db} TARGET: {tghost}")

    clear_db(db)

    cmd = f'docker exec -i {ct} bash -c "export PGPASSWORD={pgpd} && psql -o /dev/null -q -h {tghost} -p {tgport} -Uroot -w {db}" < {dpath}/{db}.sql'
    os.system(cmd)
    print()
    print("[+] IMPORT DB SUCCESS!")
    print()

def import_db_all(dbname):
    print(f"TARGET: {tghost} PORT: {tgport}")
    confirm = input("please confirm this information[yes/no]:")
    if confirm != 'yes' and confirm != 'y':
        print("Not confirm and Exit...")
        exit()
    if not dbname:
        print("\033[31m未获取到数据库文件,检查路径:'" + dpath + "' 是否正确\033[0m")
        exit()
    for db in dbname:
        import_pgsql(db)

def get_parser():
    parser = argparse.ArgumentParser(description='import database')
    parser.add_argument('-d', required=True)
    parser.add_argument('-tg', required=True)
    parser.add_argument('-tgp', required=False,default=5432)
    return parser

if __name__ ==  "__main__":
    parser = get_parser()
    args = parser.parse_args()
    dpath = args.d
    tghost = args.tg
    tgport = args.tgp
    list_file = os.listdir(dpath)
    pgpd = 'dbpasswd'
    
    dbname = []
    for filename in list_file:
        if 'lts' not in filename and 'sql' in list_file:
            dbname.append(filename.split('.')[0])
    print('-d:sql文件存放路径;-tg: 导入主机;-tgp:导入主机端口')
    import_db_all(dbname)

导入用法:

python3 import_db.py -d:sql文件存放路径;-tg: 导入主机;-tgp:导入主机端口


这篇关于python操作pgsql备份还原的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程