csmarapi
2021/10/2 23:12:25
本文主要介绍 csmarapi,对大家解决编程问题具有一定的参考价值,需要的程序员们请跟随小编一起来学习吧!
#coding=utf-8
'''
Client for the CSMAR data API.

Provides login, metadata listing, preview/query, server-side packaging,
download of packaged zip files and loading of extracted CSV files.
'''
import csv
import datetime
import json
import logging
import os
import sys
import time
import zipfile
from urllib import parse

import urllib3
from websocket import create_connection

from csmarapi.UrlUtil import UrlUtil


class CsmarService(object):
    '''
    Service interface of csmarapi.

    The session is persisted in two files in the current working directory:
    ``token.txt`` (token, language and "belong" flag, one per line) and
    ``signCode.txt`` (the code of the most recent packaging request).

    Failure conventions (kept from the original implementation): argument
    validation failures return False or None depending on the method, and a
    request that the server answers with a non-zero ``code`` yields None
    from the methods that unwrap the response.
    '''

    # Result set filled by loadData. The class-level list is kept only so
    # that ``CsmarService.dataList`` still resolves; __init__ gives every
    # instance its own list so instances no longer share rows.
    dataList = []

    # Default locations used by the download / unzip helpers. They are class
    # attributes so that they can be overridden per instance or subclass.
    zipDir = 'c:\\csmardata\\zip\\'
    dataDir = 'c:\\csmardata\\'

    def __init__(self):
        '''
        Create the URL helper and configure logging to console and file.
        '''
        self.urlUtil = UrlUtil()
        self.dataList = []
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s %(filename)s %(levelname)s %(message)s',
                            datefmt='%a %d %b %Y %H:%M:%S')
        self.logger = logging.getLogger('CsmarService')
        # getLogger returns the same logger object for every instance, so
        # only attach the file handler once; otherwise each new instance
        # would duplicate every log line and leak a file handle.
        logPath = os.path.abspath('csmar-log.log')
        alreadyAttached = any(
            isinstance(handler, logging.FileHandler)
            and handler.baseFilename == logPath
            for handler in self.logger.handlers)
        if not alreadyAttached:
            fh = logging.FileHandler('csmar-log.log', mode='a',
                                     encoding='utf-8', delay=False)
            fh.setLevel(logging.INFO)
            fh.setFormatter(logging.Formatter(
                '%(asctime)s %(filename)s %(levelname)s %(message)s'))
            self.logger.addHandler(fh)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _getHttp(self):
        '''Return a lazily created urllib3 pool shared by all requests.'''
        http = getattr(self, '_http', None)
        if http is None:
            http = self._http = urllib3.PoolManager()
        return http

    def _request(self, method, targetUrl, body=None, headers=None):
        '''
        Send an HTTP request, decode the JSON answer and log error codes.

        Returns the decoded response dictionary whatever its ``code`` is.
        '''
        kwargs = {'headers': headers}
        if body is not None:
            kwargs['body'] = body
        resp = self._getHttp().request(method, targetUrl, **kwargs)
        dataDic = json.loads(resp.data.decode('utf-8'))
        code = dataDic['code']
        if code == -3004:
            self.logger.error("The account has been offline, please sign in again.")
        elif code == -1:
            self.logger.error("System error.")
        elif code != 0:
            self.logger.error(dataDic['msg'])
        return dataDic

    def _authHeaders(self, withBelong=False, withJson=False):
        '''
        Build the request headers from token.txt.

        Returns None (after logging) when no usable token file exists, so
        callers can bail out instead of indexing a missing value.
        '''
        tokenLines = self.getTokenFromFile()
        if not tokenLines:
            return None
        if len(tokenLines) < (3 if withBelong else 2):
            # Truncated or hand-edited token file: treat as "not logged in".
            self.logger.error('The account has been offline, please sign in again.')
            return None
        headers = {'Lang': tokenLines[1].strip('\n'),
                   'Token': tokenLines[0].strip('\n')}
        if withBelong:
            headers['belong'] = tokenLines[2].strip('\n')
        if withJson:
            headers['Content-Type'] = 'application/json'
        return headers

    def _postJson(self, targetUrl, payload):
        '''
        POST ``payload`` as JSON with the stored credentials.

        Returns the response dictionary, or None when no token is stored.
        '''
        headers = self._authHeaders(withJson=True)
        if headers is None:
            return None
        body = json.dumps(payload).encode('utf-8')
        return self.doPost(targetUrl, body=body, headers=headers)

    def _validQueryArgs(self, columns, condition, tableName):
        '''Check the argument types shared by query, queryCount and pack.'''
        if (isinstance(columns, list) and isinstance(condition, str)
                and isinstance(tableName, str)):
            return True
        self.logger.error('Parameter type error')
        return False

    def _buildQueryBody(self, columns, condition, tableName, otherArgs):
        '''
        Build the request body shared by query, queryCount and pack.

        ``otherArgs`` may carry an optional start time and end time (in that
        order). Returns None after logging when a supplied date is invalid.
        '''
        bodyDic = {'columns': columns, 'condition': condition, 'table': tableName}
        for key, value in zip(('startTime', 'endTime'), otherArgs):
            if not value:
                # None or empty string means "no bound on this side".
                continue
            if not self.is_valid_date(value):
                self.logger.error("Date format error, should be 'yyyy-MM-dd'")
                return None
            bodyDic[key] = value
        return bodyDic

    def _downloadPackage(self, filePath, signCode):
        '''
        Download the packaged zip from ``filePath`` into ``zipDir``.

        Returns the local path of the zip, or None when the download failed.
        '''
        resp = self._getHttp().request('GET', filePath)
        if resp.status >= 400:
            # Do not store an HTTP error page under a .zip name.
            self.logger.error('Failed to download the packaged file, HTTP status: %s',
                              resp.status)
            return None
        os.makedirs(self.zipDir, exist_ok=True)
        zipPath = os.path.join(self.zipDir, signCode + '.zip')
        with open(zipPath, 'wb') as f:
            f.write(resp.data)
        self.logger.info('Generate the local file path as :' + zipPath)
        return zipPath

    # ------------------------------------------------------------------
    # login
    # ------------------------------------------------------------------
    def logon(self, account, pwd, lang='0', belong='0'):
        '''
        Send the login request.

        Parameters:
            account(str): account name
            pwd(str): password
            lang(str): language header value
            belong(str): "belong" header value
        Returns: the response dictionary, or False on a parameter type error.
        '''
        if not isinstance(account, str) or not isinstance(pwd, str):
            self.logger.error('Parameter type error')
            return False
        targetUrl = self.urlUtil.getLoginUrl()
        encodedArgs = {'account': str(account), 'force': '1', 'pwd': str(pwd),
                       "clientType": '5', "version": self.urlUtil.getVersionNum()}
        body = json.dumps(encodedArgs).encode('utf-8')
        headers = {'Content-Type': 'application/json', 'lang': lang, 'belong': belong}
        return self.doPost(targetUrl, body=body, headers=headers)

    def getToken(self, account, pwd):
        '''
        Log in and return the token, or None when the login did not succeed.
        '''
        loginInfo = self.logon(account, pwd)
        try:
            return loginInfo['data']['token']
        except (TypeError, KeyError):
            # logon returned False, or the response carries no token.
            self.logger.error('Failed to obtain token: login was not successful')
            return None

    def login(self, account, pwd, lang='0', belong='0'):
        '''
        Log in and write the token and language settings to token.txt.

        Parameters:
            account(str): account name
            pwd(str): password
            lang(str): language; Simplified Chinese "0", English "1",
                Traditional Chinese "2". Unknown values fall back to "0".
            belong(str): "0" or "1"; unknown values fall back to "0".
        Returns: None
        '''
        if lang not in ['0', '1', '2']:
            lang = '0'
        if belong not in ['0', '1']:
            belong = '0'
        loginInfo = self.logon(account, pwd, lang, belong)
        if loginInfo is False:
            # Invalid arguments: nothing was sent, leave token.txt untouched.
            return
        if loginInfo['code'] == 0:
            self.logger.info(loginInfo['msg'])
            self.writeToken(loginInfo['data']['token'], lang, belong)
        else:
            # Failed login: clear the stored token but keep the settings.
            self.writeToken('', lang, belong)

    def getTokenFromFile(self):
        '''
        Read token.txt.

        Returns: the list of lines (token, lang, belong), or False when the
        file is missing or empty.
        '''
        try:
            with open('token.txt', 'r') as fo:
                alist = fo.readlines()
        except FileNotFoundError:
            alist = []
        if len(alist) == 0:
            self.logger.error('The account has been offline, please sign in again.')
            return False
        return alist

    # ------------------------------------------------------------------
    # metadata
    # ------------------------------------------------------------------
    def getListDbs(self):
        '''
        List the purchased databases.

        Returns: the ``data`` field of the response; None when the server
        reports an error; False when no token is stored.
        '''
        headers = self._authHeaders(withBelong=True)
        if headers is None:
            return False
        dataDic = self.doGet(self.urlUtil.getListDbsUrl(), headers=headers)
        if dataDic['code'] == 0:
            return dataDic['data']
        return None

    def getListFields(self, tableName):
        '''
        List the fields of a table.

        Parameters:
            tableName(str): table name
        Returns: the ``data`` field of the response; None when the server
        reports an error; False on a type error or when no token is stored.
        '''
        if not isinstance(tableName, str):
            self.logger.error('Parameter type error')
            return False
        headers = self._authHeaders(withBelong=True)
        if headers is None:
            return False
        # Quote the name like getListTables does so that non-ASCII or
        # reserved characters cannot corrupt the query string.
        targetUrl = self.urlUtil.getListFieldsUrl() + '?table=' + parse.quote(tableName)
        dataDic = self.doGet(targetUrl, headers=headers)
        if dataDic['code'] == 0:
            return dataDic['data']
        return None

    def getListTables(self, databaseName):
        '''
        List the tables of a purchased database.

        Parameters:
            databaseName(str): database name
        Returns: the ``data`` field of the response; None when the server
        reports an error; False on a type error or when no token is stored.
        '''
        if not isinstance(databaseName, str):
            self.logger.error('Parameter type error')
            return False
        headers = self._authHeaders(withBelong=True)
        if headers is None:
            return False
        # Database names may be non-ASCII, so they must be URL-encoded.
        targetUrl = self.urlUtil.getListTablesUrl() + '?dbName=' + parse.quote(databaseName)
        dataDic = self.doGet(targetUrl, headers=headers)
        if dataDic['code'] == 0:
            return dataDic['data']
        return None

    # ------------------------------------------------------------------
    # preview / query
    # ------------------------------------------------------------------
    def preview(self, tableName):
        '''
        Preview the data of a table.

        Parameters:
            tableName(str): table name
        Returns: ``data.previewDatas`` of the response; None when the server
        reports an error; False on a type error or when no token is stored.
        '''
        if not isinstance(tableName, str):
            self.logger.error('Parameter type error')
            return False
        dataDic = self._postJson(self.urlUtil.getPreviewUrl(), {'table': tableName})
        if dataDic is None:
            return False
        if dataDic['code'] == 0:
            return dataDic['data']['previewDatas']
        return None

    def query(self, columns, condition, tableName, *_other_args):
        '''
        Query data of a purchased table.

        Parameters:
            columns(list): fields to fetch
            condition(str): query condition
            tableName(str): table name
            startTime(str): optional start date, 'yyyy-MM-dd'
            endTime(str): optional end date, 'yyyy-MM-dd'
        Returns: ``data.previewDatas`` of the response; None on a type error
        or when the server reports an error; False on a date format error or
        when no token is stored.
        '''
        if not self._validQueryArgs(columns, condition, tableName):
            return
        bodyDic = self._buildQueryBody(columns, condition, tableName, _other_args)
        if bodyDic is None:
            return False
        dataDic = self._postJson(self.urlUtil.getQueryUrl(), bodyDic)
        if dataDic is None:
            return False
        if dataDic['code'] == 0:
            return dataDic['data']['previewDatas']
        return None

    def queryCount(self, columns, condition, tableName, *_other_args):
        '''
        Query the number of rows matching a condition.

        Parameters:
            columns(list): fields to fetch
            condition(str): query condition
            tableName(str): table name
            startTime(str): optional start date, 'yyyy-MM-dd'
            endTime(str): optional end date, 'yyyy-MM-dd'
        Returns: the ``data`` field of the response (the count), or None on
        any failure.
        '''
        if not self._validQueryArgs(columns, condition, tableName):
            return
        bodyDic = self._buildQueryBody(columns, condition, tableName, _other_args)
        if bodyDic is None:
            return
        dataDic = self._postJson(self.urlUtil.getQueryCountUrl(), bodyDic)
        if dataDic is None:
            return
        if dataDic['code'] == 0:
            self.logger.info('The total number of data obtained by condition is '
                             + str(dataDic['data']))
            return dataDic['data']
        return None

    # ------------------------------------------------------------------
    # packaging / download
    # ------------------------------------------------------------------
    def pack(self, columns, condition, tableName, *_other_args):
        '''
        Send a packaging request to the server.

        Parameters:
            columns(list): fields to pack
            condition(str): query condition
            tableName(str): table name
            startTime(str): optional start date, 'yyyy-MM-dd'
            endTime(str): optional end date, 'yyyy-MM-dd'
        Returns: the response dictionary, or None on invalid arguments or
        when no token is stored.
        '''
        if not self._validQueryArgs(columns, condition, tableName):
            return
        bodyDic = self._buildQueryBody(columns, condition, tableName, _other_args)
        if bodyDic is None:
            return
        return self._postJson(self.urlUtil.getPackUrl(), bodyDic)

    def packSignCodeWriteToFile(self, columns, condition, tableName, *_other_args):
        '''
        Request packaging and store the returned sign code in signCode.txt.

        Parameters: same as pack.
        Returns: the sign code, or False on any failure.
        '''
        if not self._validQueryArgs(columns, condition, tableName):
            return False
        packRequest = self.pack(columns, condition, tableName, *_other_args)
        # pack returns None for a bad date or a missing token; guard before
        # subscripting.
        if packRequest and packRequest['code'] == 0:
            signCode = packRequest['data']
            with open('signCode.txt', 'w') as fo:
                fo.write(signCode)
            self.logger.info('packaging return code:signCode=' + signCode)
            return signCode
        return False

    def getPackResultBySignCode(self, signCode):
        '''
        Ask the server for the state of a packaging job.

        Parameters:
            signCode(str): code returned by the packaging request
        Returns: the response dictionary, or False on a type error or when
        no token is stored.
        '''
        if not isinstance(signCode, str):
            self.logger.error('Parameter type error')
            return False
        headers = self._authHeaders(withBelong=True)
        if headers is None:
            return False
        targetUrl = self.urlUtil.getPackResultUrl() + '/' + signCode
        return self.doGet(targetUrl, headers=headers)

    def getSignCodeFromFile(self):
        '''
        Read the sign code of the last packaging request from signCode.txt.

        Returns: the sign code, or None when the file is missing or empty.
        '''
        try:
            with open('signCode.txt', 'r') as fo:
                signCode = fo.read()
        except FileNotFoundError as err:
            self.logger.error(format(err))
            return None
        if signCode is None or len(signCode) == 0:
            self.logger.error('用户未发送打包指令')
            return None
        self.logger.info('打包返回码:signCode=' + signCode)
        return signCode

    def getPackResult(self, columns, condition, tableName, *_other_args):
        '''
        Request packaging, poll every 10 seconds until the server reports a
        final status, and download the zip into ``zipDir`` on success.

        Parameters: same as pack.
        Returns: None
        '''
        signCode = self.packSignCodeWriteToFile(columns, condition, tableName, *_other_args)
        if signCode is False:
            self.logger.error('Failed to get the packaging return code [signCode]')
            return
        while True:
            packResult = self.getPackResultBySignCode(signCode)
            # A False result (no token) or an error code ends the polling.
            if not packResult or packResult['code'] != 0:
                break
            status = packResult['data']['status']
            if status == '1':
                filePath = str(packResult['data']['filePath'])
                self.logger.info('Package successfully. File size is:'
                                 + str(packResult['data']['fileSize']))
                self._downloadPackage(filePath, signCode)
                break
            elif status == '0':
                self.logger.info('An exception occurred during file packaging,please check the query criteria')
                break
            else:
                self.logger.info('Downloading, please wait patiently!')
                time.sleep(10)

    def getPackResultExt(self, columns, condition, tableName, *_other_args):
        '''
        Request packaging, follow its progress over a websocket (drawing a
        progress bar) and download the zip into ``zipDir`` on success.

        Parameters: same as pack.
        Returns: None
        '''
        signCode = self.packSignCodeWriteToFile(columns, condition, tableName, *_other_args)
        if signCode is False:
            return
        tokenList = self.getTokenFromFile()
        if not tokenList:
            return
        token = tokenList[0].strip('\n')
        ws = create_connection(self.urlUtil.getWebsocketUrl())
        try:
            ws.send(json.dumps({'outlineId': signCode, 'status': 'start', 'token': token}))
            self.logger.info("downloading...")
            while True:
                packResult = json.loads(ws.recv())
                if packResult['code'] != 0:
                    print('\n')
                    self.logger.error(packResult['msg'])
                    break
                status = packResult['data']['status']
                if status == '1':
                    self.process_bar(packResult['data']['percentage'])
                    filePath = str(packResult['data']['filePath'])
                    print('\n')
                    self.logger.info('Package successfully. File size is:'
                                     + str(packResult['data']['fileSize']))
                    self._downloadPackage(filePath, signCode)
                    break
                elif status == '0':
                    self.logger.info('An exception occurred during file packaging,please check the query criteria')
                    break
                else:
                    self.process_bar(packResult['data']['percentage'])
        finally:
            # Always release the socket, also when a message is malformed.
            ws.close()

    # ------------------------------------------------------------------
    # local files
    # ------------------------------------------------------------------
    def unzipSingle(self, fileName):
        '''
        Extract one zip file into ``dataDir``/<zip name without extension>.

        Parameters:
            fileName(str): full path of the zip file, e.g.
                C:\\csmardata\\zip\\662348367897071616.zip
        Returns: None
        '''
        # Accept both separators so Windows-style paths also work when the
        # code runs on another platform.
        baseName = os.path.basename(fileName.replace('\\', '/'))
        preFileName = os.path.splitext(baseName)[0]
        destDir = self.dataDir + preFileName
        with zipfile.ZipFile(fileName) as zf:
            try:
                zf.extractall(path=destDir)
                self.logger.info('The file has been successfully extracted to the directory:' + destDir)
            except RuntimeError as e:
                self.logger.error(e)

    def loadData(self, fileName, count=10000):
        '''
        Parse a CSV file and append its rows to dataList.

        Tab characters are removed from every cell. Every parsed row is
        logged at INFO level.

        Parameters:
            fileName(str): full path of the CSV file, e.g.
                C:\\csmardata\\BETA_Ybeta.csv
            count(int): index of the last row to read (rows 0..count are
                read); None reads the whole file
        Returns: dataList, or False on invalid arguments.
        '''
        # Validate types before calling str methods on fileName.
        countOk = count is None or (isinstance(count, int) and not isinstance(count, bool))
        if not isinstance(fileName, str) or not countOk:
            self.logger.error('Parameter type error')
            return False
        if not fileName.endswith('.csv'):
            self.logger.error('Wrong file type that needs to be resolved')
            return False
        # newline='' lets the csv module handle embedded line breaks itself.
        with open(fileName, encoding='utf-8-sig', newline='') as csvf:
            for i, row in enumerate(csv.reader(csvf)):
                tempList = [cell.replace('\t', '') for cell in row]
                self.dataList.append(tempList)
                self.logger.info(tempList)
                if count is not None and i == count:
                    break
        return self.dataList

    def getDataList(self):
        '''
        Return the result set collected by loadData.

        Kept for compatibility; loadData returns the same list.
        '''
        return self.dataList

    def is_valid_date(self, str_date):
        '''Return True when ``str_date`` parses with the format %Y-%m-%d.'''
        try:
            time.strptime(str_date, "%Y-%m-%d")
            return True
        except (ValueError, TypeError):
            return False

    def writeToken(self, token, lang, belong):
        '''Write token, language and belong flag to token.txt, one per line.'''
        with open('token.txt', 'w') as fo:
            fo.writelines([token + '\n', str(lang) + '\n', str(belong)])

    def process_bar(self, i):
        '''Redraw a 20-cell console progress bar for percentage ``i``.'''
        a = int((i + 1) / 5)
        b = 20 - a
        sys.stdout.write('\r|%s%s|%d%%' % (a * '▇', b * ' ', i))
        sys.stdout.flush()
        time.sleep(.1)

    # ------------------------------------------------------------------
    # HTTP
    # ------------------------------------------------------------------
    def doPost(self, targetUrl, body=None, headers=None):
        '''
        POST ``body`` to ``targetUrl`` and return the decoded JSON response.

        Error codes are logged; the response dictionary is returned in
        every case.
        '''
        return self._request('POST', targetUrl, body=body, headers=headers)

    def doGet(self, targetUrl, headers=None):
        '''
        GET ``targetUrl`` and return the decoded JSON response.

        Error codes are logged; the response dictionary is returned in
        every case.
        '''
        return self._request('GET', targetUrl, headers=headers)
这篇关于 csmarapi 的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-09CMS内容管理系统是什么?如何选择适合你的平台?
- 2025-01-08CCPM如何缩短项目周期并降低风险?
- 2025-01-08Omnivore 替代品 Readeck 安装与使用教程
- 2025-01-07Cursor 收费太贵?3分钟教你接入超低价 DeepSeek-V3,代码质量逼近 Claude 3.5
- 2025-01-06PingCAP 连续两年入选 Gartner 云数据库管理系统魔力象限“荣誉提及”
- 2025-01-05Easysearch 可搜索快照功能,看这篇就够了
- 2025-01-04BOT+EPC模式在基础设施项目中的应用与优势
- 2025-01-03用LangChain构建会检索和搜索的智能聊天机器人指南
- 2025-01-03图像文字理解,OCR、大模型还是多模态模型?PalliGema2在QLoRA技术上的微调与应用
- 2025-01-03混合搜索:用LanceDB实现语义和关键词结合的搜索技术(应用于实际项目)