5 Commits ea1ba21179 ... 00da132a0e

Author SHA1 Message Date
  FutureYu 00da132a0e Merge branch 'dev-thread' into dev 1 year ago
  FutureYu e0934a5fe3 修改注释 1 year ago
  FutureYu fac24a4dae 修复读取wal失败的问题 1 year ago
  FutureYu 8cdd67e909 修复threadLocal无法正常初始化的问题 1 year ago
  FutureYu 15af17f07c 增加thread local 1 year ago

+ 21 - 20
file-api/src/main/java/db/service/WalFileService.java

@@ -24,14 +24,15 @@ public class WalFileService {
     @Value("${system.wal-file.suffix}")
     private String suffix;
 
-    String appendableFileName;
-    Long appendableWalId;
+    ThreadLocal<String> appendableFileName = new ThreadLocal<>();
+    ThreadLocal<Long> appendableWalId = new ThreadLocal<>();
+    ThreadLocal<FileOutputStream> walWriter = new ThreadLocal<>();
+
 
     // 追加写入
-    FileOutputStream walWriter;
 
     public Long getAppendableWalId() {
-        return appendableWalId;
+        return appendableWalId.get();
     }
 
 
@@ -39,47 +40,47 @@ public class WalFileService {
      * 打开新的 wal 文件并创建 writer
      */
     public void openWalFile() {
-        appendableFileName = prefix + appendableWalId + suffix;
-        walWriter = FileUtils.openFile(appendableFileName);
-        log.info("打开wal文件:{}作为wal写入目标文件", appendableFileName);
+        appendableFileName.set(prefix + appendableWalId.get() + suffix);
+        walWriter.set(FileUtils.openFile(appendableFileName.get()));
+        log.info("打开wal文件:{}作为wal写入目标文件", appendableFileName.get());
     }
 
     /**
      * 打开指定 walId 的 wal 文件并创建 writer
      */
     public void openWalFile(long walId) {
-        appendableWalId = walId;
+        appendableWalId.set(walId);
         openWalFile();
     }
 
     @PreDestroy
     public void closeWalFile() {
-        if (walWriter == null) {
+        if (walWriter.get() == null) {
             return;
         }
         try {
-            walWriter.close();
-            walWriter = null;
-            log.info("关闭wal文件:{}", appendableFileName);
+            walWriter.get().close();
+            walWriter.set(null);
+            log.info("关闭wal文件:{}", appendableFileName.get());
         } catch (IOException e) {
-            log.error("无法关闭wal文件:{}", appendableFileName, e);
-            walWriter = null;
+            log.error("无法关闭wal文件:{}", appendableFileName.get(), e);
+            walWriter.set(null);
         }
     }
 
     public void appendWal(WalItem walItem) throws IOException {
         // 如果未打开wal文件,则打开
-        if (appendableWalId == null || walWriter == null) {
-            appendableWalId = System.currentTimeMillis();
+        if (appendableWalId.get() == null || walWriter.get() == null) {
+            appendableWalId.set(System.currentTimeMillis());
             openWalFile();
         }
 
         // 写入数据
         byte[] walItemPayload = walItem.getPayload();
 
-        walWriter.write(walItemPayload);
+        walWriter.get().write(walItemPayload);
 
-        log.debug("将{}写入wal文件{}", walItem, appendableFileName);
+        log.debug("将{}写入wal文件{}", walItem, appendableFileName.get());
     }
 
     /**
@@ -147,8 +148,8 @@ public class WalFileService {
      */
     public void clear() {
         closeWalFile();
-        FileUtils.deleteFile(appendableFileName);
-        log.info("清空wal文件{}", appendableFileName);
+        FileUtils.deleteFile(appendableFileName.get());
+        log.info("清空wal文件{}", appendableFileName.get());
     }
 
     /**

+ 1 - 1
insert-app/src/main/java/db/controller/ProbeController.java

@@ -61,7 +61,7 @@ public class ProbeController {
         log.info("==> [api模拟插入随机数据] 收到请求");
 
         long startTime = System.currentTimeMillis();
-        probeService.genData(dataCount);
+        // probeService.genData(dataCount);
         long endTime = System.currentTimeMillis();
 
         String res = String.format("已模拟插入随机数据%d条", dataCount);

+ 77 - 37
insert-app/src/main/java/db/handler/PackageHandler.java

@@ -15,6 +15,7 @@ import db.util.SpentTimeCalculator;
 import db.util.Util;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -25,7 +26,9 @@ import javax.annotation.PostConstruct;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.*;
 
 @Component
@@ -64,13 +67,24 @@ public class PackageHandler {
     @Value("${debug.justReceivePacket}")
     private Boolean isJustReceivePacket;
 
-    private long bufferIndex = 0;
-    private SpentTimeCalculator bufferFullTimeCalculator = SpentTimeCalculator.create().begin();
-
-    private List<DataResponsePackage> packageBuffer = new ArrayList<>(Globals.MAX_LOG_ITEM);
+    private ThreadLocal<Long> bufferIndex = new ThreadLocal<>();
+    private ThreadLocal<SpentTimeCalculator> bufferFullTimeCalculator = new ThreadLocal<>();
+    private ThreadLocal<List<DataResponsePackage>> packageBuffer = new ThreadLocal<>();
 
     private DelayQueue<TsdbTask> tsdbTaskBuffer = new DelayQueue<>();
     private DelayQueue<DiskTask> diskTaskBuffer = new DelayQueue<>();
+    private Queue<Long> replayableWalIds = new ConcurrentLinkedQueue<>();
+
+    public void initBuffer() {
+        bufferIndex.set(0L);
+        bufferFullTimeCalculator.set(SpentTimeCalculator.create().begin());
+        packageBuffer.set(new ArrayList<>(Globals.MAX_LOG_ITEM));
+        if (enableWal && replayableWalIds.size() > 0) {
+            Long replayableWalId = replayableWalIds.poll();
+            log.info("从队列中取出待恢复wal:{}", replayableWalId);
+            addWalToBuffer(replayableWalId);
+        }
+    }
 
     @Data
     @AllArgsConstructor
@@ -153,8 +167,8 @@ public class PackageHandler {
         if (!ObjectUtils.isEmpty(walLists)) {
             log.info("检测到wal文件:{},开始恢复", walLists);
             for (Long walId : walLists) {
-                List<WalItem> walItems = walFileService.readWal(walId);
-                replayWal(walId, walItems);
+                log.debug("开始重放wal文件{}", walId);
+                replayWal(walId);
             }
         }
     }
@@ -162,9 +176,10 @@ public class PackageHandler {
     /**
      * 重放 wal 文件
      *
-     * @param walItems
+     * @param walId
      */
-    public void replayWal(long walId, List<WalItem> walItems) {
+    public void replayWal(long walId) {
+        List<WalItem> walItems = walFileService.readWal(walId);
         if (walItems.size() == Globals.MAX_LOG_ITEM) {
             log.info("wal文件:{}数目达到Max_Item,直接放入磁盘任务队列", walId);
             List<DataResponsePackage> target = new ArrayList<>(Globals.MAX_LOG_ITEM);
@@ -175,23 +190,33 @@ public class PackageHandler {
                 target.add(dataResponsePackage);
             }
             saveFulledPackageBuffer(target, -1, walId);
+            log.info("恢复{}条wal", walItems.size());
             // 直接保存
         } else if (walItems.size() < Globals.MAX_LOG_ITEM) {
             // 直接写入buffer
-            log.info("wal文件:{}数目不足Max_Item,直接放入缓冲区", walId);
-            // TODO 多个"数目不足"文件可能存在问题
-            walFileService.openWalFile(walId);
-            for (WalItem walItem : walItems) {
-                DataResponsePackage dataResponsePackage = new DataResponsePackage(walItem.getProbeId(),
-                        walItem.getPort(), walItem.getDataStrategy(), walItem.getClockSync(),
-                        walItem.getStamp(), walItem.getRawPacket());
-                addToPackageBuffer(dataResponsePackage);
-            }
+            log.info("wal文件:{}数目不足Max_Item,直接放入待恢复队列", walId);
+            replayableWalIds.add(walId);
         } else {
             // 不应到这里
             log.warn("[replayWal] walItems.size() > Globals.MAX_LOG_ITEM");
             return;
         }
+    }
+
+    /**
+     * 重放数目不足MaxItem的wal
+     *
+     * @param walId
+     */
+    public void addWalToBuffer(Long walId) {
+        walFileService.openWalFile(walId);
+        List<WalItem> walItems = walFileService.readWal(walId);
+        for (WalItem walItem : walItems) {
+            DataResponsePackage dataResponsePackage = new DataResponsePackage(walItem.getProbeId(),
+                    walItem.getPort(), walItem.getDataStrategy(), walItem.getClockSync(),
+                    walItem.getStamp(), walItem.getRawPacket());
+            addToPackageBuffer(dataResponsePackage);
+        }
 
         log.info("恢复{}条wal", walItems.size());
     }
@@ -219,9 +244,14 @@ public class PackageHandler {
         log.debug("开始处理探针上传的数据包:{}", dataResponsePackage);
         statisticService.incrDataCount();
 
-        if(isJustReceivePacket) return;
+        if (isJustReceivePacket) return;
+
+        if (bufferIndex.get() == null) {
+            log.info("初始化threadLocal");
+            initBuffer();
+        }
 
-        if (enableWal) {  // 生成 WAL 文件
+        if (enableWal) { // 生成 WAL 文件
             WalItem walItem = new WalItem(dataResponsePackage.getProbeId(), dataResponsePackage.getPort(),
                     dataResponsePackage.getDataStrategy(), dataResponsePackage.getClockSync(),
                     dataResponsePackage.getTimestamp(), dataResponsePackage.getRawPacket());
@@ -232,6 +262,7 @@ public class PackageHandler {
                 log.error("添加walItem到wal文件失败,walItem:{}", walItem, e);
             }
         }
+
         addToPackageBuffer(dataResponsePackage);
     }
 
@@ -242,15 +273,19 @@ public class PackageHandler {
      */
     public void addToPackageBuffer(DataResponsePackage dataResponsePackage) {
         // 添加至缓冲区
-        packageBuffer.add(dataResponsePackage);
+        packageBuffer.get().add(dataResponsePackage);
+
+        // log.debug("packageBuffer.get()={}", packageBuffer.get().size());
 
-        if (packageBuffer.size() >= Globals.MAX_LOG_ITEM) {
+        if (packageBuffer.get().size() >= Globals.MAX_LOG_ITEM) {
             // buffer满,记录满的时间
-            bufferFullTimeCalculator.end();
+            bufferFullTimeCalculator.get().end();
 
             // 新建缓冲区
-            List<DataResponsePackage> fulledBuffer = packageBuffer;
-            packageBuffer = new ArrayList<>(Globals.MAX_LOG_ITEM);
+            List<DataResponsePackage> fulledBuffer = packageBuffer.get();
+            long bufferId = bufferIndex.get();
+
+            packageBuffer.set(new ArrayList<>(Globals.MAX_LOG_ITEM));
             Long walId;
             if (enableWal) {
                 walId = walFileService.getAppendableWalId();
@@ -259,16 +294,23 @@ public class PackageHandler {
                 walId = -1L;
             }
 
-            log.info("[{}]缓冲区满,缓冲区大小:{},缓冲区开始接收数据时间:{}, 缓冲区满花费时间:{}ms", bufferIndex,
+            log.info("[{}]缓冲区满,缓冲区大小:{},缓冲区开始接收数据时间:{}, 缓冲区满花费时间:{}ms", bufferId,
                     fulledBuffer.size(),
-                    Util.uTCMilliSecondsToDateStringWithMs(bufferFullTimeCalculator.getStartTime()),
-                    bufferFullTimeCalculator.getSpendTime());
+                    Util.uTCMilliSecondsToDateStringWithMs(bufferFullTimeCalculator.get().getStartTime()),
+                    bufferFullTimeCalculator.get().getSpendTime());
 
             // 添加任务
-            diskTaskBuffer.put(new DiskTask(fulledBuffer, bufferIndex, walId, System.currentTimeMillis()));
+            diskTaskBuffer.put(new DiskTask(fulledBuffer, bufferId, walId, System.currentTimeMillis()));
 
-            bufferIndex++;
-            bufferFullTimeCalculator.begin();
+            bufferIndex.set(bufferId + 1);
+            bufferFullTimeCalculator.get().begin();
+
+            // 查看有无wal待恢复
+            if (enableWal && replayableWalIds.size() > 0) {
+                Long replayableWalId = replayableWalIds.poll();
+                log.info("从队列中取出待恢复wal:{}", replayableWalId);
+                addWalToBuffer(replayableWalId);
+            }
         }
     }
 
@@ -302,7 +344,8 @@ public class PackageHandler {
      * @param packages 待保存报文,报文数等于MaxItem数
      * @return
      */
-    public SavePackageResult saveMultiDataResponsePackage(List<DataResponsePackage> packages, long bufferId, Long walId) {
+    public SavePackageResult saveMultiDataResponsePackage(List<DataResponsePackage> packages, long bufferId,
+                                                          Long walId) {
 
         SavePackageResult savePackageResult = new SavePackageResult();
 
@@ -374,10 +417,10 @@ public class PackageHandler {
         log.debug("开始处理探针上传的数据包:{}", dataResponsePackage);
         statisticService.incrDataCount();
 
-
         log.debug("开始保存至磁盘");
         // 封装 LogItem
-        LogItem logItem = new LogItem((short) 0x55, dataResponsePackage.getTimestamp(), dataResponsePackage.getRawPacket());
+        LogItem logItem = new LogItem((short) 0x55, dataResponsePackage.getTimestamp(),
+                dataResponsePackage.getRawPacket());
 
         long longIndex;
         try {
@@ -501,7 +544,7 @@ public class PackageHandler {
             if (insertRes != -1) {
                 statisticService.increDataCountTsdbSuccess(insertCount);
                 saveResult = true;
-//                log.debug("插入ResoDB结束,point={},插入{}条", dbVals, insertCount);
+                // log.debug("插入ResoDB结束,point={},插入{}条", dbVals, insertCount);
                 log.info("插入时序数据库花费时间:{}ms 插入{}条", spentTimeCalculator.getSpendTime(), insertCount);
             } else {
                 log.error("插入时序数据库花费时间:{}ms 插入失败", spentTimeCalculator.getSpendTime());
@@ -521,7 +564,6 @@ public class PackageHandler {
         return saveResult;
     }
 
-
     public void handleScanResponsePackage(ScanResponsePackage scanResponsePackage) {
         log.debug("开始处理数据包ScanResponsePackage:{}", scanResponsePackage);
 
@@ -539,6 +581,4 @@ public class PackageHandler {
         log.debug("开始处理数据包ListenResponsePackage: {}", packet);
         log.debug("成功处理数据包{}", packet);
     }
-
-
 }

+ 2 - 1
insert-app/src/main/java/db/netty/NettyServer.java

@@ -70,6 +70,7 @@ public class NettyServer {
 
         Encoder encoder = new Encoder();
         Decoder decoder = new Decoder();
+        NettyServerHandler nettyServerHandler = new NettyServerHandler();
         decoder.setShowRecvBytes(this.showRecvBytes);
 
         bootstrap.group(group)
@@ -84,7 +85,7 @@ public class NettyServer {
                     public void initChannel(final Channel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         p.addLast(encoder, decoder);
-                        p.addLast(new NettyServerHandler());
+                        p.addLast(nettyServerHandler);
                     }
                 });
         if (useMultiThread) {

+ 3 - 5
insert-app/src/main/java/db/netty/NettyServerHandler.java

@@ -1,5 +1,7 @@
 package db.netty;
 
+import org.springframework.beans.factory.annotation.Autowired;
+
 import db.entity.packages.BasePackage;
 import db.entity.packages.DataResponsePackage;
 import db.entity.packages.ListenResponsePackage;
@@ -9,15 +11,11 @@ import db.util.SpringUtils;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.ChannelHandler.Sharable;
-import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
-import java.io.IOException;
-
 @Slf4j
-@Data
+@Sharable
 public class NettyServerHandler extends SimpleChannelInboundHandler<BasePackage> {
-    private boolean useSeperateThread = false;
 
     private PackageHandler packageHandler;
 

+ 9 - 9
insert-app/src/main/java/db/service/ProbeService.java

@@ -29,8 +29,8 @@ public class ProbeService {
     @Autowired
     private NettyServer server;
 
-    @Autowired
-    private PackageHandler packageHandler;
+    // @Autowired
+    // private PackageHandler packageHandler;
 
     public Map<Short, Probe> getProbes() {
         return probes;
@@ -181,11 +181,11 @@ public class ProbeService {
     }
 
 
-    public void genData(Integer dataCount) {
-        DataGenerator dataGenerator = new DataGenerator();
-        List<DataResponsePackage> packages = dataGenerator.generate(dataCount);
-        for (DataResponsePackage dataPackage: packages) {
-            packageHandler.handleDataResponsePackage(dataPackage);
-        }
-    }
+    // public void genData(Integer dataCount) {
+    //     DataGenerator dataGenerator = new DataGenerator();
+    //     List<DataResponsePackage> packages = dataGenerator.generate(dataCount);
+    //     for (DataResponsePackage dataPackage: packages) {
+    //         packageHandler.handleDataResponsePackage(dataPackage);
+    //     }
+    // }
 }