|
@@ -0,0 +1,174 @@
|
|
|
+package util;
|
|
|
+
|
|
|
+import db.DBApiEntry;
|
|
|
+import db.DriverLocationFieldName;
|
|
|
+import db.DriverLocationVal;
|
|
|
+import db.Util;
|
|
|
+import demo.Globals;
|
|
|
+import org.apache.commons.csv.CSVFormat;
|
|
|
+import org.apache.commons.csv.CSVRecord;
|
|
|
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
|
|
|
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
|
|
|
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
|
|
|
+import org.apache.iotdb.tsfile.write.record.Tablet;
|
|
|
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.Reader;
|
|
|
+import java.nio.file.Files;
|
|
|
+import java.nio.file.Paths;
|
|
|
+import java.text.ParseException;
|
|
|
+import java.util.*;
|
|
|
+import java.util.logging.Logger;
|
|
|
+
|
|
|
+public class DataReader {
|
|
|
+ public static final Logger logger = Logger.getLogger(DBApiEntry.class.getName());
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description 将一系列DriverLocationVal数据转换为多个tables形式,其中result.key为缓冲函数提供作用
|
|
|
+ * @param driverLocationValHashtable
|
|
|
+ * @return java.util.Map<java.lang.String,org.apache.iotdb.tsfile.write.record.Tablet>
|
|
|
+ * @dateTime 2023/1/11 21:56
|
|
|
+ */
|
|
|
+ public static Map<String, Tablet> driverLocationValToTablet(
|
|
|
+ Hashtable<String, List<DriverLocationVal>> driverLocationValHashtable) {
|
|
|
+ if(driverLocationValHashtable == null || driverLocationValHashtable.isEmpty()) {
|
|
|
+ logger.info(String.format("driverLocationValHashtable is null or empty!"));
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, Tablet> tablets = new HashMap<>();
|
|
|
+ long totalTabletSize = 0L;
|
|
|
+ // 遍历每一台driverId,为其声明tablet,相当于以汽车id命名的表
|
|
|
+ for(String driverId : driverLocationValHashtable.keySet()) {
|
|
|
+ List<DriverLocationVal> driverLocationValList = driverLocationValHashtable.get(driverId);
|
|
|
+
|
|
|
+ if(driverLocationValList == null || driverLocationValList.isEmpty()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 根据driverId命名设备id,并充当度量名
|
|
|
+ String deviceId = String.format(Globals.STORAGEGROUP + "." + driverId);
|
|
|
+ List<MeasurementSchema> schema = new ArrayList<>();
|
|
|
+ // measurementId可乱序,相当于set集合,表明某个度量值具体的value属性
|
|
|
+ MeasurementSchema measurementschema0 = new MeasurementSchema(
|
|
|
+ DriverLocationFieldName._CLASS.getValue(),
|
|
|
+ TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.SNAPPY);
|
|
|
+ MeasurementSchema measurementschema1 = new MeasurementSchema(
|
|
|
+ DriverLocationFieldName.DIRECTION.getValue(),
|
|
|
+ TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY);
|
|
|
+ MeasurementSchema measurementschema2 = new MeasurementSchema(
|
|
|
+ DriverLocationFieldName.ELEVATION.getValue(),
|
|
|
+ TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY);
|
|
|
+ MeasurementSchema measurementschema3 = new MeasurementSchema(
|
|
|
+ DriverLocationFieldName.LAT.getValue(),
|
|
|
+ TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY);
|
|
|
+ MeasurementSchema measurementschema4 = new MeasurementSchema(
|
|
|
+ DriverLocationFieldName.LNG.getValue(),
|
|
|
+ TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY);
|
|
|
+ MeasurementSchema measurementschema5 = new MeasurementSchema(
|
|
|
+ DriverLocationFieldName.SPEED.getValue(),
|
|
|
+ TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY);
|
|
|
+ // 添加策略集
|
|
|
+ schema.add(measurementschema0);
|
|
|
+ schema.add(measurementschema1);
|
|
|
+ schema.add(measurementschema2);
|
|
|
+ schema.add(measurementschema3);
|
|
|
+ schema.add(measurementschema4);
|
|
|
+ schema.add(measurementschema5);
|
|
|
+ // 声明tablet:指定单个tablet最大行数为10000,默认1024
|
|
|
+ Tablet tablet = new Tablet(deviceId, schema, 10000);
|
|
|
+ if(tablet == null) {
|
|
|
+ logger.info(String.format("declaration tablet is null"));
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 针对单独tablet添加value值
|
|
|
+ // 可同时为多个度量插入若干各dataPoint(支持留空白,会自添空值),添加value指明度量名即可
|
|
|
+ int index = 0;
|
|
|
+ for(DriverLocationVal driverLocationVal : driverLocationValList) {
|
|
|
+ tablet.addTimestamp(index, driverLocationVal.getUtcTimeMilliSeconds());
|
|
|
+
|
|
|
+ tablet.addValue(DriverLocationFieldName._CLASS.getValue(),
|
|
|
+ index, driverLocationVal.get_class());
|
|
|
+ tablet.addValue(DriverLocationFieldName.DIRECTION.getValue(),
|
|
|
+ index, driverLocationVal.getDirection());
|
|
|
+ tablet.addValue(DriverLocationFieldName.ELEVATION.getValue(),
|
|
|
+ index, driverLocationVal.getElevation());
|
|
|
+ tablet.addValue(DriverLocationFieldName.LAT.getValue(),
|
|
|
+ index, driverLocationVal.getLat());
|
|
|
+ tablet.addValue(DriverLocationFieldName.LNG.getValue(),
|
|
|
+ index, driverLocationVal.getLng());
|
|
|
+ tablet.addValue(DriverLocationFieldName.SPEED.getValue(),
|
|
|
+ index, driverLocationVal.getSpeed());
|
|
|
+
|
|
|
+ index++;
|
|
|
+ }
|
|
|
+ // Bitmap
|
|
|
+ tablet.initBitMaps();
|
|
|
+ tablet.rowSize += driverLocationValList.size();
|
|
|
+ totalTabletSize += driverLocationValList.size();
|
|
|
+
|
|
|
+ // 将tablet添加进Map
|
|
|
+ tablets.put(deviceId, tablet);
|
|
|
+ }
|
|
|
+ logger.info(String.format("all tablet size is:[%d]", totalTabletSize));
|
|
|
+ return tablets;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description 从指定csv文件内读取数据,转换为DriverLocationVal格式
|
|
|
+ * @param filePath
|
|
|
+ * @return java.util.Hashtable<java.lang.String,db.DriverLocationVal>
|
|
|
+ * @dateTime 2023/1/11 21:17
|
|
|
+ */
|
|
|
+ public static Hashtable<String, List<DriverLocationVal>> GetDriverLocationValFromCSV(
|
|
|
+ String filePath) {
|
|
|
+ if(filePath.isEmpty()) {
|
|
|
+ logger.info(String.format("csv file path is null!"));
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ Hashtable<String, List<DriverLocationVal>> result = new Hashtable<>();
|
|
|
+
|
|
|
+ try (Reader reader = Files.newBufferedReader(Paths.get(filePath))) {
|
|
|
+ Iterable<CSVRecord> records = CSVFormat.DEFAULT.parse(reader);
|
|
|
+ records.iterator().next();
|
|
|
+ for (CSVRecord record : records) {
|
|
|
+ try {
|
|
|
+ // 从record获取数据,具体索引为列号
|
|
|
+ DriverLocationVal driverLocationVal = new DriverLocationVal();
|
|
|
+ driverLocationVal.setUtcTime(Util.dateStringToUTCMilliSeconds(record.get(0)));
|
|
|
+ driverLocationVal.setElevation(Integer.parseInt(record.get(1)));
|
|
|
+ driverLocationVal.setDriverId(record.get(2));
|
|
|
+ driverLocationVal.setLng(Double.parseDouble(record.get(3)));
|
|
|
+ driverLocationVal.set_class(record.get(4));
|
|
|
+ driverLocationVal.setLat(Double.parseDouble(record.get(5)));
|
|
|
+ driverLocationVal.setSpeed(Double.parseDouble(record.get(6)));
|
|
|
+ driverLocationVal.setDirection(Double.parseDouble(record.get(7)));
|
|
|
+
|
|
|
+ // 判断result内是否具备driverId,具备则采取List添加即可
|
|
|
+ if(result.containsKey(driverLocationVal.getDriverId())) {
|
|
|
+ result.get(driverLocationVal.getDriverId()).add(driverLocationVal);
|
|
|
+ }
|
|
|
+ // 不具备添加HashTable映射
|
|
|
+ else {
|
|
|
+ List<DriverLocationVal> driverLocationValList = new ArrayList<>();
|
|
|
+ driverLocationValList.add(driverLocationVal);
|
|
|
+ result.put(driverLocationVal.getDriverId(), driverLocationValList);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (ParseException e) {
|
|
|
+ logger.severe(String.format("Parse Time error, driverId:[%s] createTime:[%s]",
|
|
|
+ record.get(2), record.get(0)));
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ logger.severe(String.format("read csv file exception:%s", e));
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+}
|