Explorar el Código

插入数据异常处理

sensordb2 hace 1 año
padre
commit
859e8e3b2d

+ 8 - 3
db-api/src/main/java/db/Util.java

@@ -61,9 +61,14 @@ public class Util {
      * @author none
      * @date 2020-11-05 00:00
      */
-    public static String uTCMilliSecondsToDateStringWithMs(long utcMilliSeconds) throws ParseException {
-        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");
-        return format.format(new Date(utcMilliSeconds));
+    public static String uTCMilliSecondsToDateStringWithMs(long utcMilliSeconds) {
+        try {
+            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");
+            return format.format(new Date(utcMilliSeconds));
+        } catch (Exception e) {
+            e.printStackTrace();
+            return "";
+        }
     }
 
     public static String uTCMSToDateString(String utcSeconds) throws ParseException {

+ 1 - 0
file-api/src/main/java/db/entity/LogItem.java

@@ -9,6 +9,7 @@ import lombok.NoArgsConstructor;
 @Data
 @NoArgsConstructor
 public class LogItem {
+    public static final short DEFAULT_LOG_ITEM_TYPE = 0x55;
     int magic = Globals.LOG_ITEM_MAGIC;
     short type;
     short length;

+ 54 - 24
insert-app/src/main/java/db/handler/PackageHandler.java

@@ -9,6 +9,8 @@ import db.entity.packages.ScanResponsePackage;
 import db.service.ProbeService;
 import db.service.StatisticService;
 import db.util.SpentTimeCalculator;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -39,8 +41,22 @@ public class PackageHandler {
     @Autowired
     private StatisticService statisticService;
 
+    private long bufferIndex = 0;
+    private SpentTimeCalculator bufferFullTimeCalculator = SpentTimeCalculator.create().begin();
+
     private List<DataResponsePackage> packageBuffer = new ArrayList<>(Globals.MAX_LOG_ITEM);
 
+
+    @Data
+    class SavePackageResult {
+        private boolean saveDiskSuccess = false;
+        private boolean saveTsdbSuccess = false;
+
+        public boolean isSuccess() {
+            return this.saveDiskSuccess && this.saveTsdbSuccess;
+        }
+    }
+
     public void handleDataResponsePackage(DataResponsePackage dataResponsePackage) {
         log.debug("开始处理探针上传的数据包:{}", dataResponsePackage);
         statisticService.incrDataCount();
@@ -48,30 +64,43 @@ public class PackageHandler {
         packageBuffer.add(dataResponsePackage);
 
         if (packageBuffer.size() == Globals.MAX_LOG_ITEM) {
+            // buffer满,记录满的时间
+            bufferFullTimeCalculator.end();
             // 保存
-            log.info("缓冲区满,开始保存");
-            if (saveMultiDataResponsePackage()) {
-                // 保存成功,清空缓冲区
+            log.info("[{}]缓冲区满,缓冲区开始接收数据时间:{}, 缓冲区满花费时间:{}ms", bufferIndex,
+                    Util.uTCMilliSecondsToDateStringWithMs(bufferFullTimeCalculator.getStartTime()),
+                    bufferFullTimeCalculator.getSpendTime());
+            log.info("[{}]缓冲区满,开始保存缓冲区数据", bufferIndex);
+
+            SavePackageResult savePackageResult = saveMultiDataResponsePackage();
+
+            if(savePackageResult.isSuccess()) {
+                packageBuffer.clear();
+                log.info("[{}]保存缓冲区数据至硬盘文件和tsdb成功", bufferIndex);
+            } else {
+                log.error("[{}]保存缓冲区数据失败,硬盘文件:{},tsdb:{}", bufferIndex, savePackageResult.saveDiskSuccess,
+                        savePackageResult.saveTsdbSuccess);
+                //TODO
                 packageBuffer.clear();
-                log.info("保存成功,清空缓冲区");
             }
+            bufferIndex++;
         }
-
-//        saveSingleDataResponsePackage(dataResponsePackage);
     }
 
-    public boolean saveMultiDataResponsePackage() {
+    public SavePackageResult saveMultiDataResponsePackage() {
+        SavePackageResult savePackageResult = new SavePackageResult();
+
         SpentTimeCalculator spentTimeCalculator = SpentTimeCalculator.create().begin();
 
         List<DBVal> dbVals = new ArrayList<>();
 
-        log.debug("开始保存至磁盘");
+        log.debug("[{}]开始保存至磁盘", bufferIndex);
 
         // 拼接即将写入磁盘的文件,并获取每一个longIndex
         List<LogItem> logItems = new ArrayList<>(Globals.MAX_LOG_ITEM);
         int totalDataSize = 0;
         for (DataResponsePackage dataResponsePackage : packageBuffer) {
-            logItems.add(new LogItem((short) 0x55, dataResponsePackage.getTimestamp(), dataResponsePackage.getData()));
+            logItems.add(new LogItem(LogItem.DEFAULT_LOG_ITEM_TYPE, dataResponsePackage.getTimestamp(), dataResponsePackage.getData()));
             totalDataSize += dataResponsePackage.getData().length;
 
             HashMap<String, String> tags = new HashMap<>();
@@ -93,18 +122,20 @@ public class PackageHandler {
         List<Double> doubleIndexes;
         try {
             doubleIndexes = packetFileMetaService.appendLogs(logItems, totalDataSize); // 已做转换
-        } catch (IOException e) {
-            log.error("写入数据到磁盘失败");
-            log.error("packetFileMetaService.appendLog error", e);
-            return false;
+        } catch (Exception e) {
+            log.error("[{}]写入数据到磁盘失败", bufferIndex);
+            log.error("[{}]packetFileMetaService.appendLog error", bufferIndex, e);
+            return savePackageResult;
         }
+        spentTimeCalculator.end();
         int insertCount = doubleIndexes.size();
-        log.debug("写入磁盘成功,接收探针上传数据到写入磁盘花费总时间:{}ms 插入{}条", spentTimeCalculator.getSpendTime(),
+        log.debug("[{}]写入磁盘成功,写入磁盘花费总时间:{}ms 插入{}条", bufferIndex, spentTimeCalculator.getSpendTime(),
                 insertCount);
         statisticService.increDataCountDiskSuccess(insertCount);
+        savePackageResult.setSaveDiskSuccess(true);
 
         // 写入数据库
-        log.debug("开始尝试插入ResoDB");
+        log.debug("[{}]开始尝试插入ResoDB", bufferIndex);
 
         for (int i = 0; i < insertCount; i++) {
             dbVals.get(i).setValue(doubleIndexes.get(i));
@@ -112,30 +143,29 @@ public class PackageHandler {
 
         try {
             int insertRes = entry.insertMultiPoint(dbVals);
-
             spentTimeCalculator.end();
 
             if (insertRes != 0) {
                 statisticService.increDataCountTsdbSuccess(insertCount);
+                savePackageResult.setSaveTsdbSuccess(true);
 //                log.debug("插入ResoDB结束,point={},插入{}条", dbVals, insertCount);
-                log.debug("接收探针上传数据到插入时序数据库花费总时间:{}ms 插入{}条", spentTimeCalculator.getSpendTime(), insertCount);
+                log.debug("[{}]写入硬盘并插入时序数据库花费总时间:{}ms 插入{}条", bufferIndex, spentTimeCalculator.getSpendTime(), insertCount);
             } else {
-                log.error("接收探针上传数据到插入时序数据库花费总时间:{}ms 插入失败", spentTimeCalculator.getSpendTime());
+                log.error("[{}]写入硬盘并插入时序数据库花费总时间:{}ms 插入失败", bufferIndex, spentTimeCalculator.getSpendTime());
             }
 
             if (statisticService.getShowStatisticAfterInsertData()) {
-                log.info("到目前为止的统计信息:{}", statisticService.getStatistic().toString());
+                log.info("[{}]到目前为止的统计信息:{}", bufferIndex, statisticService.getStatistic().toString());
             }
         } catch (Exception e) {
-            log.error(String.format("插入ResoDB失败,ip:%s port:%d", entry.getHost(), entry.getPort()), e);
+            log.error(String.format("[%d]插入ResoDB失败,ip:%s port:%d", bufferIndex, entry.getHost(), entry.getPort()), e);
 
             if (statisticService.getShowStatisticAfterInsertData()) {
-                log.info("到目前为止的统计信息:{}", statisticService.getStatistic().toString());
+                log.info("[{}]到目前为止的统计信息:{}", bufferIndex, statisticService.getStatistic().toString());
             }
-            return false;
+            return savePackageResult;
         }
-        return true;
-
+        return  savePackageResult;
     }
 
     public void saveSingleDataResponsePackage(DataResponsePackage dataResponsePackage) {