|
@@ -1,5 +1,6 @@
|
|
|
package com.sundata.internalevaluation.conf;
|
|
|
|
|
|
+import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.sundata.internalevaluation.calc.calcUnit.IndexCalcUnit;
|
|
@@ -18,83 +19,52 @@ import org.apache.calcite.sql.type.SqlTypeName;
|
|
|
import org.apache.calcite.util.Pair;
|
|
|
import org.apache.calcite.util.Sources;
|
|
|
|
|
|
-import java.sql.Connection;
|
|
|
-import java.sql.DriverManager;
|
|
|
+import java.sql.*;
|
|
|
import java.util.*;
|
|
|
|
|
|
public class JsonToCalciteExample {
|
|
|
|
|
|
- public Object jsonTotable(String schemaName, String jsonDate, String sql) {
|
|
|
+ public Object jsonTotable(String jsonDate, String sql) {
|
|
|
// 1. JSON 数据
|
|
|
Object resValue = null;
|
|
|
try{
|
|
|
// 2. 使用 Jackson 解析 JSON 数据
|
|
|
ObjectMapper objectMapper = new ObjectMapper();
|
|
|
JsonNode rootNode = objectMapper.readTree(jsonDate);
|
|
|
-
|
|
|
// 3. 创建 Calcite Schema
|
|
|
Connection connection = DriverManager.getConnection("jdbc:calcite:");
|
|
|
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
|
|
|
-
|
|
|
SchemaPlus rootSchema = calciteConnection.getRootSchema();
|
|
|
|
|
|
- SchemaPlus schema = rootSchema.add(schemaName,new AbstractSchema());
|
|
|
+
|
|
|
|
|
|
// 注册 JSON 数据为表
|
|
|
rootNode.fields().forEachRemaining(entry -> {
|
|
|
- String tableName = entry.getKey();
|
|
|
- JsonNode tableData = entry.getValue();
|
|
|
- schema.add(tableName, new JsonScannableTable(Sources.of(tableData.toString())));
|
|
|
+ String schename =entry.getKey();
|
|
|
+ String schenameData="";
|
|
|
+ JsonNode schenameData1=null;
|
|
|
+ try {
|
|
|
+ schenameData = String.valueOf(entry.getValue());
|
|
|
+ schenameData1 =objectMapper.readTree(schenameData);
|
|
|
+ } catch (JsonProcessingException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ SchemaPlus schema = rootSchema.add(schename,new AbstractSchema());
|
|
|
+ schenameData1.fields().forEachRemaining(entrynext -> {
|
|
|
+ String tableName =entrynext.getKey();
|
|
|
+ JsonNode tableData =entrynext.getValue();
|
|
|
+ schema.add(tableName, new JsonScannableTable(Sources.of(tableData.toString())));
|
|
|
+ });
|
|
|
});
|
|
|
- // 4. 执行 SQL 查询
|
|
|
-// Statement statement = connection.createStatement();
|
|
|
-// String sql = """
|
|
|
-// SELECT "test"."input"."method" FROM "test"."input"
|
|
|
-// """;
|
|
|
-//
|
|
|
-// ResultSet resultSet = statement.executeQuery(sql);
|
|
|
-// while (resultSet.next()){
|
|
|
-// String method=resultSet.getString("method");
|
|
|
-// System.out.println(method);
|
|
|
-// }
|
|
|
- // 打印查询结果
|
|
|
- /*while (resultSet.next()) {
|
|
|
- System.out.printf("Order ID: %d, Amount: %.2f, Name: %s%n",
|
|
|
- resultSet.getInt("user_id"),
|
|
|
- resultSet.getDouble("amount"),
|
|
|
- resultSet.getString("name"));
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- // 关闭连接
|
|
|
- /*resultSet.close();
|
|
|
- statement.close();*/
|
|
|
- //获取域
|
|
|
-// SchemaPlus rootSchemafter = calciteConnection.getRootSchema();
|
|
|
-// //根域中获取子域
|
|
|
-// SchemaPlus childrenSchema=rootSchemafter.getSubSchema("test");
|
|
|
-// //子域名
|
|
|
-// String childrenName = childrenSchema.getName();
|
|
|
-// System.out.println("childrenName: " + childrenName);
|
|
|
-// //子域中表名
|
|
|
-// Set<String> tableNames = childrenSchema.getTableNames();
|
|
|
-// for (String tableName : tableNames) {
|
|
|
-// System.out.println("Table: " + tableName);
|
|
|
-// }
|
|
|
-
|
|
|
- //调用张艳杰接收连接方法,传参 calciteConnection
|
|
|
- //方法示例
|
|
|
- IndexCalcUnit model = new IndexCalcUnit(schemaName,"",Map.of(),new IndexConfigModel());
|
|
|
- resValue = model.getConnectionTable(calciteConnection,sql);
|
|
|
+
|
|
|
+ //调用张艳杰接收连接方法
|
|
|
+ resValue = getConnectionTable(calciteConnection,sql);
|
|
|
|
|
|
connection.close();
|
|
|
} catch (Exception e) {
|
|
|
throw new RuntimeException(e);
|
|
|
}
|
|
|
- finally {
|
|
|
- return resValue;
|
|
|
- }
|
|
|
-
|
|
|
+ return resValue;
|
|
|
}
|
|
|
|
|
|
// 自定义表类实现
|
|
@@ -140,4 +110,36 @@ public class JsonToCalciteExample {
|
|
|
return Linq4j.asEnumerable(result);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ *
|
|
|
+ * @param calciteConnection
|
|
|
+ * @param sql
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public Object getConnectionTable (CalciteConnection calciteConnection,String sql) {
|
|
|
+ //获取域
|
|
|
+ SchemaPlus rootSchemafter = calciteConnection.getRootSchema();
|
|
|
+ //根域中获取子域
|
|
|
+ SchemaPlus childrenSchema=rootSchemafter.getSubSchema(calciteConnection.getRootSchema().getName());
|
|
|
+ Object childrebnameobj=rootSchemafter.getSubSchemaNames();
|
|
|
+ //子域名
|
|
|
+// String childrenName = childrenSchema.getName();
|
|
|
+// System.out.println("childrenName: " + childrenName);
|
|
|
+// //子域中表名
|
|
|
+// Set<String> tableSet = childrenSchema.getTableNames();
|
|
|
+ // 定义返回结果
|
|
|
+ Object value = null;
|
|
|
+ try {
|
|
|
+ Statement statement = calciteConnection.createStatement();
|
|
|
+ ResultSet resultSet = statement.executeQuery(sql);
|
|
|
+ if (resultSet.next()){
|
|
|
+ value = resultSet.getObject(1);
|
|
|
+ }
|
|
|
+ } catch (SQLException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ return value;
|
|
|
+ }
|
|
|
}
|