package com.sundata.internalevaluation.calc.running.threads; import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.TimeInterval; import com.sundata.internalevaluation.calc.model.CalcResult; import com.sundata.internalevaluation.calc.model.CalcUnit; import org.jgrapht.Graph; import org.jgrapht.graph.DefaultEdge; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; /** * 分解用的任务类,结果有 ClacResult 组成 * @author JoeLazy */ public class CalcRecursiveTask extends RecursiveTask> { private static final Logger log = LoggerFactory.getLogger(CalcRecursiveTask.class); private final String calculateInstanceNumber; private final CalcUnit calcUnit; private final ConcurrentHashMap content; private final Graph graph; private final ConcurrentHashMap> results; private ConcurrentHashMap> childResults; private final ConcurrentHashMap>> futureResults; public CalcRecursiveTask(String calculateInstanceNumber, CalcUnit calcUnit, ConcurrentHashMap content, Graph graph, ConcurrentHashMap> results, ConcurrentHashMap> childResults) { this.calculateInstanceNumber = calculateInstanceNumber; this.calcUnit = calcUnit; this.content = content; this.graph = graph; this.results = results; this.childResults = childResults; futureResults = new ConcurrentHashMap<>(); } public CalcRecursiveTask(String calculateInstanceNumber, CalcUnit calcUnit, ConcurrentHashMap content, Graph graph, ConcurrentHashMap> results, ConcurrentHashMap> childResults,ConcurrentHashMap>> futureResults){ this.calculateInstanceNumber = calculateInstanceNumber; this.calcUnit = calcUnit; this.content = content; this.graph = graph; this.results = results; this.childResults = childResults; this.futureResults = futureResults; } /** * The main computation performed by this task. * * @return the result of the computation */ @Override protected CalcResult compute() { if (this.futureResults.get(this.calcUnit) == null) { futureResults.put(this.calcUnit,this); }else{ try { return this.futureResults.get(this.calcUnit).get(); } catch (InterruptedException|ExecutionException e) { log.error(e.getMessage(), e); } } TimeInterval interval = new TimeInterval(); interval.start(); log.info("计算任务开始[计算流水号:{},计算单元:{}-{}-{}]",calculateInstanceNumber, calcUnit.getCalcType(),calcUnit.getCalcCode(), calcUnit.getCalcName()); // if (results.containsKey(calcUnit)) { // log.info("已经计算过,不再计算[计算流水号:{},计算单元:{}-{}-{}]",calculateInstanceNumber, calcUnit.getCalcType(),calcUnit.getCalcCode(), calcUnit.getCalcName()); // return calcUnit.getResultContext(); // } List predecessors = graph.incomingEdgesOf(calcUnit) .stream() .map(graph::getEdgeSource) .toList(); // 如果有来源节点,则去计算来源节点并合并计算内容,如果没有来源节点,则处理 if (!predecessors.isEmpty()) { log.debug("存在子节点,处理子节点"); childResults = new ConcurrentHashMap<>(); List tasks = predecessors.stream() .map(predecessor -> new CalcRecursiveTask(calculateInstanceNumber, predecessor, content, graph, results, childResults,this.futureResults)) .toList(); invokeAll(tasks); for (int i = 0; i < predecessors.size(); i++) { CalcRecursiveTask task = tasks.get(i); // task.fork(); childResults.put(predecessors.get(i), task.invoke()); } } ;// 假设节点名称是一个整数 CalcResult result = calcUnit.getResultContextFuture(calculateInstanceNumber, content, childResults); results.put(calcUnit, result); log.info("计算完成,耗时:[{}][计算流水号:{},计算单元:{}-{}-{}]", DateUtil.formatBetween(interval.interval()),calculateInstanceNumber, calcUnit.getCalcType(),calcUnit.getCalcCode(), calcUnit.getCalcName()); return result; } public Map getContent() { return content; } public String getCalculateInstanceNumber() { return calculateInstanceNumber; } }