CalcRecursiveTask.java 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package com.sundata.internalevaluation.calc.running.threads;
  2. import cn.hutool.core.date.DateUtil;
  3. import cn.hutool.core.date.TimeInterval;
  4. import com.sundata.internalevaluation.calc.model.CalcResult;
  5. import com.sundata.internalevaluation.calc.model.CalcUnit;
  6. import org.jgrapht.Graph;
  7. import org.jgrapht.graph.DefaultEdge;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import java.util.List;
  11. import java.util.Map;
  12. import java.util.concurrent.ConcurrentHashMap;
  13. import java.util.concurrent.ExecutionException;
  14. import java.util.concurrent.Future;
  15. import java.util.concurrent.RecursiveTask;
  16. /**
  17. * 分解用的任务类,结果有 ClacResult 组成
  18. * @author JoeLazy
  19. */
  20. public class CalcRecursiveTask extends RecursiveTask<CalcResult<String, Object>> {
  21. private static final Logger log = LoggerFactory.getLogger(CalcRecursiveTask.class);
  22. private final String calculateInstanceNumber;
  23. private final CalcUnit calcUnit;
  24. private final ConcurrentHashMap<String, Object> content;
  25. private final Graph<CalcUnit, DefaultEdge> graph;
  26. private final ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> results;
  27. private ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> childResults;
  28. private final ConcurrentHashMap<CalcUnit, Future<CalcResult<String, Object>>> futureResults;
  29. public CalcRecursiveTask(String calculateInstanceNumber, CalcUnit calcUnit, ConcurrentHashMap<String, Object> content, Graph<CalcUnit, DefaultEdge> graph, ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> results, ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> childResults) {
  30. this.calculateInstanceNumber = calculateInstanceNumber;
  31. this.calcUnit = calcUnit;
  32. this.content = content;
  33. this.graph = graph;
  34. this.results = results;
  35. this.childResults = childResults;
  36. futureResults = new ConcurrentHashMap<>();
  37. }
  38. public CalcRecursiveTask(String calculateInstanceNumber, CalcUnit calcUnit, ConcurrentHashMap<String, Object> content, Graph<CalcUnit, DefaultEdge> graph, ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> results, ConcurrentHashMap<CalcUnit, CalcResult<String, Object>> childResults,ConcurrentHashMap<CalcUnit, Future<CalcResult<String, Object>>> futureResults){
  39. this.calculateInstanceNumber = calculateInstanceNumber;
  40. this.calcUnit = calcUnit;
  41. this.content = content;
  42. this.graph = graph;
  43. this.results = results;
  44. this.childResults = childResults;
  45. this.futureResults = futureResults;
  46. }
  47. /**
  48. * The main computation performed by this task.
  49. *
  50. * @return the result of the computation
  51. */
  52. @Override
  53. protected CalcResult<String, Object> compute() {
  54. if (this.futureResults.get(this.calcUnit) == null) {
  55. futureResults.put(this.calcUnit,this);
  56. }else{
  57. try {
  58. return this.futureResults.get(this.calcUnit).get();
  59. } catch (InterruptedException|ExecutionException e) {
  60. log.error(e.getMessage(), e);
  61. }
  62. }
  63. TimeInterval interval = new TimeInterval();
  64. interval.start();
  65. log.info("计算任务开始[计算流水号:{},计算单元:{}-{}-{}]",calculateInstanceNumber, calcUnit.getCalcType(),calcUnit.getCalcCode(), calcUnit.getCalcName());
  66. // if (results.containsKey(calcUnit)) {
  67. // log.info("已经计算过,不再计算[计算流水号:{},计算单元:{}-{}-{}]",calculateInstanceNumber, calcUnit.getCalcType(),calcUnit.getCalcCode(), calcUnit.getCalcName());
  68. // return calcUnit.getResultContext();
  69. // }
  70. List<CalcUnit> predecessors = graph.incomingEdgesOf(calcUnit)
  71. .stream()
  72. .map(graph::getEdgeSource)
  73. .toList();
  74. // 如果有来源节点,则去计算来源节点并合并计算内容,如果没有来源节点,则处理
  75. if (!predecessors.isEmpty()) {
  76. log.debug("存在子节点,处理子节点");
  77. childResults = new ConcurrentHashMap<>();
  78. List<CalcRecursiveTask> tasks = predecessors.stream()
  79. .map(predecessor -> new CalcRecursiveTask(calculateInstanceNumber, predecessor, content, graph, results, childResults,this.futureResults))
  80. .toList();
  81. invokeAll(tasks);
  82. for (int i = 0; i < predecessors.size(); i++) {
  83. CalcRecursiveTask task = tasks.get(i);
  84. // task.fork();
  85. childResults.put(predecessors.get(i), task.invoke());
  86. }
  87. }
  88. ;// 假设节点名称是一个整数
  89. CalcResult<String, Object> result = calcUnit.getResultContextFuture(calculateInstanceNumber, content, childResults);
  90. results.put(calcUnit, result);
  91. log.info("计算完成,耗时:[{}][计算流水号:{},计算单元:{}-{}-{}]", DateUtil.formatBetween(interval.interval()),calculateInstanceNumber, calcUnit.getCalcType(),calcUnit.getCalcCode(), calcUnit.getCalcName());
  92. return result;
  93. }
  94. public Map<String, Object> getContent() {
  95. return content;
  96. }
  97. public String getCalculateInstanceNumber() {
  98. return calculateInstanceNumber;
  99. }
  100. }