apkipa vor 2 Monaten
Ursprung
Commit
6a67e6c435
2 geänderte Dateien mit 126 neuen und 17 gelöschten Zeilen
  1. 82 17
      src/main/java/db/DBApiEntry.java
  2. 44 0
      src/test/java/clustertemp/TestClusterInsert.java

+ 82 - 17
src/main/java/db/DBApiEntry.java

@@ -51,9 +51,14 @@ public class DBApiEntry implements AutoCloseable {
     private long lookBack = 10L * 60 * 1000_000_000;
     private String currentDatabase = "default";
     private String currentTable = TABLE_NAME;
-//    private QuerySettings querySettings = new QuerySettings()
+    //    private QuerySettings querySettings = new QuerySettings()
 //            .serverSetting("http_max_field_value_size", "26214400");
     private QuerySettings querySettings = new QuerySettings();
+    private static String secureCertPath = "";
+
+    public static void SetCertForSecureConnection(String certPath) {
+        secureCertPath = certPath;
+    }
 
     /**
      * 初始化数据驱动构造方法
@@ -61,19 +66,38 @@ public class DBApiEntry implements AutoCloseable {
      * @throws IOException
      */
     public DBApiEntry(String addressHost, int publicPort) throws IOException {
+        this(addressHost, publicPort, "default", TABLE_NAME);
+    }
+
+    public DBApiEntry(String addressHost, int publicPort, String token) throws IOException {
 //        this.host = addressHost;
 //        this.port = publicPort;
+//        this.token = token;
 //        this.insertPort = this.port + 1;
 //        this.selectPort = this.port + 2;
-        client = new Client.Builder()
-                .addEndpoint(Protocol.HTTP, addressHost, publicPort, false)
+        this(addressHost, publicPort);
+    }
+
+    public DBApiEntry(String addressHost, int publicPort, String database, String table) throws IOException {
+        this(addressHost, publicPort, database, table, false);
+    }
+
+    public DBApiEntry(String addressHost, int publicPort, String database, String table, boolean useSecure) throws IOException {
+        this.currentDatabase = database;
+        this.currentTable = table;
+
+        Client.Builder builder = new Client.Builder()
+                .addEndpoint(Protocol.HTTP, addressHost, publicPort, useSecure)
                 .setUsername("default")
                 .setPassword("")
                 .compressClientRequest(true)
                 .compressServerResponse(true)
                 .serverSetting("http_max_field_value_size", "26214400")
-                .serverSetting("empty_result_for_aggregation_by_empty_set", "1")
-                .build();
+                .serverSetting("empty_result_for_aggregation_by_empty_set", "1");
+        if (useSecure) {
+            builder.setRootCertificate(secureCertPath);
+        }
+        client = builder.build();
 
         // Init ClickHouse client
 //        client.execute("SET max_query_size = 26214400");
@@ -84,31 +108,72 @@ public class DBApiEntry implements AutoCloseable {
         client.register(StckPointVal.class, schema);
     }
 
-    public DBApiEntry(String addressHost, int publicPort, String token) throws IOException {
-//        this.host = addressHost;
-//        this.port = publicPort;
-//        this.token = token;
-//        this.insertPort = this.port + 1;
-//        this.selectPort = this.port + 2;
-        this(addressHost, publicPort);
-    }
-
     /**
      * 静态方法初始化时序数据源驱动类
      *
      * @return DBApiEntry
      */
     public static DBApiEntry initApiEntry(String addressHost, int publicPort) {
+        return initApiEntry(addressHost, publicPort, "default", TABLE_NAME);
+    }
+
+    public static DBApiEntry initApiEntry(String addressHost, int publicPort, String token) {
+        return initApiEntry(addressHost, publicPort);
+    }
+
+    public static DBApiEntry initApiEntry(String addressHost, int publicPort, String database, String table) {
+        return initApiEntry(addressHost, publicPort, database, table, false);
+    }
+
+    public static DBApiEntry initApiEntry(String addressHost, int publicPort, String database, String table, boolean useSecure) {
         try {
-            return new DBApiEntry(addressHost, publicPort);
+            return new DBApiEntry(addressHost, publicPort, database, table, useSecure);
         } catch (Exception e) {
             LOG.error("initApiEntry error: ", e);
             return null;
         }
     }
 
-    public static DBApiEntry initApiEntry(String addressHost, int publicPort, String token) {
-        return initApiEntry(addressHost, publicPort);
+    public String getCurrentTable() {
+        return currentTable;
+    }
+
+    public void setCurrentTable(String currentTable) {
+        this.currentTable = currentTable;
+    }
+
+    public String getCurrentDatabase() {
+        return currentDatabase;
+    }
+
+    public void setCurrentDatabase(String currentDatabase) {
+        this.currentDatabase = currentDatabase;
+    }
+
+    public void doMigrateTable() {
+        // TODO...
+    }
+
+    public void doMigrateDistributedTable() {
+        // TODO...
+        /*
+CREATE TABLE db1.tsdb_cpp_dist ON CLUSTER cluster_3S_1R
+(
+
+    `metric_name` LowCardinality(String) COMMENT 'Metric name',
+
+    `point_name` LowCardinality(String) COMMENT 'Point name',
+
+    `tags` Map(LowCardinality(String),
+ LowCardinality(String)) COMMENT 'Point tags',
+
+    `value` Float64 COMMENT 'Point value',
+
+    `nanoseconds` Int64 COMMENT 'Point time in nanoseconds' CODEC(DoubleDelta,
+ LZ4)
+)
+ENGINE = Distributed('cluster_3S_1R', 'db1', 'tsdb_cpp', rand())
+         */
     }
 
     /**

+ 44 - 0
src/test/java/clustertemp/TestClusterInsert.java

@@ -0,0 +1,44 @@
+package clustertemp;
+
+import db.DBApiEntry;
+import db.DBVal;
+import db.Point;
+import db.PointVals;
+
+import java.util.HashMap;
+
+public class TestClusterInsert {
+    public static void main(String[] args) {
+        String host = "";
+        int port = 8123;
+        String metricName = "metricName";
+        String databaseName = "db1";
+        String tableName = "tsdb_cpp_dist";
+        HashMap<String, String> tags = new HashMap<>();
+        tags.put("pointName", "pointName_test111");
+        Point point = new Point(metricName, tags);
+
+        // 向节点1插入数据
+        host = "172.31.48.206";
+        {
+            DBApiEntry.SetCertForSecureConnection("Z:/tcboxlocal_host_ca.crt");
+            DBApiEntry entry = DBApiEntry.initApiEntry(host, 8443, databaseName, tableName, true);
+            double[] values = new double[1];
+            values[0] = 4.2;
+            long[] timestamps = new long[1];
+            timestamps[0] = System.currentTimeMillis() * 1000000;
+            PointVals pointVals = new PointVals(point, 1, timestamps, values);
+            entry.insertSinglPoint(pointVals);
+            System.out.println("Insert data to node1 successfully.");
+        }
+
+        // 从节点2查询数据
+        {
+            host = "172.31.48.207";
+            DBApiEntry entry = DBApiEntry.initApiEntry(host, port, databaseName, tableName);
+            DBVal dbVal = entry.getRTValue(point);
+            System.out.println("Query data from node2 successfully.");
+            System.out.println(dbVal.toString());
+        }
+    }
+}