多线程导入数据到mysql 一
2021/12/19 2:20:44
本文主要是介绍多线程导入数据到mysql 一,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
导入入口controller
import com.hanxiaozhang.dictonecode.domain.TestDO; import com.hanxiaozhang.dictonecode.service.TestService; import com.hanxiaozhang.utils.EntityListToExcelUtil; import com.hanxiaozhang.utils.JsonUtil; import com.hanxiaozhang.utils.R; import com.hanxiaozhang.utils.StringUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.*; import org.springframework.web.multipart.MultipartFile; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.lang.reflect.InvocationTargetException; import java.util.Map; /** * 〈一句话功能简述〉<br> * 〈〉 * */ @Slf4j @Controller @RequestMapping() public class TestController { @Autowired private TestService testService; @GetMapping public String excelTest(){ return "importExcel"; } @ResponseBody @PostMapping("/importExcel") public R importExcel(@RequestParam(value = "file") MultipartFile file) { if (file == null) { return R.error(1, "文件不能为空"); } if (StringUtil.isBlank(file.getOriginalFilename()) || file.getSize() == 0) { return R.error(1, "文件不能为空"); } long startTime = System.currentTimeMillis(); log.info("Excel开始导入,logId:[{}]", startTime); //数据导入处理 R r = testService.importExcel(file); if ("1".equals(r.get("code").toString())) { Map<String, Object> map = (Map) r.get("map"); map.put("logId",startTime); log.info("Excel导入出错,logId:[{}]", startTime); return R.error(1, map, "导入时有错误信息"); } long endTime = System.currentTimeMillis(); log.info("Excel导入成功,logId:[{}],导入Excel耗时(ms):[{}]", startTime,endTime-startTime); return r; } @ResponseBody @PostMapping("/exportExcel") public void exportExcel(@RequestParam("data") String data, HttpServletResponse response) throws IOException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { //将Json字符串转Map Map<String, Object> params = JsonUtil.jsonToMapSO(data); log.info("Excel导出错误信息,logId:[{}]", params.get("logId").toString()); //response设置返回类型 setDownloadExcelResponse(response, params.get("fileName").toString()); //数据导出为excel EntityListToExcelUtil.getInstance(). executeXLSX(JsonUtil.jsonToLinkedHashMapSS(params.get("title").toString()), JsonUtil.jsonToList(params.get("errorData").toString(), TestDO.class), response.getOutputStream()); } /** * 设置下载文件响应信息 * * @param response * @param fileName */ private void setDownloadExcelResponse(HttpServletResponse response, String fileName) { try { fileName = new String(fileName.getBytes(), "ISO8859-1"); } catch (UnsupportedEncodingException e) { log.error("该文件[{}]不支持此编码转换,异常消息:[{}]",fileName,e.getMessage()); } response.setContentType("application/vnd.ms-excel;charset=UTF-8"); response.setHeader("Content-Disposition", "attachment;filename=" + fileName); //使用Content-Disposition,一定要确保没有禁止浏览器缓存的操作 response.setHeader("Pragma", "No-cache"); response.setHeader("Cache-Control", "No-cache"); response.setDateHeader("Expires", 0); } }
导入模板
/** * 〈一句话功能简述〉<br> * 〈数据字典数据导入〉 */ @Slf4j public abstract class SaveExcelTemplateHandle<T> implements SaveExcelService<T> { @Override @Transactional(rollbackFor = Exception.class) public ErrorInfoEntity batchSave(List<T> list, MultiThreadEndFlag flag) throws Exception { int resultFlag = 0; try { log.info("batchSave(),当前线程名称:[{}]", Thread.currentThread().getName()); //创建返回错误信息实体 ErrorInfoEntity errorInfoEntity = new ErrorInfoEntity(); //业务操作 List<T> errorList = handle(list); //赋值错误数据 errorInfoEntity.setErrorList(errorList); //操作成功 resultFlag = 1; //等待其他线程完成操作 flag.waitForEnd(resultFlag); //其他线程异常手工回滚 if (resultFlag == 1 && !flag.allSuccessFlag()) { String message = "子线程未全部执行成功,对线程[" + Thread.currentThread().getName() + "]进行回滚"; log.info(message); throw new Exception(message); } return errorInfoEntity; } catch (Exception e) { log.info("batchSave() error,当前线程名称:[{}]", Thread.currentThread().getName()); log.error(e.toString()); throw e; } finally { //本身线程异常抛出异常,并且没有调用flag.waitForEnd()时触发 if (resultFlag == 0) { flag.waitForEnd(resultFlag); } } } protected abstract List<T> handle(List<T> list); }
import com.hanxiaozhang.sourcecode.executor.MyThreadPoolExecutor; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import static java.util.concurrent.Executors.*; /** * 〈一句话功能简述〉<br> * 〈导入Excel执行器〉 * <p> * */ @Slf4j public class ImportExcelExecutor { private final static int MAX_THREAD = 10; /** * 执行方法(分批创建子线程) * * @param saveService 保存的服务 * @param lists 数据List * @param groupLen 分组的长度 * @return * @throws ExecutionException * @throws InterruptedException */ public static <T> List<T> execute(SaveExcelService<T> saveService, List<T> lists, int groupLen) throws ExecutionException, InterruptedException { if (lists == null || lists.size() == 0) { return null; } List<T> errorList = new ArrayList<>(); //创建一个固定线程池 ExecutorService executorService = new MyThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), defaultThreadFactory(), new MyThreadPoolExecutor.MyAbortPolicy()); //创建一个Future集合 List<Future<ErrorInfoEntity>> futures = new ArrayList<>(); //集合的元素个数 int size = lists.size(); //适配线程池数与分组长度 //Math.ceil()对小数向下“”取整” int batches = (int) Math.ceil(size * 1.0 / groupLen); //分组超长最大线程限制,则设置分组数为最大线程限制,计算分组集合尺寸 if (batches > MAX_THREAD) { batches = MAX_THREAD; groupLen = (int) Math.ceil(size * 1.0 / batches); } log.info("总条数:[{}],批次数量:[{}],每批数据量:[{}]", size, batches, groupLen); MultiThreadEndFlag flag = new MultiThreadEndFlag(batches); int startIndex, toIndex, maxIndex = lists.size(); for (int i = 0; i < batches; i++) { //开始索引位置 startIndex = i * groupLen; //截止索引位置 toIndex = startIndex + groupLen; //如果截止索引大于最大索引,截止索引等于最大索引 if (toIndex > maxIndex) { toIndex = maxIndex; } //截取数组 List<T> temp = lists.subList(startIndex, toIndex); if (temp == null || temp.size() == 0) { continue; } futures.add(executorService.submit(new ImportExcelTask(saveService, temp, flag))); } flag.end(); //子线程全部等待返回(存在异常,则直接抛向主线程) for (Future<ErrorInfoEntity> future : futures) { errorList.addAll(future.get().getErrorList()); } //所有线程返回后,关闭线程池 executorService.shutdown(); return errorList; } private static int getPoolInfo(ThreadPoolExecutor tpe) { int queueSize = tpe.getQueue().size(); System.out.println("当前排队线程数:" + queueSize); int activeCount = tpe.getActiveCount(); System.out.println("当前活动线程数:" + activeCount); long completedTaskCount = tpe.getCompletedTaskCount(); System.out.println("执行完成线程数:" + completedTaskCount); long taskCount = tpe.getTaskCount(); System.out.println("总线程数:" + taskCount); //线程池中当前线程的数量,为0时意味着没有任何线程,线程池会终止,此值不会超过MaximumPoolSize System.out.println("当前线程的数量:" + tpe.getPoolSize()); //线程池的初始线程数量(当没有任务提交,或提交任务数小于此值值,实际并不会产生那么多线程数) System.out.println("线程池的初始线程数量:" + tpe.getCorePoolSize()); //线程池可允许最大的线程数 System.out.println("线程池可允许最大的线程数" + tpe.getMaximumPoolSize()); tpe.getLargestPoolSize(); return queueSize; } }
import java.util.List; import java.util.concurrent.Callable; /** * 〈功能描述〉<br> * 〈导入Excel任务〉 * */ public class ImportExcelTask<T> implements Callable<ErrorInfoEntity> { /** * 保存Excel服务 */ private SaveExcelService excelService; /** * 数据集合 */ private List<T> list; /** * 多线程数据结束标志 */ private MultiThreadEndFlag flag; /** * 构造函数 * * @param excelService * @param list * @param flag */ public ImportExcelTask(SaveExcelService<T> excelService,List<T> list,MultiThreadEndFlag flag){ this.excelService=excelService; this.list=list; this.flag=flag; } @Override public ErrorInfoEntity call() throws Exception { return excelService.batchSave(list,flag); } }
import lombok.extern.slf4j.Slf4j; /** * 功能描述: <br> * 〈多线程结束标志〉 * */ @Slf4j public class MultiThreadEndFlag { /** * 是否解除等待 */ private volatile boolean releaseWaitFlag = false; /** * 是否全部执行成功 */ private volatile boolean allSuccessFlag = false; /** * 线程个数 */ private volatile int threadCount = 0; /** * 失败个数 */ private volatile int failCount = 0; /** * 失败个数 */ private int count = 0; /** * 初始化子线程的总数 * * @param count */ public MultiThreadEndFlag(int count) { threadCount = count; } public boolean allSuccessFlag() { return allSuccessFlag; } /** * 等待全部结束 * * @param resultFlag */ public synchronized void waitForEnd(int resultFlag) { //统计失败的线程个数 if (resultFlag == 0) { failCount++; } threadCount--; // log.info("waitForEnd(),等待全部结束:[{}],[{}]", threadCount, Thread.currentThread().getName()); while (!releaseWaitFlag) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 执行结束通知 */ public synchronized void go() { releaseWaitFlag = true; //结果都显示成功 allSuccessFlag = (failCount == 0); notifyAll(); } /** * 等待结束 */ public void end() { while (threadCount > 0) { waitFunc(50); } log.info("线程全部执行完毕通知"); go(); } /** * 等待 */ private void waitFunc(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }
import java.util.List; /** * 〈一句话功能简述〉<br> * 〈保存ExcelService〉 * */ public interface SaveExcelService<T> { /** * 批量保存 * * @param list * @param flag * @return * @throws Exception */ ErrorInfoEntity batchSave(List<T> list, MultiThreadEndFlag flag) throws Exception; }
@Slf4j @Service public class TestSaveExcelServiceImpl extends SaveExcelTemplateHandle<TestDO> { @Resource private TestDao testDao; /** * 处理相关数据(抛异常则全部回滚,否则则返回前端未上传列表) * 业务类 * @param list * @return */ protected List<TestDO> handle(List<TestDO> list) { List<TestDO> errorList = new ArrayList<>(); list.forEach(x -> { boolean flag=true; List<String> errorMsg=new ArrayList<>(); //模拟一个业务数据错误,姓名不能为空 if (StringUtil.isBlank(x.getName())) { errorMsg.add("姓名不能为空!"); flag=false; throw new RuntimeException(); } //模拟一个业务数据错误,类型不能为空 if (StringUtil.isBlank(x.getType())){ errorMsg.add("类型不能为空!"); flag=false; } if (flag){ testDao.save(x); }else { x.setRemarks(String.join("\n",errorMsg)); errorList.add(x); } }); return errorList; } }
import com.hanxiaozhang.dictonecode.dao.TestDao; import com.hanxiaozhang.dictonecode.domain.TestDO; import com.hanxiaozhang.dictonecode.service.TestService; import com.hanxiaozhang.importexcel.TestSaveExcelServiceImpl; import com.hanxiaozhang.importexcel.ImportExcelExecutor; import com.hanxiaozhang.importexcelnew.TestSaveExcelNewServiceImpl; import com.hanxiaozhang.utils.ExcelToEntityListUtil; import com.hanxiaozhang.utils.R; import lombok.extern.slf4j.Slf4j; import org.apache.poi.openxml4j.exceptions.InvalidFormatException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; import javax.annotation.Resource; import java.io.IOException; import java.util.*; import java.util.concurrent.ExecutionException; @Slf4j @Service public class TestServiceImpl implements TestService { @Resource private TestDao testDao; @Autowired private TestSaveExcelServiceImpl testSaveExcelService; @Autowired private TestSaveExcelNewServiceImpl dictSaveExcelNewService; @Override public R importExcel(MultipartFile file) { try { //读取Excel中数据 ArrayList<TestDO> list = ExcelToEntityListUtil.getInstance().execute(TestDO.class, file.getInputStream(), initTitleToAttr()); log.info("读取Excel中数据的条数:[{}]",list.size()); //多线程处理数据,并导出错误数据 List<TestDO> errorList = ImportExcelExecutor.execute(testSaveExcelService, list, 14); // List<TestDO> errorList = ImportExcelNewExecutor.execute(dictSaveExcelNewService, list, 14); //封装错误数据 if (errorList!=null&&!errorList.isEmpty()) { Map<String, Object> map = new HashMap<String, Object>(); map.put("errorData", errorList); map.put("title", initAttrToTitle()); map.put("fileName", "有问题数据.xlsx"); return R.error(map); } } catch (IOException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvalidFormatException e) { e.printStackTrace(); } catch (InstantiationException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return R.ok(); } private Map<String,String> initTitleToAttr(){ Map<String, String> map = new LinkedHashMap<>(8); map.put("姓名","name"); map.put("值","value"); map.put("类型","type"); map.put("描述","description"); map.put("时间","createDate"); return map; } private Map<String,String> initAttrToTitle(){ Map<String, String> map = new LinkedHashMap<>(8); map.put("name","姓名"); map.put("value","值"); map.put("type","类型"); map.put("description","描述"); map.put("createDate","时间"); map.put("remarks","数据问题备注"); return map; } }
public interface TestService { R importExcel(MultipartFile file); }
这篇关于多线程导入数据到mysql 一的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-16MySQL资料:新手入门教程
- 2024-11-16MySQL资料:新手入门教程
- 2024-11-15MySQL教程:初学者必备的MySQL数据库入门指南
- 2024-11-15MySQL教程:初学者必看的MySQL入门指南
- 2024-11-04部署MySQL集群项目实战:新手入门教程
- 2024-11-04如何部署MySQL集群资料:新手入门指南
- 2024-11-02MySQL集群项目实战:新手入门指南
- 2024-11-02初学者指南:部署MySQL集群资料
- 2024-11-01部署MySQL集群教程:新手入门指南
- 2024-11-01如何部署MySQL集群:新手入门教程