Procházet zdrojové kódy

improve single insert performance

apkipa před 2 měsíci
rodič
revize
2e999d8b34

+ 8 - 0
pom.xml

@@ -67,6 +67,14 @@
             <artifactId>log4j-slf4j2-impl</artifactId>
             <version>2.23.1</version>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>2.0.16</version>
+            <scope>test</scope>
+        </dependency>
+
 
         <dependency>
             <groupId>org.javatuples</groupId>

+ 55 - 2
src/main/java/db/DBApiEntry.java

@@ -4,10 +4,16 @@ import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.clickhouse.client.api.Client;
 import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
+import com.clickhouse.client.api.data_formats.internal.SerializerUtils;
 import com.clickhouse.client.api.enums.Protocol;
 import com.clickhouse.client.api.metadata.TableSchema;
 import com.clickhouse.client.api.query.QueryResponse;
 import com.clickhouse.client.api.query.QuerySettings;
+import com.clickhouse.data.ClickHouseColumn;
+import com.clickhouse.data.ClickHouseFormat;
+import com.clickhouse.data.format.BinaryStreamUtils;
+import com.clickhouse.data.format.ClickHouseBinaryFormatProcessor;
+import com.clickhouse.data.format.ClickHouseRowBinaryProcessor;
 import db.dto.StckPointVal;
 import db.page.MultiPointRangePagingQuery;
 import db.page.RangePagingQuery;
@@ -519,8 +525,46 @@ ENGINE = Distributed('cluster_3S_1R', 'db1', 'tsdb_cpp', rand())
 
         pointVals.getPoint().ensureValid();
 
-        long benchTimeBegin = System.currentTimeMillis();
-        // Convert to ClickHousePointVal for insertion
+        long benchTimeBegin = System.currentTimeMillis(), benchTimeTemp = 0;
+        // NOTE: ClickHouse JAVA client has poorly designed insert APIs, we do things manually here
+        try {
+            // TODO: Maybe also use BinaryStreamUtils?
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            int count = pointVals.getCount();
+            HashMap<String, String> tags = new HashMap<>();
+            for (String key : pointVals.getTags().keySet()) {
+                if (StringUtils.equals(key, "pointName")) {
+                    continue;
+                }
+                tags.put(key, pointVals.getTags().get(key));
+            }
+            ClickHouseColumn colMetricName = ClickHouseColumn.of("metric_name", "String");
+            ClickHouseColumn colPointName = ClickHouseColumn.of("point_name", "String");
+            ClickHouseColumn colTags = ClickHouseColumn.of("tags", "Map(String, String)");
+            ClickHouseColumn colValue = ClickHouseColumn.of("value", "Float64");
+            ClickHouseColumn colNanoseconds = ClickHouseColumn.of("nanoseconds", "Int64");
+            for (int i = 0; i < count; i++) {
+                SerializerUtils.serializeData(baos, pointVals.getValue(i), colValue);
+                SerializerUtils.serializeData(baos, pointVals.getUtcTime(i), colNanoseconds);
+            }
+            // HACK: the ClickHouse Java client does not escape the table name, so we append a column list and an INSERT ... SELECT clause to it here
+            String tableName = makeTableName() + " (metric_name, point_name, tags, value, nanoseconds)" +
+                    " SELECT " + Util.ToSqlLiteral(pointVals.getMetricName()) +
+                    ", " + Util.ToSqlLiteral(pointVals.getPointName()) +
+                    ", " + Util.ToSqlLiteral(tags) +
+                    ", col1, col2" +
+                    " FROM input('col1 Float64, col2 Int64')";
+            benchTimeTemp = System.currentTimeMillis();
+            LOG.debug("insertPointValsToDB: prepare insert took {}ms", benchTimeTemp - benchTimeBegin);
+            benchTimeBegin = benchTimeTemp;
+            client.insert(tableName, new ByteArrayInputStream(baos.toByteArray()), ClickHouseFormat.RowBinary).get();
+            benchTimeTemp = System.currentTimeMillis();
+            LOG.debug("insertPointValsToDB: insert took {}ms", benchTimeTemp - benchTimeBegin);
+        } catch (Exception e) {
+            LOG.error("insertPointValsToDb error: ", e);
+        }
+        /*
+        // Convert to StckPointVal for insertion
         List<StckPointVal> pointValsList = new LinkedList<>();
         String metricName = pointVals.getMetricName();
         String pointName = pointVals.getPointName();
@@ -547,6 +591,7 @@ ENGINE = Distributed('cluster_3S_1R', 'db1', 'tsdb_cpp', rand())
         } catch (Exception e) {
             LOG.error("insertPointValsToDb error: ", e);
         }
+        */
     }
 
     /// Write to ClickHouse
@@ -1482,6 +1527,14 @@ INNER JOIN
 //        return dbVal;
     }
 
+    public List<DBVal> calAggOverTimeForManyPts(List<Point> points, String aggMethod, long startTime, long endTime) {
+        List<DBVal> result = new ArrayList<>();
+        for (Point point : points) {
+            result.add(calAggOverTime(point, aggMethod, startTime, endTime));
+        }
+        return result;
+    }
+
     public Long countOverTime(Point point, long start, long end) {
         double result = this.selectPointAggValueFromDB(point, start, end, "count");
         if (Double.isNaN(result)) {

+ 45 - 0
src/main/java/db/Util.java

@@ -6,6 +6,7 @@ import java.text.DecimalFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.Map;
 
 public class Util {
     /**
@@ -162,4 +163,48 @@ public class Util {
         return bytes.toString();
     }
 
+    /// 逃逸 SQL 字符串,防止 SQL 注入
+    public static String EscapeSqlName(String name) {
+        return name.replace("\\", "\\\\").replace("'", "\\'");
+    }
+
+    public static String MakeSqlStrLiteral(String value) {
+        return "'" + EscapeSqlName(value) + "'";
+    }
+
+    /// Renders a string as a quoted, escaped SQL string literal.
+    public static String ToSqlLiteral(String str) {
+        // Same result as MakeSqlStrLiteral(str): escape first, then quote.
+        return "'" + EscapeSqlName(str) + "'";
+    }
+
+    /// Renders a double as its Java decimal text form, unquoted.
+    public static String ToSqlLiteral(double val) {
+        return Double.toString(val);
+    }
+
+    /// Renders an int as its decimal text form, unquoted.
+    public static String ToSqlLiteral(int num) {
+        return Integer.toString(num);
+    }
+
+    /// Renders a long as its decimal text form, unquoted.
+    public static String ToSqlLiteral(long num) {
+        return Long.toString(num);
+    }
+
+    /// Renders a map as a SQL array-of-tuples literal, e.g. `[('k1','v1'),('k2','v2')]`.
+    ///
+    /// Keys and values are converted with `toString()` and emitted as escaped string
+    /// literals, in the map's own iteration order. An empty map yields `[]`.
+    public static String ToSqlLiteral(Map<?, ?> mp) {
+        // Fully qualified so this block needs no additional file-level import.
+        java.util.StringJoiner tuples = new java.util.StringJoiner(",", "[", "]");
+        for (Map.Entry<?, ?> kv : mp.entrySet()) {
+            String keyLiteral = ToSqlLiteral(kv.getKey().toString());
+            String valueLiteral = ToSqlLiteral(kv.getValue().toString());
+            tuples.add("(" + keyLiteral + "," + valueLiteral + ")");
+        }
+        return tuples.toString();
+    }
+
 }

+ 39 - 0
src/test/java/TestGetSumValueList.java

@@ -0,0 +1,39 @@
+import config.Globals;
+import db.DBApiEntry;
+import db.DBVal;
+import db.Point;
+import db.Util;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/// Manual smoke test: runs a "sum" aggregation over ten generated points and prints
+/// the elapsed wall-clock time together with the returned values.
+public class TestGetSumValueList {
+
+    public static void main(String[] args) throws ParseException {
+        // Query window, converted via Util.dateStringToUTCNanoseconds.
+        long startTime = Util.dateStringToUTCNanoseconds("2021-08-15 10:00:01");
+        long endTime = Util.dateStringToUTCNanoseconds("2021-08-15 10:00:02");
+
+        String host = Globals.HOST;
+        int port = Globals.port;
+
+        // Ten points sharing one metric name and status tag, differing only in pointName.
+        List<Point> points = new ArrayList<>();
+        for (int idx = 0; idx < 10; idx++) {
+            HashMap<String, String> tags = new HashMap<>();
+            tags.put("pointName", "pointName_test11" + idx);
+            tags.put("status", "3");
+            points.add(new Point("metricName", tags));
+        }
+
+        // The timed section covers both connection setup and the aggregation call.
+        long testBegin = System.currentTimeMillis();
+        DBApiEntry entry = DBApiEntry.initApiEntry(host, port);
+        List<DBVal> dbvals = entry.calAggOverTimeForManyPts(points, "sum", startTime, endTime);
+        long elapsedMs = System.currentTimeMillis() - testBegin;
+
+        String timingLine = String.format("spend %d (ms)", elapsedMs);
+        String resultLine = String.format("result:%s", dbvals);
+        System.out.println(timingLine);
+        System.out.println(resultLine);
+    }
+
+}

+ 6 - 0
src/test/java/TestInsertSinglePoint.java

@@ -1,12 +1,18 @@
 import com.alibaba.fastjson.JSONObject;
 import config.Globals;
 import db.*;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.config.Configurator;
 
 import java.text.ParseException;
 import java.util.HashMap;
 
 public class TestInsertSinglePoint {
     public static void main(String[] args) throws ParseException {
+        //System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "debug");
+        //Configurator.setAllLevels(LogManager.getRootLogger().getName(), org.apache.logging.log4j.Level.DEBUG);
+        Configurator.setLevel(DBApiEntry.class, org.apache.logging.log4j.Level.DEBUG);
+
 //        String host = "192.168.101.87";
         String host = Globals.HOST;
         int port = Globals.port;