IndexCalcUnit.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. package com.sundata.internalevaluation.calc.calcUnit;
  2. import cn.hutool.extra.spring.SpringUtil;
  3. import com.sundata.common.util.JsonUtil;
  4. import com.sundata.internalevaluation.calc.model.CalcResult;
  5. import com.sundata.internalevaluation.calc.model.CalcUnit;
  6. import com.sundata.internalevaluation.calc.model.finals.CalcType;
  7. import com.sundata.internalevaluation.conf.JsonToCalciteExample;
  8. import com.sundata.internalevaluation.configuration.model.DataSetConfigModel;
  9. import com.sundata.internalevaluation.configuration.model.IndexConfigModel;
  10. import com.sundata.internalevaluation.configuration.model.IndexSourceModel;
  11. import com.sundata.internalevaluation.configuration.service.DataSetConfigService;
  12. import com.sundata.internalevaluation.configuration.service.IndexConfigService;
  13. import com.sundata.internalevaluation.script.ScriptUtil;
  14. import com.sundata.internalevaluation.script.TemplateUtil;
  15. import org.apache.calcite.jdbc.CalciteConnection;
  16. import org.apache.calcite.schema.SchemaPlus;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19. import java.sql.ResultSet;
  20. import java.sql.SQLException;
  21. import java.sql.Statement;
  22. import java.util.*;
  23. import java.util.concurrent.atomic.AtomicInteger;
  24. import java.util.stream.Collectors;
  25. public class IndexCalcUnit extends CalcUnit {
  26. private final IndexConfigModel indexConfigModel;
  27. private static final Logger log = LoggerFactory.getLogger(IndexCalcUnit.class);
  28. // 接收表集
  29. private Set<String> tableSet = null;
  30. /**
  31. * 创建数据单元的绝对对象,对象必须包含如下参数
  32. *
  33. * @param calcCode 计算对象编号
  34. * @param calcName 计算对象名称
  35. * @param initContext 计算单元初始化参数
  36. */
  37. public IndexCalcUnit(String calcCode, String calcName, Map<String, Object> initContext, IndexConfigModel indexConfigModel) {
  38. super(calcCode, calcName, CalcType.INDEX, initContext);
  39. this.indexConfigModel = indexConfigModel;
  40. }
  41. /**
  42. * 判断是否已经计算过数据了
  43. *
  44. * @param calculateInstanceNumber 计算流水号
  45. * @return 是否计算过 true 计算过 false 没有计算过
  46. */
  47. @Override
  48. public boolean isCalcFinished(String calculateInstanceNumber) {
  49. // TODO 计算是否已经计算
  50. return false;
  51. // return DataImages.indexCalcUnitHashMap.containsKey(calculateInstanceNumber) && DataImages.indexCalcUnitHashMap.get(calculateInstanceNumber).stream().anyMatch(a -> a.getCalcCode().equals(this.getCalcCode()));
  52. }
  53. /**
  54. * 初始化计算结果的方法,如果已经计算过,在实现过程中,应当在此方法中根据计算流水号重新初始化 resultContext 结果对象,为其他依赖对象做准备
  55. * 若明明计算过本单元但再次计算时没有初始化该对象,则计算依赖出现问题无法定位与处理
  56. *
  57. * @param calculateInstanceNumber 计算流水号
  58. */
  59. @Override
  60. public void initResultContext(String calculateInstanceNumber) {
  61. // TODO 初始化
  62. // List<IndexCalcUnit> indexCalcUnits = new ArrayList<>();
  63. //// List<IndexCalcUnit> indexCalcUnits = DataImages.indexCalcUnitHashMap.get(calculateInstanceNumber);
  64. // // 筛选并查找对象,如果找不到则报错
  65. // if (indexCalcUnits.stream().noneMatch(a -> a.getCalcCode().equals(this.getCalcCode()))) {
  66. // throw new CalcException(calculateInstanceNumber, StrUtil.format("无法找到已计算完成的结果,计算单元编号:{},计算流水号为:{}。", this.getCalcCode(), calculateInstanceNumber));
  67. // }
  68. // this.setResultContext(indexCalcUnits.stream().filter(a -> a.getCalcCode().equals(this.getCalcCode())).findFirst().get().getResultContext());
  69. }
  70. /**
  71. * 根据节点配置获取源节点;
  72. *
  73. * @return 所有源头节点
  74. */
  75. @Override
  76. public List<CalcUnit> getSourceCalcUnits() {
  77. // TODO 获取源头节点
  78. DataSetConfigService dataSetService = SpringUtil.getBean(DataSetConfigService.class);
  79. IndexConfigService indexConfigService = SpringUtil.getBean(IndexConfigService.class);
  80. List<IndexSourceModel> selectList = indexConfigService.getIndexSourceList(this.indexConfigModel);
  81. // 定义存放数据集和其他指标的集合
  82. List<IndexConfigModel> indexList = new ArrayList<>();
  83. List<DataSetConfigModel> dataSetList = new ArrayList<>();
  84. // 查询个指标编号和数据集对应的数据
  85. for ( IndexSourceModel m : selectList) {
  86. if ("DATASET".equals(m.getDataSourceType())) {
  87. DataSetConfigModel conditionModel = new DataSetConfigModel();
  88. conditionModel.setDataSetNo(m.getDataSetNo());
  89. DataSetConfigModel datasetModel = dataSetService.selectDetailData(conditionModel);
  90. dataSetList.add(datasetModel);
  91. } else if ("INDEX".equals(m.getDataSourceType())) {
  92. IndexConfigModel conditionModel = new IndexConfigModel();
  93. conditionModel.setIndexNo(m.getOtherIndexNo());
  94. IndexConfigModel indexConfigModel = indexConfigService.selectDetailData(conditionModel);
  95. indexList.add(indexConfigModel);
  96. }
  97. }
  98. List<CalcUnit> indexCalcList = null;
  99. List<CalcUnit> dataSetCalcList = null;
  100. List<CalcUnit> resultCalcMap = new ArrayList<>();
  101. // 数据源头节点生成
  102. if (!indexList.isEmpty()) {
  103. indexCalcList = indexList.stream().map(indexConfigModel -> new IndexCalcUnit
  104. (indexConfigModel.getIndexNo(),indexConfigModel.getIndexName(),Map.of(),indexConfigModel)).collect(Collectors.toList());
  105. }
  106. if (!dataSetList.isEmpty()) {
  107. dataSetCalcList = dataSetList.stream().map(dataSetModel -> new DataSetCalcUnit
  108. (dataSetModel.getDataSetNo(),dataSetModel.getDataSetName(),Map.of(),dataSetModel)).collect(Collectors.toList());
  109. }
  110. // selectList.stream().map(sourceModel -> new IndexSourceCalcUnit
  111. // (sourceModel.getOtherIndexNo(),"",CalcType.INDEX,Map.of(),sourceModel));
  112. // 结果封装
  113. if (null != indexCalcList) {
  114. resultCalcMap.addAll(indexCalcList);
  115. }
  116. if (null != dataSetCalcList) {
  117. resultCalcMap.addAll(dataSetCalcList);
  118. }
  119. return resultCalcMap;
  120. // return ConfigImages.indexCalcUnitListMap.get(this);
  121. }
  122. /**
  123. * 计算之后的方法,可实现为空
  124. *
  125. * @param context 计算参数过程数据
  126. */
  127. @Override
  128. public void afterCalc(Map<String, Object> context) {
  129. log.debug("计算之后的参数结构:{}",context);
  130. }
  131. /**
  132. * 计算之前,可实现空
  133. *
  134. * @param context 计算参数过程数据
  135. */
  136. @Override
  137. public void beforeCalc(Map<String, Object> context) {
  138. log.debug("计算之前的参数结构:{}",context);
  139. }
  140. /**
  141. * 必须实现的主体计算内容
  142. *
  143. * @param context 节点计算参数清单
  144. * @param sourceResults 整个计算过程中的节点结果
  145. */
  146. @Override
  147. public void calc(final CalcResult<String, Object> thisResult, String calculateInstanceNumber, Map<String, Object> context, Map<CalcUnit, CalcResult<String, Object>> sourceResults) {
  148. // TODO 实际的计算过程
  149. // 记录数据集出现次数
  150. AtomicInteger dataSetNumber = new AtomicInteger();
  151. // 记录其他指标出现次数
  152. AtomicInteger indexNumber = new AtomicInteger();
  153. // 声明只存放指标源头节点变量
  154. Map<String, Object> indexResult = new HashMap<>();
  155. // 声明只存放所有数据集源头节点变量
  156. Map<String, Object> dataSetResult = new HashMap<>();
  157. sourceResults.forEach((calcUnit,result) -> {
  158. // 数据集
  159. if ( calcUnit instanceof DataSetCalcUnit dataSetCalcUnit){
  160. if(dataSetCalcUnit.getCalcType() == CalcType.DATASET){
  161. dataSetNumber.getAndIncrement();
  162. dataSetResult.putAll((Map) result.get(calcUnit.getCalcCode()));
  163. }
  164. }
  165. // 其他指标
  166. if( calcUnit instanceof IndexCalcUnit indexCalcUnit){
  167. if(indexCalcUnit.getCalcType() == CalcType.INDEX){
  168. indexNumber.getAndIncrement();
  169. indexResult.putAll(result);
  170. }
  171. }
  172. });
  173. // TODO
  174. // TODO
  175. if (dataSetNumber.get() > 0 && indexNumber.get() == 0){
  176. String json = JsonUtil.toJSONString(dataSetResult);
  177. JsonToCalciteExample jsonUtil = new JsonToCalciteExample();
  178. // 如果数据集的数量大于0
  179. // 获取指标逻辑
  180. String logic = indexConfigModel.getIndexLogic();
  181. Object resValue = jsonUtil.jsonTotable(this.getCalcCode(),json,logic);
  182. // 执行sql,并放进结果集内
  183. // TODO 计算逻辑
  184. thisResult.put(this.getCalcCode(),resValue);
  185. }
  186. if(indexNumber.get() > 0 && dataSetNumber.get() == 0){
  187. // 如果指标的数量大于0
  188. // 获取指标逻辑
  189. String logic = indexConfigModel.getIndexLogic();
  190. // 执行公式,结果放入结果集
  191. Object result = ScriptUtil.executeScript(indexConfigModel.getIndexNo(),logic,indexResult);
  192. // JsonToCalciteExample.jsonTotable()
  193. thisResult.put(this.getCalcCode(),result);
  194. }
  195. if (dataSetNumber.get() > 0 && indexNumber.get() >0) {
  196. // 同时具有指标和数据集
  197. JsonToCalciteExample jsonUtil = new JsonToCalciteExample();
  198. // 获取指标逻辑
  199. String logic = indexConfigModel.getIndexLogic();
  200. // 先将sql中指标公式替换掉,再将得到的sql执行
  201. String editSql = TemplateUtil.execute(indexConfigModel.getIndexNo(),logic,indexResult);
  202. // 将数据集得到的数据转json
  203. String json = JsonUtil.toJSONString(dataSetResult);
  204. // 执行拿结果
  205. Object resValue = jsonUtil.jsonTotable(this.indexConfigModel.getIndexNo(),json,editSql);
  206. // 放结果
  207. thisResult.put(this.getCalcCode(),resValue);
  208. }
  209. }
  210. /**
  211. *
  212. * @param calciteConnection
  213. * @param sql
  214. * @return
  215. */
  216. public Object getConnectionTable (CalciteConnection calciteConnection,String sql) {
  217. //获取域
  218. SchemaPlus rootSchemafter = calciteConnection.getRootSchema();
  219. //根域中获取子域
  220. SchemaPlus childrenSchema=rootSchemafter.getSubSchema(this.getCalcCode());
  221. //子域名
  222. String childrenName = childrenSchema.getName();
  223. System.out.println("childrenName: " + childrenName);
  224. //子域中表名
  225. tableSet = childrenSchema.getTableNames();
  226. String value = "";
  227. try {
  228. Statement statement = calciteConnection.createStatement();
  229. ResultSet resultSet = statement.executeQuery(sql);
  230. if (resultSet.next()){
  231. //value = resultSet.getString(1);
  232. return resultSet.getObject(1);
  233. // System.out.println();
  234. }
  235. } catch (SQLException e) {
  236. throw new RuntimeException(e);
  237. }
  238. return null;
  239. }
  240. }