Python 函数链2 一个基于规则集串联的场景
2021/6/7 20:55:03
本文主要是介绍Python 函数链2 一个基于规则集串联的场景,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
说明
先看一个将多个规则集串联起来的场景。这是规则集刚完成,一个比较粗糙的集成调用,主要的目的是为了观察一下应用过程的流程,看看哪里有问题。在开发函数链的时候可以一并解决掉。
内容
1 场景
实现数据获取、数据预处理、数据处理和输出的功能。
2 数据获取
从数据源获取数据。感觉比较麻烦的地方是换了一个环境,逐个的检查依赖的函数。之后可以写一个函数,自动检查依赖函数(fs.xxx
),读入之后再检查是否还有依赖,自动载入。
import FuncDict as fd import pandas as pd # 逐个导入依赖函数列表的函数 # cur_func_dict.dynamic_load_func_db('path_traceback', method='force') # 依赖函数列表 depend_list = ['run_ruleset_v2','WebMsg1','get_rule_input_kw_v2','run_a_rule_v2', 'rulev002_check_and_pass_dict_keys_in_sets','rulev002_gen_slice_list', 'slice_list_by_batch1','rulev002_read_log_simple','rulev002_real_slice_list', 'create_folder_if_notexist','rulev002_simple_get_iter_slice_list', 'web_get_by_json','get_time_str','to_pickle','logging_str_a_row', 'path_traceback','rulev002_simple_get_report','parse_log1','amend_path_slash']
- 1 读入规则集
query_res_list = func_lmongo.query_many(dbname='rules', cname='rules',kv_list = [{'rule_set':'BatchSimpleGet'}], max_recs=10000)[0] BatchSimpleGet_df = pd.DataFrame(query_res_list)
- 2 输入参数
# 自由定义 folder_path = './test001_simple_get/' max_files_in_folder = 1000 id_min = 0 id_max = 2000 read_step = 500 url_template = 'http://xxx/solr/xxx/select?q=docId:%s' # 公网测试地址 # 局域网地址 target_url = 'http://192.168.1.146:20002/api/' # -- 封装 cur_dict ={} cur_dict['input_dict'] = {} cur_dict['input_dict']['folder_path'] = folder_path cur_dict['input_dict']['max_files_in_folder'] = max_files_in_folder cur_dict['input_dict']['id_min'] = id_min cur_dict['input_dict']['id_max'] = id_max cur_dict['input_dict']['read_step'] = read_step cur_dict['input_dict']['url_template'] = url_template cur_dict['input_dict']['target_url'] = target_url
- 3 执行
fs.run_ruleset_v2('simple_get', BatchSimpleGet_df, cur_dict, fs) --- {'name': 'simple_get', 'status': True, 'msg': 'ok', 'duration': 8, 'data': {'total_success_recs': 4, 'cur_success_recs': 0, 'cur_start_dt': None, 'cur_end_dt': None, 'cur_mean_seconds': None, 'data_path': './test001_simple_get/', 'log_path': './test001_simple_get/read.log', 'cur_slice_list': []}}
3 数据预处理
- 1 导包
import FuncDict as fd import pandas as pd cur_func_dict = fd.FuncDict1('ner_preprocessing', pack_fpath='./funcs/',lmongo=fd.func_lmongo) import funcs as fs cur_func_dict.fs = fs func_lmongo = fd.func_lmongo
- 2 导入依赖函数
# 依赖函数列表 depend_list = ['from_pickle','get_batch_file1','judge_doc_pull_data_get_results', 'flat_dict','parse_judge_doc_to_df', 'rulev002_get_docid_ss','md5_trans','txt_etl','set_interval_judge', 'interval_judge','rulev002_get_company_ss_hash','extract_str_base', 'str_contains_words','rulev002_for_ss_list','split_series_by_interval', 'rulev002_ner_proprocessing_output']
- 3 将get到的响应json转换
这一步没有纳入规则集里,因为这个会随着不同的任务变
get_input_folder = './test001_simple_get/' pkl_file_list = fs.get_batch_file1(get_input_folder, '.pkl', 1000) response_list = [] for pkl_file in pkl_file_list: pkl_name = pkl_file.replace('.pkl','') tem_list = fs.from_pickle(pkl_name, path = get_input_folder) response_list+=tem_list res_df = fs.parse_judge_doc_to_df(response_list, fs=fs)
将原始数据转为了只有doc_id
和content
两列的df。
- 4 读入规则集
query_res_list = func_lmongo.query_many(dbname='rules', cname='rules',kv_list = [{'rule_set':'NerPreProcessing'}], max_recs=10000)[0] NerPreProcessing_df = pd.DataFrame(query_res_list)
- 5 定义输入类型
# 自由定义 # 每个短句列表的最大长度,考虑内存/显存 max_ss_list_len = 4000 # 每个短句允许的最大长度,模型有处理的最大限制(认为公司名称最小不短于4个字) ss_len_min = 4 ss_len_max = 100 # 这个参数也可以不写(默认左闭右开) intever_type = 'left_close' # 数据部分 # data = [{'doc_id' : 'a001','content':'oppo公司竟然不支持鸿蒙'}, # {'doc_id':'a002','content':'华为公司开源了鸿蒙,是个好东西'}, # {'doc_id':'a003','content':'你觉得鸿蒙怎么样'}] data =data_dict data_type = 'dict_list' # 一般性过滤文本 default_pass_str = r'[\u4e00-\u9fa5\da-zA-Z\-()().,]' # 如果考虑人名再加两个点 default_pass_str = '[\u4e00-\u9fa5\da-zA-Z\-()().,·•]' # 输出模式 realtime/persistent # output_mode = 'realtime' output_mode = 'persistent' output_folder = './test001_pre_processed/' # 实体识别任务 ner_task='company' # -- 封装 cur_dict ={} cur_dict['input_dict'] = {} cur_dict['input_dict']['max_ss_list_len'] = max_ss_list_len cur_dict['input_dict']['ss_len_min'] = ss_len_min cur_dict['input_dict']['ss_len_max'] = ss_len_max cur_dict['input_dict']['intever_type'] = intever_type cur_dict['input_dict']['data'] = data cur_dict['input_dict']['data_type'] = data_type cur_dict['input_dict']['default_pass_str'] = default_pass_str cur_dict['input_dict']['output_mode'] = output_mode cur_dict['input_dict']['output_folder'] = output_folder cur_dict['input_dict']['ner_task'] = ner_task
- 5 调用规则集
fs.run_ruleset_v2('ner_preprocessing', NerPreProcessing_df, cur_dict, fs) --- takes 0.22 data save to pickle: ./test001_pre_processed/7e65213bc8dea995441378335b14780d.pkl
关于数据处理的模式稍微有点乱:
- 1 如果是离线模式,是文件夹到文件夹还是独立处理的
- 2 如果是在线模式,文件应该放哪
4 数据处理
- 1 传入模型数据
fs.unzip_a_folder('./model.zip', './model')
应该搭一个ftp,使用wget方式来获取文件。这块属于文件的IO。
- 2 模型的参数
这块可能需要做一些管理。一个模型应该有个名字,对应的参数可以封装好放在一起。
# 要载入模型 # 模型初始设置 path_name = './' model_checkpoint = path_name + 'model/model_v0/' label_list = ['O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC', 'B-T', 'I-T'] max_len = 200 import torch import transformers from transformers import AutoTokenizer, AutoModelForTokenClassification # Setting up the device for GPU usage from torch import cuda device = 'cuda' if cuda.is_available() else 'cpu' print('device available', device) tokenizer = AutoTokenizer.from_pretrained(model_checkpoint) assert isinstance(tokenizer, transformers.PreTrainedTokenizerFast) # 预载入(device=cpu) model = AutoModelForTokenClassification.from_pretrained(model_checkpoint, num_labels=len(label_list)) from functools import partial tencoder = partial(tokenizer.encode,truncation=True, max_length=max_len, is_split_into_words=True, return_tensors="pt")
- 3 载入规则集
query_res_list = func_lmongo.query_many(dbname='rules', cname='rules',kv_list = [{'rule_set':'NerModelPredict'}], max_recs=10000)[0] NerModelPredict_df = pd.DataFrame(query_res_list)
- 4 初始化
感觉处理数据的格式并不统一,有必要规定离线文件的标准格式
# 依赖函数列表 depend_list = ['unzip_a_folder','rulev002_check_pkl_files','get_list_gap', 'rulev002_ner_predict_pkl_files','ner_batch_predict', 'inv_convert_chinese_word_tag_list2BIO','rulev002_ner_merge_predict_result'] # 自由定义 # -- 封装 cur_dict ={} cur_dict['input_dict'] = {} # 模型参数 cur_dict['input_dict']['model_checkpoint'] = model_checkpoint cur_dict['input_dict']['label_list'] = label_list cur_dict['input_dict']['max_len'] = max_len cur_dict['input_dict']['device'] = device cur_dict['input_dict']['tokenizer'] = tokenizer cur_dict['input_dict']['model'] = model cur_dict['input_dict']['tencoder'] = tencoder # 输入数据 cur_dict['input_dict']['input_folder'] = './test001_pre_processed/' # 允许的最大文件数(如果数据大的话就需要即时的把处理完成的文件删除掉) cur_dict['input_dict']['allow_fetch_num'] = 100000 # 指定设备。只有cuda和cpu两种,如果device没有cuda,那么就自动转为cpu cur_dict['input_dict']['user_device'] = 'cuda' --- fs.run_ruleset_v2('ner_modelpredict', NerModelPredict_df, cur_dict, fs) {'name': 'ner_modelpredict', 'status': True, 'msg': 'ok', 'duration': 4, 'data': {'cur_processed_files': 0, 'already_processed_files': 1, 'save_folder': './test001_pre_processed_predict/', 'cur_file_list': []}}
总结
整体效果比较符合预期:多付出一点时间在管理上,换来使用的可靠和方便。
这些规则集都是单独开发的,第一次在全新的环境下拼凑起来,整个过程还是很流畅的。一半的迁移时间花在导入依赖函数上,另外还有一些时间花在文件资源(模型)的导入上。
虽然在单独开发规则集的时候没有统一的IO考虑(这是个问题),但是并没有给集成使用造成太多麻烦,没有出现担心的bug调试。
不过5原则。
我发现一个规则集,不超过5条规则是比较舒服的。我在集成的时候也就分为拉取数据、预处理数据、处理数据和准备输出4部分。 这样在每一个层级都是非常清晰的,复杂度可以通过多层来实现。假设一个函数是5的复杂度(5个计算逻辑,有时候可以堆一二十个),一个规则集的复杂度就是25,一个函数链就是125。但通过分层,我们要面对的一直是5的复杂度,所以感觉比较轻松。在100复杂度下足够处理一个专门的业务问题了。
规则断点报错
。这部分还不是很完善,有的时候断了没有报(例如函数的参数不对)。但是看到某个规则集的某条规则错了,定位起来很舒服。
- 设计函数链需要考虑的几个改进:
- 1 IO模式的规定与统一
- 2 载入包的简化
- 3 连接的规定
这篇关于Python 函数链2 一个基于规则集串联的场景的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-08有遇到过吗?同样的规则 Excel 中 比Python 结果大
- 2024-03-30开始python成长之路
- 2024-03-29python optparse
- 2024-03-29python map 函数
- 2024-03-20invalid format specifier python
- 2024-03-18pool.map python
- 2024-03-18threads in python
- 2024-03-14python Ai 应用开发基础训练,字符串,字典,文件
- 2024-03-13id3 algorithm python
- 2024-03-13sum array elements python