package com.sundata.internalevaluation.conf; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.sundata.internalevaluation.calc.calcUnit.IndexCalcUnit; import com.sundata.internalevaluation.configuration.model.IndexConfigModel; import org.apache.calcite.DataContext; import org.apache.calcite.adapter.file.JsonScannableTable; import org.apache.calcite.jdbc.CalciteConnection; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Linq4j; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.impl.AbstractSchema; import org.apache.calcite.schema.impl.AbstractTable; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Pair; import org.apache.calcite.util.Sources; import java.sql.*; import java.util.*; public class JsonToCalciteExample { public Object jsonTotable(String jsonDate, String sql) { // 1. JSON 数据 Object resValue = null; try{ // 2. 使用 Jackson 解析 JSON 数据 ObjectMapper objectMapper = new ObjectMapper(); JsonNode rootNode = objectMapper.readTree(jsonDate); // 3. 创建 Calcite Schema Connection connection = DriverManager.getConnection("jdbc:calcite:"); CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); SchemaPlus rootSchema = calciteConnection.getRootSchema(); // 注册 JSON 数据为表 rootNode.fields().forEachRemaining(entry -> { String schename =entry.getKey(); String schenameData=""; JsonNode schenameData1=null; try { schenameData = String.valueOf(entry.getValue()); schenameData1 =objectMapper.readTree(schenameData); } catch (JsonProcessingException e) { throw new RuntimeException(e); } SchemaPlus schema = rootSchema.add(schename,new AbstractSchema()); schenameData1.fields().forEachRemaining(entrynext -> { String tableName =entrynext.getKey(); JsonNode tableData =entrynext.getValue(); schema.add(tableName, new JsonScannableTable(Sources.of(tableData.toString()))); }); }); //调用张艳杰接收连接方法 resValue = getConnectionTable(calciteConnection,sql); // 关闭连接信息 connection.close(); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } return resValue; } // 自定义表类实现 static class JsonTable extends AbstractTable { private final List> rows = new ArrayList<>(); public JsonTable(JsonNode data) { for (JsonNode row : data) { Map map = new HashMap<>(); row.fieldNames().forEachRemaining(field -> { map.put(field, row.get(field).isNumber() ? row.get(field).numberValue() : row.get(field).asText()); }); rows.add(map); } } @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { // 动态推断字段类型 if (rows.isEmpty()) { return typeFactory.createStructType(Collections.emptyList()); } Map firstRow = rows.get(0); List> fields = new ArrayList<>(); for (Map.Entry entry : firstRow.entrySet()) { SqlTypeName sqlType = (entry.getValue() instanceof Number) ? SqlTypeName.DECIMAL : SqlTypeName.VARCHAR; fields.add(Pair.of(entry.getKey(), typeFactory.createSqlType(sqlType))); } return typeFactory.createStructType(fields); } public Enumerable scan(DataContext root) { List result = new ArrayList<>(); for (Map row : rows) { result.add(row.values().toArray()); } return Linq4j.asEnumerable(result); } } /** * * @param calciteConnection * @param sql * @return */ public Object getConnectionTable (CalciteConnection calciteConnection,String sql) { //获取域 SchemaPlus rootSchemafter = calciteConnection.getRootSchema(); //根域中获取子域 SchemaPlus childrenSchema=rootSchemafter.getSubSchema(calciteConnection.getRootSchema().getName()); Object childrebnameobj=rootSchemafter.getSubSchemaNames(); //子域名 // String childrenName = childrenSchema.getName(); // System.out.println("childrenName: " + childrenName); // //子域中表名 // Set tableSet = childrenSchema.getTableNames(); // 定义返回结果 Object value = null; try { Statement statement = calciteConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); if (resultSet.next()){ value = resultSet.getObject(1); } } catch (SQLException e) { e.printStackTrace(); throw new RuntimeException(e); } return value; } }