JsonToCalciteExample.java 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package com.sundata.internalevaluation.conf;
  2. import com.fasterxml.jackson.core.JsonProcessingException;
  3. import com.fasterxml.jackson.databind.JsonNode;
  4. import com.fasterxml.jackson.databind.ObjectMapper;
  5. import com.sundata.internalevaluation.calc.calcUnit.IndexCalcUnit;
  6. import com.sundata.internalevaluation.configuration.model.IndexConfigModel;
  7. import org.apache.calcite.DataContext;
  8. import org.apache.calcite.adapter.file.JsonScannableTable;
  9. import org.apache.calcite.jdbc.CalciteConnection;
  10. import org.apache.calcite.linq4j.Enumerable;
  11. import org.apache.calcite.linq4j.Linq4j;
  12. import org.apache.calcite.rel.type.RelDataType;
  13. import org.apache.calcite.rel.type.RelDataTypeFactory;
  14. import org.apache.calcite.schema.SchemaPlus;
  15. import org.apache.calcite.schema.impl.AbstractSchema;
  16. import org.apache.calcite.schema.impl.AbstractTable;
  17. import org.apache.calcite.sql.type.SqlTypeName;
  18. import org.apache.calcite.util.Pair;
  19. import org.apache.calcite.util.Sources;
  20. import java.sql.*;
  21. import java.util.*;
  22. public class JsonToCalciteExample {
  23. public Object jsonTotable(String jsonDate, String sql) {
  24. // 1. JSON 数据
  25. Object resValue = null;
  26. try{
  27. // 2. 使用 Jackson 解析 JSON 数据
  28. ObjectMapper objectMapper = new ObjectMapper();
  29. JsonNode rootNode = objectMapper.readTree(jsonDate);
  30. // 3. 创建 Calcite Schema
  31. Connection connection = DriverManager.getConnection("jdbc:calcite:");
  32. CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
  33. SchemaPlus rootSchema = calciteConnection.getRootSchema();
  34. // 注册 JSON 数据为表
  35. rootNode.fields().forEachRemaining(entry -> {
  36. String schename =entry.getKey();
  37. String schenameData="";
  38. JsonNode schenameData1=null;
  39. try {
  40. schenameData = String.valueOf(entry.getValue());
  41. schenameData1 =objectMapper.readTree(schenameData);
  42. } catch (JsonProcessingException e) {
  43. throw new RuntimeException(e);
  44. }
  45. SchemaPlus schema = rootSchema.add(schename,new AbstractSchema());
  46. schenameData1.fields().forEachRemaining(entrynext -> {
  47. String tableName =entrynext.getKey();
  48. JsonNode tableData =entrynext.getValue();
  49. schema.add(tableName, new JsonScannableTable(Sources.of(tableData.toString())));
  50. });
  51. });
  52. //调用张艳杰接收连接方法
  53. resValue = getConnectionTable(calciteConnection,sql);
  54. // 关闭连接信息
  55. connection.close();
  56. } catch (Exception e) {
  57. e.printStackTrace();
  58. throw new RuntimeException(e);
  59. }
  60. return resValue;
  61. }
  62. // 自定义表类实现
  63. static class JsonTable extends AbstractTable {
  64. private final List<Map<String, Object>> rows = new ArrayList<>();
  65. public JsonTable(JsonNode data) {
  66. for (JsonNode row : data) {
  67. Map<String, Object> map = new HashMap<>();
  68. row.fieldNames().forEachRemaining(field -> {
  69. map.put(field, row.get(field).isNumber()
  70. ? row.get(field).numberValue()
  71. : row.get(field).asText());
  72. });
  73. rows.add(map);
  74. }
  75. }
  76. @Override
  77. public RelDataType getRowType(RelDataTypeFactory typeFactory) {
  78. // 动态推断字段类型
  79. if (rows.isEmpty()) {
  80. return typeFactory.createStructType(Collections.emptyList());
  81. }
  82. Map<String, Object> firstRow = rows.get(0);
  83. List<Pair<String, RelDataType>> fields = new ArrayList<>();
  84. for (Map.Entry<String, Object> entry : firstRow.entrySet()) {
  85. SqlTypeName sqlType = (entry.getValue() instanceof Number)
  86. ? SqlTypeName.DECIMAL
  87. : SqlTypeName.VARCHAR;
  88. fields.add(Pair.of(entry.getKey(), typeFactory.createSqlType(sqlType)));
  89. }
  90. return typeFactory.createStructType(fields);
  91. }
  92. public Enumerable<Object[]> scan(DataContext root) {
  93. List<Object[]> result = new ArrayList<>();
  94. for (Map<String, Object> row : rows) {
  95. result.add(row.values().toArray());
  96. }
  97. return Linq4j.asEnumerable(result);
  98. }
  99. }
  100. /**
  101. *
  102. * @param calciteConnection
  103. * @param sql
  104. * @return
  105. */
  106. public Object getConnectionTable (CalciteConnection calciteConnection,String sql) {
  107. //获取域
  108. SchemaPlus rootSchemafter = calciteConnection.getRootSchema();
  109. //根域中获取子域
  110. SchemaPlus childrenSchema=rootSchemafter.getSubSchema(calciteConnection.getRootSchema().getName());
  111. Object childrebnameobj=rootSchemafter.getSubSchemaNames();
  112. //子域名
  113. // String childrenName = childrenSchema.getName();
  114. // System.out.println("childrenName: " + childrenName);
  115. // //子域中表名
  116. // Set<String> tableSet = childrenSchema.getTableNames();
  117. // 定义返回结果
  118. Object value = null;
  119. try {
  120. Statement statement = calciteConnection.createStatement();
  121. ResultSet resultSet = statement.executeQuery(sql);
  122. if (resultSet.next()){
  123. value = resultSet.getObject(1);
  124. }
  125. } catch (SQLException e) {
  126. e.printStackTrace();
  127. throw new RuntimeException(e);
  128. }
  129. return value;
  130. }
  131. }