123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- package com.sundata.internalevaluation.calc.running.threads;
- import cn.hutool.core.date.DateUtil;
- import cn.hutool.core.date.TimeInterval;
- import com.sundata.internalevaluation.calc.model.CalcResult;
- import com.sundata.internalevaluation.calc.model.CalcUnit;
- import org.jgrapht.Graph;
- import org.jgrapht.graph.DefaultEdge;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
- import java.util.concurrent.RecursiveTask;
- /**
- * 分解用的任务类,结果有 ClacResult 组成
- * @author JoeLazy
- */
- public class CalcRecursiveTask extends RecursiveTask<CalcResult<String, Object>> {
- private static final Logger log = LoggerFactory.getLogger(CalcRecursiveTask.class);
- private final String calculateInstanceNumber;
- private final CalcUnit calcUnit;
- private final ConcurrentHashMap<String, Object> content;
- private final Graph<CalcUnit, DefaultEdge> graph;
- private final ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> results;
- private ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> childResults;
- private final ConcurrentHashMap<CalcUnit, Future<CalcResult<String, Object>>> futureResults;
- public CalcRecursiveTask(String calculateInstanceNumber, CalcUnit calcUnit, ConcurrentHashMap<String, Object> content, Graph<CalcUnit, DefaultEdge> graph, ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> results, ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> childResults) {
- this.calculateInstanceNumber = calculateInstanceNumber;
- this.calcUnit = calcUnit;
- this.content = content;
- this.graph = graph;
- this.results = results;
- this.childResults = childResults;
- futureResults = new ConcurrentHashMap<>();
- }
- public CalcRecursiveTask(String calculateInstanceNumber, CalcUnit calcUnit, ConcurrentHashMap<String, Object> content, Graph<CalcUnit, DefaultEdge> graph, ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> results, ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> childResults,ConcurrentHashMap<CalcUnit, Future<CalcResult<String, Object>>> futureResults){
- this.calculateInstanceNumber = calculateInstanceNumber;
- this.calcUnit = calcUnit;
- this.content = content;
- this.graph = graph;
- this.results = results;
- this.childResults = childResults;
- this.futureResults = futureResults;
- }
- /**
- * The main computation performed by this task.
- *
- * @return the result of the computation
- */
- @Override
- protected CalcResult<String, Object> compute() {
- if (this.futureResults.get(this.calcUnit) == null) {
- futureResults.put(this.calcUnit,this);
- }else{
- try {
- return this.futureResults.get(this.calcUnit).get();
- } catch (InterruptedException|ExecutionException e) {
- log.error(e.getMessage(), e);
- }
- }
- TimeInterval interval = new TimeInterval();
- interval.start();
- log.info("计算任务开始[计算流水号:{},计算单元:{}-{}-{}]",calculateInstanceNumber, calcUnit.getCalcType(),calcUnit.getCalcCode(), calcUnit.getCalcName());
- // if (results.containsKey(calcUnit)) {
- // log.info("已经计算过,不再计算[计算流水号:{},计算单元:{}-{}-{}]",calculateInstanceNumber, calcUnit.getCalcType(),calcUnit.getCalcCode(), calcUnit.getCalcName());
- // return calcUnit.getResultContext();
- // }
- List<CalcUnit> predecessors = graph.incomingEdgesOf(calcUnit)
- .stream()
- .map(graph::getEdgeSource)
- .toList();
- // 如果有来源节点,则去计算来源节点并合并计算内容,如果没有来源节点,则处理
- if (!predecessors.isEmpty()) {
- log.debug("存在子节点,处理子节点");
- childResults = new ConcurrentHashMap<>();
- List<CalcRecursiveTask> tasks = predecessors.stream()
- .map(predecessor -> new CalcRecursiveTask(calculateInstanceNumber, predecessor, content, graph, results, childResults,this.futureResults))
- .toList();
- invokeAll(tasks);
- for (int i = 0; i < predecessors.size(); i++) {
- CalcRecursiveTask task = tasks.get(i);
- // task.fork();
- childResults.put(predecessors.get(i), task.invoke());
- }
- }
- ;// 假设节点名称是一个整数
- CalcResult<String, Object> result = calcUnit.getResultContextFuture(calculateInstanceNumber, content, childResults);
- results.put(calcUnit, result);
- log.info("计算完成,耗时:[{}][计算流水号:{},计算单元:{}-{}-{}]", DateUtil.formatBetween(interval.interval()),calculateInstanceNumber, calcUnit.getCalcType(),calcUnit.getCalcCode(), calcUnit.getCalcName());
- return result;
- }
- public Map<String, Object> getContent() {
- return content;
- }
- public String getCalculateInstanceNumber() {
- return calculateInstanceNumber;
- }
- }
|