package com.sundata.internalevaluation.calc.calcUnit; import cn.hutool.extra.spring.SpringUtil; import com.sundata.common.util.JsonUtil; import com.sundata.internalevaluation.calc.model.CalcResult; import com.sundata.internalevaluation.calc.model.CalcUnit; import com.sundata.internalevaluation.calc.model.finals.CalcType; import com.sundata.internalevaluation.calc.util.CalciteUtil; import com.sundata.internalevaluation.configuration.model.DataSourceModel; import com.sundata.internalevaluation.configuration.model.Interface; import com.sundata.internalevaluation.configuration.service.DataSourcesService; import com.sundata.internalevaluation.configuration.service.InterfaceService; import org.apache.calcite.adapter.file.JsonScannableTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; /** * Created by IntelliJ IDEA. * * @author JoeLazy * @date 2025-02-12 10:46:08 * @description: 数据来源计算单元 */ public class DataSourcesCalcUnit extends CalcUnit { private static final Logger log = LoggerFactory.getLogger(DataSourcesCalcUnit.class); final DataSourceModel dataSourcesModel; /** /** * 创建数据单元的绝对对象,对象必须包含如下参数 * * @param calcCode 计算对象编号 * @param calcName 计算对象名称 * @param calcType 计算类型 * @param initContext 计算单元初始化参数 */ public DataSourcesCalcUnit(String calcCode, String calcName,CalcType calcType, Map initContext, DataSourceModel dataSourcesModel) { super(calcCode, calcName, calcType, initContext); this.dataSourcesModel = dataSourcesModel; } /** * 判断是否已经计算过数据了 * * @param calculateInstanceNumber 计算流水号 * @return 是否计算过 true 计算过 false 没有计算过 */ @Override public boolean isCalcFinished(String calculateInstanceNumber) { return false; } /** * 初始化计算结果的方法,如果已经计算过,在实现过程中,应当在此方法中根据计算流水号重新初始化 resultContext 结果对象,为其他依赖对象做准备 * 若明明计算过本单元但再次计算时没有初始化该对象,则计算依赖出现问题无法定位与处理 * * @param calculateInstanceNumber 计算流水号 */ @Override public void initResultContext(String calculateInstanceNumber) { } /** * 根据节点配置获取源节点; * * @return 所有源头节点 */ @Override public List getSourceCalcUnits() { // 通过bean获取对象 DataSourcesService sourcesService = SpringUtil.getBean(DataSourcesService.class); InterfaceService sysInterfaceService = SpringUtil.getBean(InterfaceService.class); DataSourceModel selectModel = sourcesService.selectDetailData(this.dataSourcesModel.getDataSourcesNo()); // 记录所有源头节点 List allUnitList = new ArrayList<>(); List sysInterfaceModelList = new ArrayList<>(); // 接口源节点 if ("INTERFACE".equals(dataSourcesModel.getDataSourcesType())) { // List interfaceNos = selectModel.getRequestInterfaces(); // for ( String s : interfaceNos) { // sysInterfaceModelList.add(sysInterfaceService.getById(s)); // } sysInterfaceModelList.add(sysInterfaceService.getById(selectModel.getRequestInterface())); List interFaceList = sysInterfaceModelList.stream().map(interfaceModel -> new InterfaceCalcUnit(interfaceModel.getInterfaceNo(),interfaceModel.getInterfaceName(),CalcType.INTERFACE,Map.of(),interfaceModel)).collect(Collectors.toList()); allUnitList.addAll(interFaceList); } // 查询逻辑源节点 if ("JDBC".equals(dataSourcesModel.getDataSourcesType())) { List queryList = dataSourcesModel.getQueryLogic().stream().map(queryParam -> new QueryLogicCalcUnit(queryParam.getDataItemName(),queryParam.getScriptDescription(),CalcType.QUERYLOGIC,Map.of(),queryParam)).collect(Collectors.toList()); allUnitList.addAll(queryList); } return allUnitList; } /** * 计算之后的方法,可实现为空 * * @param context */ @Override public void afterCalc(Map context) { } /** * 计算之前,可实现空 * * @param context */ @Override public void beforeCalc(Map context) { } /** * 必须实现的主体计算内容 * * @param thisResult 本计算单元的结果 * @param calculateInstanceNumber 计算流水号 * @param context 节点计算参数清单 * @param sourceResults 源头计算节点的结果 */ @Override public void calc(CalcResult thisResult, String calculateInstanceNumber, Map context, Map> sourceResults) { log.info("当前计算节点为:[{}-{}-{}],计算流水号为:{}", this.getCalcType(), this.getCalcCode(), this.getCalcName(), calculateInstanceNumber); sourceResults.forEach((calcUnit,result)->{ // 合并结果集 Map allMap = new HashMap<>(); if ("INTERFACE".equals(dataSourcesModel.getDataSourcesType())) { // 数据来源类型为接口时处理逻辑 if (calcUnit instanceof InterfaceCalcUnit ) { // 获取json String json = result.get(calcUnit.getCalcCode()) instanceof String ? (String) result.get(calcUnit.getCalcCode()) : ""; Map objectMap = JsonUtil.jsonToMap(json); // 调用utils类构建表 Map tableMap = CalciteUtil.createTableMap(objectMap); allMap.putAll(tableMap); // List parseList = JsonUtil.jsonToList(json); // // 结果封装 // thisResult.put(this.getCalcCode(), parseList); } } else if ("JDBC".equals(dataSourcesModel.getDataSourcesType())) { // 数据来源类型为jdbc时处理逻辑 if (calcUnit instanceof QueryLogicCalcUnit queryLogicCalcUnit) { // 声明本地表数据集合 Map localTable = new HashMap<>(); localTable.put(queryLogicCalcUnit.queryLogicModel.getDataItemName(),result.get(queryLogicCalcUnit.queryLogicModel.getDataItemName())); // 调用util类构造表 Map tableMap = CalciteUtil.createTableMap(localTable); allMap.putAll(tableMap); // 结果封装,数据项名做key // thisResult.put(queryLogicCalcUnit.queryLogicModel.getDataItemName(), result.get(queryLogicCalcUnit.getCalcCode())); } } // 将构造好的table表结果封装 thisResult.put(this.getCalcCode(),allMap); }); } }