一种基于二分法的变步长批量处理算法
2022/1/20 22:41:36
本文主要是介绍一种基于二分法的变步长批量处理算法,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
1、前言
变步长批量处理算法,在实现某些功能时,非常需要。如数据传输和数据导入时,使用可变步长的批量处理算法,可以极大地提高系统的性能,。
1.1、数据传输
在不稳定的网络环境下,传输失败的几率提高,大的数据块可能会传输失败,如果分为小的数据块,可以传输成功,但由于传输开销,传输效率低。因此希望在网络好的时候,传输大的数据块或高分辨率的图像;网络差的时候,传输小的数据块或对图像进行降质处理,调低图像分辨率,提高压缩比等。因此,可变概念,可以提升服务功能的质量,提升系统的可用性。
1.2、数据导入
数据导入,特别是ETL,大量数据导入,效率非常重要。对于大多数数据库,如Myql,批量新增(如100条记录)和单记录新增的时间消耗相差无几,但处理能力有百倍之差。
曾经,笔者使用逐条数据insert,300万条记录导入,化了半个小时,简直无法忍受,于是,后来改为使用100条批量insert,但由于数据中存在这种那种的异常数据,经常出现一条异常数据导致成批(100条)的数据导入失败,这样,一次导入数据中可能有几条坏数据,导致几百上千条数据没有入库成功,于是,再修改代码,针对这些没入库成功的几百上千条记录里,逐条导入,检测出具体的坏数据。整个过程不堪回首。
因此,数据导入需要可变步长算法,这样可用极大提升数据导入的处理能力。
另外,还有一种Excel数据导入,如规定按记录的编码(字符串类型,如身份证号、手机号、订单编号等,唯一键字段)作为记录的特征字段,但表格数据中有新增的,还有修改的,即如果为新的编码值,为新增记录,如果数据库中编码值已存在,则需要修改记录。这种导入,如果采用固定批量值导入,新增的失败率是很高的,如果批量一旦失败就改为逐条导入的策略,也是效率不高,可变步长算法可非常高效地解决此问题。
1.3、其它应用场景
对这种可变的需求,具体很大的通用性,如与搜索相关的,也可以用可变步长算法来提升处理性能。
2、算法原理
2.1、算法概述
可变步长算法,不是个新鲜的概念,区别在于变化的依据和策略,如最大似然估计、梯度下降等,以及算法复杂度和是否简单易用。
本算法可以归于简单的决策树,有贪婪算法的因子,计算量很少。接口调用可以内嵌方法(类似C++的指针函数)来执行批量处理工作和单条数据修正处理工作。二分法是非常经典的算法,本算法的核心还是二分法,也可以说依据下列公式而开发:
\[1 + 2 + 4 + ... + 2^n = 2^{n+1}-1 \]使用QoS(Quality of Service,服务等级)的概念,将处理等级与处理能力(批量值)建立联系,QoS等级对应的批量值为\(2^0\)到\(2^n\)的一个连续序列,如:[128,64,32,16,8,4,2,1],上限为\(2^n\),这里n=7,下限为约定为1。等级0对应128,等级7对应1,等级值即为等级数组的下标。
具体算法如下:
- 对于长度为n的输入数据列表,类型为泛型类型T,即数据为List
。另外提供2个T类型的列表参数,为批量处理成功的数据列表和修正处理成功的数据列表,便于外部进一步处理(如相关数据一致性处理)。 - 算法返回处理异常记录的日志列表,字符串类型。使用者可以调整日志的格式和内容,以便定位异常数据的具体位置。
- 设置一个布尔型的异常标志值bError,用于记录之前是否发生了批量处理的异常,初值为false,该异常标志值在单条记录的修正处理后被复位为false。
- 设置一个数据列表下标锚点anchorIdx,表示当前正在处理的数据位置,初始为0。
- 设置一个当前等级值levelIdx,初值随意,这里设置初值为0,即从最高等级开始。
- 如果bError为false,即当前无异常,按照当前等级对应的批量值进行批量处理,如果成功,则提升一个等级(如果已为等级上限,则维持),并更新锚点anchorIdx到新的位置;如果处理失败,则设置bError为true,锚点anchorIdx不变,如当前等级不为等级下限,则下降一个等级;如果已是等级下限,则按照单条数据的修正处理方法进行处理,如果修正成功,则加入修正数据列表中,如果修正失败,则加入异常日志列表中,不管修正成功与否,修正处理后,anchorIdx均加1,且设置bError为false,表示异常数据已给检测出并按照规则进行处置了。
- 如果bError为true,即当前处于异常状态,按照当前等级对应的批量值进行批量处理,如果成功,则下降一个等级(如果已为等级下限,则维持),并更新锚点anchorIdx到新的位置;如果失败,且不为等级下限值,则锚点anchorIdx不变,下降一个等级;如果失败,且当前等级为等级下限值,则按照单条数据的修正处理方法进行处理,如果修正成功,则加入修正数据列表中,如果修正失败,则加入异常日志列表中,不管修正成功与否,修正处理后,anchorIdx均加1,且设置bError为false。
- 结束条件,anchorIdx到达n,即所有数据被处理完毕。
这里有两个外部方法,批量处理方法和单条修正方法,这是一个非常抽象的接口方法,具体对数据进行怎么处理,由外部根据需要自行确定。后面提供的单元测试代码,给出批量数据类型转换的例子。
当bError为true时,批量处理成功,仍然要降级,是因为前面等级发生异常时,数据范围包括了后面等级批量值之和的范围:
\[2^{n+1}>2^n+...+2+1 \]当等级批量值\(2^{n+1}\)批量尝试发生异常时,设置bError为true,然后如果\(2^n\)批量处理成功,意味着后面\(2^n\)批量必然失败,因此必须降级,才可能避免失败;当然如果前\(2^n\)批量处理失败,意味着异常数据至少在这些数据中存在,也需要降级尝试,才可能避免失败。
2.2、算法代码
算法代码使用java语言,很容易转成python或其它语言,代码在github上:https://github.com/alabo1999/framework_algoset/commit/dacefcbc9bf2ad95c05ab7671a12054178e7f90e 包含一个算法类文件BatchProcess.java和单元测试文件BatchProcessTest.java,其中BatchProcess.java只有一个方法,该方法的接口形式如下:
/** * @methodName : varStepBatchProcess * @description : 可变步长的批量处理算法 * @param <T> : 泛型类型 * @param object : 提供batchProcMethod和singleProcMethod方法的类对象 * @param dataRowList : 待处理的T类型对象数据列表 * @param normalList : 正常处理的对象列表 * @param correctList : 修改处理的对象列表 * @param batchProcMethod : 正常批量处理的方法 * @param singleProcMethod : 单条修正处理的方法 * @param debugLevel : 调试信息输出设置,bit0-输出修正处理信息,bit1-输出详细步骤,bit2:输出尝试次数 * @return : 处理过程产生的异常日志列表 */ public static <T> List<String> varStepBatchProcess( Object object, List<T> dataRowList, List<T> normalList, List<T> correctList, Method batchProcMethod, Method singleProcMethod, int debugLevel);
2.3、算法测试
单元测试文件BatchProcessTest.java,给出了算法测试,代码如下:
package com.abc.example.service; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; import com.abc.example.common.impexp.BatchProcess; /** * @className : BatchProcessTest * @description : 批量处理测试类 * @summary : * @history : * ------------------------------------------------------------------------------ * date version modifier remarks * ------------------------------------------------------------------------------ * 2022/01/20 1.0.0 sheng.zheng 初版 * */ @RunWith(SpringRunner.class) @SpringBootTest @Transactional public class BatchProcessTest { @Test // 可变步长的批量处理算法测试 public void varStepBatchProcessTest() { // 构造待处理的数据,数据类型为String List<String> dataRowList = new ArrayList<String>(); int idx = 0; for (int i = 0; i < 1000; i++) { String str = ""; if (i % 129 == 0) { str = i + ".1"; idx ++; if (idx % 2 == 0) { str += "abc"; } }else { str = i + ""; } dataRowList.add(str); } System.out.println(dataRowList); // 调用算法 Method method1 = getMethodByName(this,"batchProcMethod"); Method method2 = getMethodByName(this,"singleProcMethod"); // 用于存放正常批量处理的数据 List<String> normalList = new ArrayList<String>(); // 用于存放修正处理的数据 List<String> correctList = new ArrayList<String>(); // 调用算法 List<String> errorList = BatchProcess.varStepBatchProcess(this, dataRowList, normalList, correctList, method1, method2,0x05); // 打印errorList System.out.println("errorList: " + errorList.toString()); // 打印correctList System.out.println("correctList: " + correctList.toString()); } // 构造批量处理的方法 // 将列表中字符串,批量转为整型,被反射调用,必须是public的 public void batchProcMethod(List<String> subDataList) { for (String item : subDataList) { Integer.valueOf(item); } } // 构造单记录处理的方法,被反射调用,必须是public的 public String singleProcMethod(Exception e,String item) { String errInfo = ""; try { Double.valueOf(item).intValue(); }catch(Exception ex) { errInfo = ex.getMessage(); } return errInfo; } // 根据方法名称获取方法对象 private Method getMethodByName(Object object,String methodName) { Class<?> class1 = object.getClass(); Method retItem = null; Method[] methods = class1.getMethods(); for (int i = 0; i < methods.length; i++) { Method item = methods[i]; // System.out.println(item.getName()); if (item.getName() == methodName) { retItem = item; break; } } return retItem; } }
构造了1000个字符串的列表,这里使用String来作为数据类型,除了下标为129的整数倍之外的值=i+""的字符串,其它的取值,如果为129的偶数倍,则为i+"0.1"的字符串;如果为129的奇数倍,则为i+”0.1abc“。
批量处理方法batchProcMethod,实现对当前批量数据的类型转换,将字符串转为整数,当数据列表包含.1或.1abc的异常数据时,Integer.valueOf(item);会抛出异常,导致该批量处理失败;
单数据修正方法singleProcMethod,实现了处理"0.1"之类的double型字符串转换成,但针对".1abc"后缀的数据仍然处理失败。
执行测试代码,输出结果如下:
single tryNum = 9, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 0,bError=true 0.1: can be fixed. single tryNum = 24, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 129,bError=true 129.1abc: can not be fixed. single tryNum = 39, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 258,bError=true 258.1: can be fixed. single tryNum = 54, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 387,bError=true 387.1abc: can not be fixed. single tryNum = 69, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 516,bError=true 516.1: can be fixed. single tryNum = 84, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 645,bError=true 645.1abc: can not be fixed. single tryNum = 99, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 774,bError=true 774.1: can be fixed. single tryNum = 114, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 903,bError=true 903.1abc: can not be fixed. tryNum: 120 errorList: [For input string: "129.1abc", For input string: "387.1abc", For input string: "645.1abc", For input string: "903.1abc"] correctList: [0.1, 258.1, 516.1, 774.1]
tryNum: 120,表示这个算法总共尝试了120次处理尝试,包括批量处理和修正处理。需要想要看每一次处理尝试的详细信息,将debugLevel设置为7,可以看到详细的输出(摘录部分):
batch tryNum = 1, levelIdx = 0,levelNum = 128,batchNum = 128,anchorIdx = 0,bError=false java.lang.reflect.InvocationTargetException batch tryNum = 2, levelIdx = 1,levelNum = 64,batchNum = 64,anchorIdx = 0,bError=true java.lang.reflect.InvocationTargetException batch tryNum = 3, levelIdx = 2,levelNum = 32,batchNum = 32,anchorIdx = 0,bError=true java.lang.reflect.InvocationTargetException batch tryNum = 4, levelIdx = 3,levelNum = 16,batchNum = 16,anchorIdx = 0,bError=true java.lang.reflect.InvocationTargetException batch tryNum = 5, levelIdx = 4,levelNum = 8,batchNum = 8,anchorIdx = 0,bError=true java.lang.reflect.InvocationTargetException batch tryNum = 6, levelIdx = 5,levelNum = 4,batchNum = 4,anchorIdx = 0,bError=true java.lang.reflect.InvocationTargetException batch tryNum = 7, levelIdx = 6,levelNum = 2,batchNum = 2,anchorIdx = 0,bError=true java.lang.reflect.InvocationTargetException batch tryNum = 8, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 0,bError=true java.lang.reflect.InvocationTargetException single tryNum = 9, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 0,bError=true 0.1: can be fixed. batch tryNum = 10, levelIdx = 6,levelNum = 2,batchNum = 2,anchorIdx = 1,bError=false batch tryNum = 11, levelIdx = 5,levelNum = 4,batchNum = 4,anchorIdx = 3,bError=false batch tryNum = 12, levelIdx = 4,levelNum = 8,batchNum = 8,anchorIdx = 7,bError=false batch tryNum = 13, levelIdx = 3,levelNum = 16,batchNum = 16,anchorIdx = 15,bError=false batch tryNum = 14, levelIdx = 2,levelNum = 32,batchNum = 32,anchorIdx = 31,bError=false batch tryNum = 15, levelIdx = 1,levelNum = 64,batchNum = 64,anchorIdx = 63,bError=false batch tryNum = 16, levelIdx = 0,levelNum = 128,batchNum = 128,anchorIdx = 127,bError=false java.lang.reflect.InvocationTargetException batch tryNum = 17, levelIdx = 1,levelNum = 64,batchNum = 64,anchorIdx = 127,bError=true java.lang.reflect.InvocationTargetException batch tryNum = 18, levelIdx = 2,levelNum = 32,batchNum = 32,anchorIdx = 127,bError=true java.lang.reflect.InvocationTargetException batch tryNum = 19, levelIdx = 3,levelNum = 16,batchNum = 16,anchorIdx = 127,bError=true java.lang.reflect.InvocationTargetException batch tryNum = 20, levelIdx = 4,levelNum = 8,batchNum = 8,anchorIdx = 127,bError=true java.lang.reflect.InvocationTargetException batch tryNum = 21, levelIdx = 5,levelNum = 4,batchNum = 4,anchorIdx = 127,bError=true java.lang.reflect.InvocationTargetException batch tryNum = 22, levelIdx = 6,levelNum = 2,batchNum = 2,anchorIdx = 127,bError=true batch tryNum = 23, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 129,bError=true java.lang.reflect.InvocationTargetException single tryNum = 24, levelIdx = 7,levelNum = 1,batchNum = 1,anchorIdx = 129,bError=true 129.1abc: can not be fixed.
3、算法在数据导入时的应用
批量处理方法batchProcMethod,实现insertItems语句即可,注意,不要使用try/catch,否则拦截了异常,算法会认为批量处理成功。
单数据修正方法singleProcMethod,实现updateItems语句即可,注意由于此时实现按特征字段的值,修改记录的导入字段的值,实际上只修改了一条记录,必须使用选择性字段更新方法,避免未导入字段被item对象的默认值所覆盖。
大致形式如下:
public <T> void batchProcMethod(List<T> subDataList) { xxxDao.insertItems(subDataList); } // 构造单记录处理的方法,被反射调用,必须是public的 public <T> String singleProcMethod(Exception e,T item) { String errInfo = ""; if (e instanceof DuplicateKeyException) { // 如果唯一键重复,则update // 根据导入字段列表,抽取相关字段 // String[] importFieldNames = new String[]{"fieldname1","fieldname2",...,"fieldnamen"}; // Map<String,Object> map = itemToMap(item,importFieldNames); xxxDao.updateItems(map) }else{ errInfo = e.getMessage(); } return errInfo; }
其中itemToMap方法,可使用反射方法,编写为静态方法,供所有实体类使用。
4、算法应用注意事项
算法代码在github上,公开状态,使用了apache license 2.0,如需使用,请按照许可证要求即可。
这篇关于一种基于二分法的变步长批量处理算法的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-09-28微服务架构中API版本控制的实践
- 2024-09-28AI给的和自己写的Python代码,都无法改变输入框的内容,替换也不行
- 2024-09-27Sentinel配置限流资料:新手入门教程
- 2024-09-27Sentinel配置限流资料详解
- 2024-09-27Sentinel限流资料:新手入门教程
- 2024-09-26Sentinel限流资料入门详解
- 2024-09-26Springboot框架资料:初学者入门教程
- 2024-09-26Springboot框架资料详解:新手入门教程
- 2024-09-26Springboot企业级开发资料:新手入门指南
- 2024-09-26SpringBoot企业级开发资料新手指南