ソースを参照

修复读取wal失败的问题

FutureYu 1 年間 前
コミット
fac24a4dae
1 ファイル変更44 行追加17 行削除
  1. 44 17
      insert-app/src/main/java/db/handler/PackageHandler.java

+ 44 - 17
insert-app/src/main/java/db/handler/PackageHandler.java

@@ -26,7 +26,9 @@ import javax.annotation.PostConstruct;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.*;
 
 @Component
@@ -69,11 +71,17 @@ public class PackageHandler {
 
     private DelayQueue<TsdbTask> tsdbTaskBuffer = new DelayQueue<>();
     private DelayQueue<DiskTask> diskTaskBuffer = new DelayQueue<>();
+    private Queue<Long> replayableWalIds = new ConcurrentLinkedQueue<>();
 
     public void initBuffer() {
         bufferIndex.set(0L);
         bufferFullTimeCalculator.set(SpentTimeCalculator.create().begin());
         packageBuffer.set(new ArrayList<>(Globals.MAX_LOG_ITEM));
+        if(enableWal && replayableWalIds.size() > 0) {
+            Long replayableWalId = replayableWalIds.poll();
+            log.info("从队列中取出待恢复wal:{}", replayableWalId);
+            addWalToBuffer(replayableWalId);
+        }
     }
 
     @Data
@@ -157,8 +165,8 @@ public class PackageHandler {
         if (!ObjectUtils.isEmpty(walLists)) {
             log.info("检测到wal文件:{},开始恢复", walLists);
             for (Long walId : walLists) {
-                List<WalItem> walItems = walFileService.readWal(walId);
-                replayWal(walId, walItems);
+                log.debug("开始重放wal文件{}", walId); 
+                replayWal(walId);
             }
         }
     }
@@ -168,7 +176,8 @@ public class PackageHandler {
      *
      * @param walItems
      */
-    public void replayWal(long walId, List<WalItem> walItems) {
+    public void replayWal(long walId) {
+        List<WalItem> walItems = walFileService.readWal(walId);
         if (walItems.size() == Globals.MAX_LOG_ITEM) {
             log.info("wal文件:{}数目达到Max_Item,直接放入磁盘任务队列", walId);
             List<DataResponsePackage> target = new ArrayList<>(Globals.MAX_LOG_ITEM);
@@ -179,24 +188,32 @@ public class PackageHandler {
                 target.add(dataResponsePackage);
             }
             saveFulledPackageBuffer(target, -1, walId);
+            log.info("恢复{}条wal", walItems.size());
             // 直接保存
         } else if (walItems.size() < Globals.MAX_LOG_ITEM) {
             // 直接写入buffer
-            log.info("wal文件:{}数目不足Max_Item,直接放入缓冲区", walId);
-            // TODO 多个"数目不足"文件可能存在问题
-            walFileService.openWalFile(walId);
-            for (WalItem walItem : walItems) {
-                DataResponsePackage dataResponsePackage = new DataResponsePackage(walItem.getProbeId(),
-                        walItem.getPort(), walItem.getDataStrategy(), walItem.getClockSync(),
-                        walItem.getStamp(), walItem.getRawPacket());
-                addToPackageBuffer(dataResponsePackage);
-            }
+            log.info("wal文件:{}数目不足Max_Item,直接放入待恢复队列", walId);
+            replayableWalIds.add(walId);
         } else {
             // 不应到这里
             log.warn("[replayWal] walItems.size() > Globals.MAX_LOG_ITEM");
             return;
         }
+    }
 
+    /**
+     * 重放数目不足MaxItem的wal
+     * @param walId
+     */
+    public void addWalToBuffer(Long walId) {
+        walFileService.openWalFile(walId);
+        List<WalItem> walItems = walFileService.readWal(walId);
+        for (WalItem walItem : walItems) {
+            DataResponsePackage dataResponsePackage = new DataResponsePackage(walItem.getProbeId(),
+                    walItem.getPort(), walItem.getDataStrategy(), walItem.getClockSync(),
+                    walItem.getStamp(), walItem.getRawPacket());
+            addToPackageBuffer(dataResponsePackage);
+        }
         log.info("恢复{}条wal", walItems.size());
     }
 
@@ -223,6 +240,11 @@ public class PackageHandler {
         log.debug("开始处理探针上传的数据包:{}", dataResponsePackage);
         statisticService.incrDataCount();
 
+        if (bufferIndex.get() == null) {
+            log.info("初始化threadLocal");
+            initBuffer();
+        }
+
         if (enableWal) { // 生成 WAL 文件
             WalItem walItem = new WalItem(dataResponsePackage.getProbeId(), dataResponsePackage.getPort(),
                     dataResponsePackage.getDataStrategy(), dataResponsePackage.getClockSync(),
@@ -235,11 +257,6 @@ public class PackageHandler {
             }
         }
 
-        if (bufferIndex.get() == null) {
-            log.info("初始化threadLocal");
-            initBuffer();
-        }
-
         addToPackageBuffer(dataResponsePackage);
     }
 
@@ -252,6 +269,8 @@ public class PackageHandler {
         // 添加至缓冲区
         packageBuffer.get().add(dataResponsePackage);
 
+        // log.debug("packageBuffer.get()={}", packageBuffer.get().size());
+
         if (packageBuffer.get().size() >= Globals.MAX_LOG_ITEM) {
             // buffer满,记录满的时间
             bufferFullTimeCalculator.get().end();
@@ -259,6 +278,7 @@ public class PackageHandler {
             // 新建缓冲区
             List<DataResponsePackage> fulledBuffer = packageBuffer.get();
             long bufferId = bufferIndex.get();
+            
             packageBuffer.set(new ArrayList<>(Globals.MAX_LOG_ITEM));
             Long walId;
             if (enableWal) {
@@ -278,6 +298,13 @@ public class PackageHandler {
 
             bufferIndex.set(bufferId + 1);
             bufferFullTimeCalculator.get().begin();
+
+            // 查看有无wal待恢复
+            if(enableWal && replayableWalIds.size() > 0) {
+                Long replayableWalId = replayableWalIds.poll();
+                log.info("从队列中取出待恢复wal:{}", replayableWalId);
+                addWalToBuffer(replayableWalId);
+            }
         }
     }