|
@@ -12,6 +12,8 @@ import org.slf4j.LoggerFactory;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.concurrent.Future;
|
|
|
import java.util.concurrent.RecursiveTask;
|
|
|
|
|
|
/**
|
|
@@ -28,6 +30,7 @@ public class CalcRecursiveTask extends RecursiveTask<CalcResult<String, Object>>
|
|
|
private final Graph<CalcUnit, DefaultEdge> graph;
|
|
|
private final ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> results;
|
|
|
private ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> childResults;
|
|
|
+ private final ConcurrentHashMap<CalcUnit, Future<CalcResult<String, Object>>> futureResults;
|
|
|
|
|
|
public CalcRecursiveTask(String calculateInstanceNumber, CalcUnit calcUnit, ConcurrentHashMap<String, Object> content, Graph<CalcUnit, DefaultEdge> graph, ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> results, ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> childResults) {
|
|
|
this.calculateInstanceNumber = calculateInstanceNumber;
|
|
@@ -36,6 +39,16 @@ public class CalcRecursiveTask extends RecursiveTask<CalcResult<String, Object>>
|
|
|
this.graph = graph;
|
|
|
this.results = results;
|
|
|
this.childResults = childResults;
|
|
|
+ futureResults = new ConcurrentHashMap<>();
|
|
|
+ }
|
|
|
+ public CalcRecursiveTask(String calculateInstanceNumber, CalcUnit calcUnit, ConcurrentHashMap<String, Object> content, Graph<CalcUnit, DefaultEdge> graph, ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> results, ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> childResults,ConcurrentHashMap<CalcUnit, Future<CalcResult<String, Object>>> futureResults){
|
|
|
+ this.calculateInstanceNumber = calculateInstanceNumber;
|
|
|
+ this.calcUnit = calcUnit;
|
|
|
+ this.content = content;
|
|
|
+ this.graph = graph;
|
|
|
+ this.results = results;
|
|
|
+ this.childResults = childResults;
|
|
|
+ this.futureResults = futureResults;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -45,13 +58,23 @@ public class CalcRecursiveTask extends RecursiveTask<CalcResult<String, Object>>
|
|
|
*/
|
|
|
@Override
|
|
|
protected CalcResult<String, Object> compute() {
|
|
|
+ if (this.futureResults.get(this.calcUnit) == null) {
|
|
|
+ futureResults.put(this.calcUnit,this);
|
|
|
+ }else{
|
|
|
+ try {
|
|
|
+ return this.futureResults.get(this.calcUnit).get();
|
|
|
+ } catch (InterruptedException|ExecutionException e) {
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
TimeInterval interval = new TimeInterval();
|
|
|
interval.start();
|
|
|
log.info("计算任务开始[计算流水号:{},计算单元:{}-{}-{}]",calculateInstanceNumber, calcUnit.getCalcType(),calcUnit.getCalcCode(), calcUnit.getCalcName());
|
|
|
- if (results.containsKey(calcUnit)) {
|
|
|
- log.info("已经计算过,不再计算[计算流水号:{},计算单元:{}-{}-{}]",calculateInstanceNumber, calcUnit.getCalcType(),calcUnit.getCalcCode(), calcUnit.getCalcName());
|
|
|
- return results.get(calcUnit);
|
|
|
- }
|
|
|
+// if (results.containsKey(calcUnit)) {
|
|
|
+// log.info("已经计算过,不再计算[计算流水号:{},计算单元:{}-{}-{}]",calculateInstanceNumber, calcUnit.getCalcType(),calcUnit.getCalcCode(), calcUnit.getCalcName());
|
|
|
+// return calcUnit.getResultContext();
|
|
|
+// }
|
|
|
List<CalcUnit> predecessors = graph.incomingEdgesOf(calcUnit)
|
|
|
.stream()
|
|
|
.map(graph::getEdgeSource)
|
|
@@ -61,8 +84,9 @@ public class CalcRecursiveTask extends RecursiveTask<CalcResult<String, Object>>
|
|
|
log.debug("存在子节点,处理子节点");
|
|
|
childResults = new ConcurrentHashMap<>();
|
|
|
List<CalcRecursiveTask> tasks = predecessors.stream()
|
|
|
- .map(predecessor -> new CalcRecursiveTask(calculateInstanceNumber, predecessor, content, graph, results, childResults))
|
|
|
+ .map(predecessor -> new CalcRecursiveTask(calculateInstanceNumber, predecessor, content, graph, results, childResults,this.futureResults))
|
|
|
.toList();
|
|
|
+ invokeAll(tasks);
|
|
|
for (int i = 0; i < predecessors.size(); i++) {
|
|
|
CalcRecursiveTask task = tasks.get(i);
|
|
|
// task.fork();
|