123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- package com.sundata.internalevaluation.conf;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.JsonNode;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.sundata.internalevaluation.calc.calcUnit.IndexCalcUnit;
- import com.sundata.internalevaluation.configuration.model.IndexConfigModel;
- import org.apache.calcite.DataContext;
- import org.apache.calcite.adapter.file.JsonScannableTable;
- import org.apache.calcite.jdbc.CalciteConnection;
- import org.apache.calcite.linq4j.Enumerable;
- import org.apache.calcite.linq4j.Linq4j;
- import org.apache.calcite.rel.type.RelDataType;
- import org.apache.calcite.rel.type.RelDataTypeFactory;
- import org.apache.calcite.schema.SchemaPlus;
- import org.apache.calcite.schema.impl.AbstractSchema;
- import org.apache.calcite.schema.impl.AbstractTable;
- import org.apache.calcite.sql.type.SqlTypeName;
- import org.apache.calcite.util.Pair;
- import org.apache.calcite.util.Sources;
- import java.sql.*;
- import java.util.*;
- public class JsonToCalciteExample {
- public Object jsonTotable(String jsonDate, String sql) {
- // 1. JSON 数据
- Object resValue = null;
- try{
- // 2. 使用 Jackson 解析 JSON 数据
- ObjectMapper objectMapper = new ObjectMapper();
- JsonNode rootNode = objectMapper.readTree(jsonDate);
- // 3. 创建 Calcite Schema
- Connection connection = DriverManager.getConnection("jdbc:calcite:");
- CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
- SchemaPlus rootSchema = calciteConnection.getRootSchema();
- // 注册 JSON 数据为表
- rootNode.fields().forEachRemaining(entry -> {
- String schename =entry.getKey();
- String schenameData="";
- JsonNode schenameData1=null;
- try {
- schenameData = String.valueOf(entry.getValue());
- schenameData1 =objectMapper.readTree(schenameData);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- SchemaPlus schema = rootSchema.add(schename,new AbstractSchema());
- schenameData1.fields().forEachRemaining(entrynext -> {
- String tableName =entrynext.getKey();
- JsonNode tableData =entrynext.getValue();
- schema.add(tableName, new JsonScannableTable(Sources.of(tableData.toString())));
- });
- });
- //调用张艳杰接收连接方法
- resValue = getConnectionTable(calciteConnection,sql);
- // 关闭连接信息
- connection.close();
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- return resValue;
- }
- // 自定义表类实现
- static class JsonTable extends AbstractTable {
- private final List<Map<String, Object>> rows = new ArrayList<>();
- public JsonTable(JsonNode data) {
- for (JsonNode row : data) {
- Map<String, Object> map = new HashMap<>();
- row.fieldNames().forEachRemaining(field -> {
- map.put(field, row.get(field).isNumber()
- ? row.get(field).numberValue()
- : row.get(field).asText());
- });
- rows.add(map);
- }
- }
- @Override
- public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- // 动态推断字段类型
- if (rows.isEmpty()) {
- return typeFactory.createStructType(Collections.emptyList());
- }
- Map<String, Object> firstRow = rows.get(0);
- List<Pair<String, RelDataType>> fields = new ArrayList<>();
- for (Map.Entry<String, Object> entry : firstRow.entrySet()) {
- SqlTypeName sqlType = (entry.getValue() instanceof Number)
- ? SqlTypeName.DECIMAL
- : SqlTypeName.VARCHAR;
- fields.add(Pair.of(entry.getKey(), typeFactory.createSqlType(sqlType)));
- }
- return typeFactory.createStructType(fields);
- }
- public Enumerable<Object[]> scan(DataContext root) {
- List<Object[]> result = new ArrayList<>();
- for (Map<String, Object> row : rows) {
- result.add(row.values().toArray());
- }
- return Linq4j.asEnumerable(result);
- }
- }
- /**
- *
- * @param calciteConnection
- * @param sql
- * @return
- */
- public Object getConnectionTable (CalciteConnection calciteConnection,String sql) {
- //获取域
- SchemaPlus rootSchemafter = calciteConnection.getRootSchema();
- //根域中获取子域
- SchemaPlus childrenSchema=rootSchemafter.getSubSchema(calciteConnection.getRootSchema().getName());
- Object childrebnameobj=rootSchemafter.getSubSchemaNames();
- //子域名
- // String childrenName = childrenSchema.getName();
- // System.out.println("childrenName: " + childrenName);
- // //子域中表名
- // Set<String> tableSet = childrenSchema.getTableNames();
- // 定义返回结果
- Object value = null;
- try {
- Statement statement = calciteConnection.createStatement();
- ResultSet resultSet = statement.executeQuery(sql);
- if (resultSet.next()){
- value = resultSet.getObject(1);
- }
- } catch (SQLException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- return value;
- }
- }
|