hive导致表锁的SQL和租户信息获取

2021/11/6 2:12:09

本文主要是介绍hive导致表锁的SQL和租户信息获取,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

hive导致表锁的SQL和租户信息获取

多租户平台存在同一个表有多个租户具有访问权限,hive提供锁机制,保障数据的安全。由此会引发由于某个租户占用锁时间较长,其他租户作业滞后,如果将导致表锁的SQL和租户信息截取到,可以提供给局方进行业务流程的优化,加快作业执行效率。本文档记录一个可实现的方法供参考

获取表锁信息

场景:hive使用分布式协调服务(Zookeeper)提供的分布式锁,hive底层由mapreduce执行,yarn统一资源调度,本文档由python语言实现

关键代码

# xxxx
# 2021/11/4 8:54
import configparser
import os
import time
from kazoo.client import KazooClient
import ConnMysqlTmpl
​
'''
#获取zookeeper集群信息
ZookeeperInfo.ini文件为:
    [zookeeper-conn]
    url=zk_host_name:port
'''
def zookeeper_info():
    cf = configparser.ConfigParser()
    pass1 = os.path.dirname(os.path.abspath('.'))
    cf.read(pass1 + "/config/ZookeeperInfo.ini")
    items = cf.items("zookeeper-conn")
    list = []
    for line in items:
        list.append(line[1])
    return list
​
'''
#定义一个zookeeper类,实现zk集群的链接和关闭
'''
class Zookeeper:
    def connection(self,zk_url):
        self.zk_url=zk_url
        try:
            self.conn_path=KazooClient(self.zk_url)
            self.conn_path.start()
            return self.conn_path
        except Exception as e:
            print(f"SSH连接异常, 错误如下: {e}")
​
    def conn_close(self):
        self.conn_path.stop()
        print("已关闭Zookeeper连接...")
​
'''
#获取需要监控的表
    HiveMonLockInfo文件格式随意,只要在zk中拿到需要监控的表路径即可
'''
def monitor_info():
    filename = 'datainfo/HiveMonLockInfo'
    if os.path.exists(filename):
        with open(filename, 'r', encoding='utf8') as rfile:
            lines = rfile.readlines()
        return lines
    else:
        print("未找到文件:【 {0} 】信息,请联系后台管理员处理".format(filename))
​
'''
#获取表锁列表
'''
def get_lock():
    zk=Zookeeper()
    zk_info = zookeeper_info()
    zk_conn=zk.connection(zk_info[0])
    table_info=monitor_info()
    locks = []
    for table in table_info:
        if zk_conn.exists("/hive_zookeeper_namespace/" + table.rstrip('\n')):
            lock_name = zk_conn.get_children("/hive_zookeeper_namespace/" + table.rstrip('\n'))
            for lock in lock_name:
                if "LOCK" in lock:
                    locks.append("/hive_zookeeper_namespace/"+table.rstrip('\n')+"/"+lock)
                else:
                    pass
        else:
            print("/hive_zookeeper_namespace/" + table.rstrip('\n'), "无锁情况", get_time())
    zk.conn_close()
    return locks
​
'''
#获取每个表锁的具体信息
table_locks_info为入库执行的语句
'''
def get_lock_info():
    zk = Zookeeper()
    zk_info = zookeeper_info()
    zk_conn = zk.connection(zk_info[0])
    all_locks = get_lock()
    print(all_locks)
    if all_locks:
        for path in all_locks:
            #获取所有锁node信息
            all_info=zk_conn.get(path)
​
            #获取query_id和sql语句
            all_sql_info = str(all_info).split("ZnodeStat")[0]
            query_id = (all_sql_info.split(":")[0])[3:]
            sql_info_tmp = all_sql_info.split(":")[3]
            sql_info_tmp1=sql_info_tmp.replace("\n","\\n")
            sql_info=sql_info_tmp1.replace("\'","\\'")
​
            #获取节点创建信息
            time_info = str(all_info).split("ZnodeStat")[1]
            create_time = time_info.split(",")[2]
​
            #时间戳转换
            timeStamp = int(create_time.split("=")[1])
            '''
            爬取下来的时间戳长度都是13位的数字,而time.localtime的参数要的长度是10位,所以我们需要将其/1000并取整即可
            '''
            timeArray = time.localtime(timeStamp/1000)
            otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
            try:
                ConnMysqlTmpl.table_locks_info(query_id,sql_info,otherStyleTime,path.split("/")[3])
            except Exception as e:
                print(f"执行SQL失败,请排查:{e}")
    else:
        print("所监控的表无锁情况~", get_time())
    zk.conn_close()
​
def get_time():
    create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    return create_time
​
if __name__ == '__main__':
    get_lock_info()

主要说明

#获取所有锁node信息

#获取query_id和sql语句
all_sql_info = str(all_info).split("ZnodeStat")[0]
query_id = (all_sql_info.split(":")[0])[3:]
sql_info_tmp = all_sql_info.split(":")[3]
sql_info_tmp1=sql_info_tmp.replace("\n","\\n")
sql_info=sql_info_tmp1.replace("\'","\\'")

/SQL语句和SQL语句的query_id/

获取运行作业的query_id和租户

关键代码

import requests
import configparser
import os
import AppSourceInfo
import ConnMysqlTmpl
from json import JSONDecodeError
from requests.auth import HTTPBasicAuth
​
'''
#获取配置信息:访问url
'''
def get_app_info():
    cf=configparser.ConfigParser()
    pass1=os.path.dirname(os.path.abspath("."))
    cf.read(pass1 + "/config/AppExecSqlInfo.ini")
    items = cf.items("appexecsql-conn")
    list = []
    for line in items:
        list.append(line[1])
    return list
​
'''
#获取运行状态下作业的query_id和租户
'''
def job_info():
    conn_info=get_app_info()
    app_ids = AppSourceInfo.get_each_app1()
    if app_ids:
        for line in app_ids:
            try:
                all_info_tmp=requests.get(conn_info[0]+line+"/ws/v1/mapreduce/jobs/"+str(line).replace("application","job")+"/conf",
                              auth=HTTPBasicAuth(conn_info[1],conn_info[2]))
                try:
                    info_json=all_info_tmp.json()
                    result=info_json["conf"]
                    all_info=result["property"]
                    list=[]
                    for line in all_info:
                        '''
                        if line["name"] == "hive.query.string":
                            #list.append(str(line["value"]))
                            sql_info_tmp = line["value"]
                            if sql_info_tmp:
                                sql_info_tmp1 = sql_info_tmp.replace("\n", "\\n")
                                sql_info = sql_info_tmp1.replace("\'", "\\'")
                                list.append(sql_info)
                            else:
                                sql_info = "0"
                                list.append(sql_info)
                        elif line["name"] == "mapreduce.jdbc.input.query":
                            #list.append(str(line["value"]))
                            jdbc_query_tmp = line["value"]
                            #print(type(jdbc_query_tmp))
                            if len(jdbc_query_tmp):
                                jdbc_query_tmp1 = jdbc_query_tmp.replace("\n", "\\n")
                                jdbc_query = jdbc_query_tmp1.replace("\'", "\\'")
                                list.append(jdbc_query)
                            else:
                                jdbc_query = "0"
                                list.append(jdbc_query)
                        '''
                        
                        #获取query_id
                        if line["name"] == "hive.query.id":
                            #list.append(str(line["value"]))
                            query_id_tmp = line["value"]
​
                            if query_id_tmp:
                                query_id = line["value"]
                                list.append(query_id)
                            else:
                                query_id = "0"
                                list.append(query_id)
                                
                         #获取执行作业的队列名称,就能对应到租户
                        elif line["name"] == "mapreduce.job.queuename":
                            #list.append(str(line["value"]))
                            user_info = (line["source"])[1]
                            user = str(user_info).split("/")
                            queue_name = user[4]
                            list.append(queue_name)
                        else:
                            pass
                    print(list)
                    print(len(list))
                    try:
                        pass
                        ConnMysqlTmpl.user_app_query_info1(list)
                    except Exception as e:
                        print(f"数据入库失败,请排查:{e}")
                except JSONDecodeError as e:
                    print("{0}作业非MapReduce任务,请排查~".format(line))
            except Exception as e:
                print("获取app信息失败,请排查 {0}".format(e))
    else:
        print("获取app_id信息失败,请排查~")

主要说明

· 执行前(ACCEPTED)状态下的作业,内容编码存在异常,无法通过yarn rest api进行读取

· 执行完成的作业(FINISHED)的作业无异常情况下都会及时的释放锁,如果集群开始了日志服务,可以访问jobhistoryserver服务读取对应日志

获取运行状态下作业的信息,链接为:

/http://resouecemanage:port/applicationID/ws/v1/mapreduce/jobs/jobId/conf/

将返回的数据进行json格式转换:

info_json=all_info_tmp.json() result=info_json["conf"] all_info=result["property"]

获取query_id:

if line["name"] == "hive.query.id":
                            #list.append(str(line["value"]))
                            query_id_tmp = line["value"]

获取资源队列名称:

line["name"] == "mapreduce.job.queuename":
                            #list.append(str(line["value"]))
                            user_info = (line["source"])[1]
                            user = str(user_info).split("/")
                            queue_name = user[4]
                            list.append(queue_name)

效果呈现

SELECT
    a.query_id,
    a.queue_name,
    b.sql_info,
    b.table_name,
    b.ctime
FROM
    user_app_query_info a
JOIN zookeeper_lock_info b
WHERE
    a.query_id = b.query_id

两个表根据query_id进行关联,获取到锁表情况下,锁表的SQL语句和租户信息。


这篇关于hive导致表锁的SQL和租户信息获取的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程