|
@@ -15,6 +15,7 @@ import db.util.SpentTimeCalculator;
|
|
|
import db.util.Util;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
import lombok.Data;
|
|
|
+import lombok.NoArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
@@ -25,7 +26,9 @@ import javax.annotation.PostConstruct;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
+import java.util.Queue;
|
|
|
import java.util.concurrent.*;
|
|
|
|
|
|
@Component
|
|
@@ -64,13 +67,24 @@ public class PackageHandler {
|
|
|
@Value("${debug.justReceivePacket}")
|
|
|
private Boolean isJustReceivePacket;
|
|
|
|
|
|
- private long bufferIndex = 0;
|
|
|
- private SpentTimeCalculator bufferFullTimeCalculator = SpentTimeCalculator.create().begin();
|
|
|
-
|
|
|
- private List<DataResponsePackage> packageBuffer = new ArrayList<>(Globals.MAX_LOG_ITEM);
|
|
|
+ private ThreadLocal<Long> bufferIndex = new ThreadLocal<>();
|
|
|
+ private ThreadLocal<SpentTimeCalculator> bufferFullTimeCalculator = new ThreadLocal<>();
|
|
|
+ private ThreadLocal<List<DataResponsePackage>> packageBuffer = new ThreadLocal<>();
|
|
|
|
|
|
private DelayQueue<TsdbTask> tsdbTaskBuffer = new DelayQueue<>();
|
|
|
private DelayQueue<DiskTask> diskTaskBuffer = new DelayQueue<>();
|
|
|
+ private Queue<Long> replayableWalIds = new ConcurrentLinkedQueue<>();
|
|
|
+
|
|
|
+ public void initBuffer() {
|
|
|
+ bufferIndex.set(0L);
|
|
|
+ bufferFullTimeCalculator.set(SpentTimeCalculator.create().begin());
|
|
|
+ packageBuffer.set(new ArrayList<>(Globals.MAX_LOG_ITEM));
|
|
|
+ if (enableWal && replayableWalIds.size() > 0) {
|
|
|
+ Long replayableWalId = replayableWalIds.poll();
|
|
|
+ log.info("从队列中取出待恢复wal:{}", replayableWalId);
|
|
|
+ addWalToBuffer(replayableWalId);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
@Data
|
|
|
@AllArgsConstructor
|
|
@@ -153,8 +167,8 @@ public class PackageHandler {
|
|
|
if (!ObjectUtils.isEmpty(walLists)) {
|
|
|
log.info("检测到wal文件:{},开始恢复", walLists);
|
|
|
for (Long walId : walLists) {
|
|
|
- List<WalItem> walItems = walFileService.readWal(walId);
|
|
|
- replayWal(walId, walItems);
|
|
|
+ log.debug("开始重放wal文件{}", walId);
|
|
|
+ replayWal(walId);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -162,9 +176,10 @@ public class PackageHandler {
|
|
|
/**
|
|
|
* 重放 wal 文件
|
|
|
*
|
|
|
- * @param walItems
|
|
|
+ * @param walId
|
|
|
*/
|
|
|
- public void replayWal(long walId, List<WalItem> walItems) {
|
|
|
+ public void replayWal(long walId) {
|
|
|
+ List<WalItem> walItems = walFileService.readWal(walId);
|
|
|
if (walItems.size() == Globals.MAX_LOG_ITEM) {
|
|
|
log.info("wal文件:{}数目达到Max_Item,直接放入磁盘任务队列", walId);
|
|
|
List<DataResponsePackage> target = new ArrayList<>(Globals.MAX_LOG_ITEM);
|
|
@@ -175,23 +190,33 @@ public class PackageHandler {
|
|
|
target.add(dataResponsePackage);
|
|
|
}
|
|
|
saveFulledPackageBuffer(target, -1, walId);
|
|
|
+ log.info("恢复{}条wal", walItems.size());
|
|
|
// 直接保存
|
|
|
} else if (walItems.size() < Globals.MAX_LOG_ITEM) {
|
|
|
// 直接写入buffer
|
|
|
- log.info("wal文件:{}数目不足Max_Item,直接放入缓冲区", walId);
|
|
|
- // TODO 多个"数目不足"文件可能存在问题
|
|
|
- walFileService.openWalFile(walId);
|
|
|
- for (WalItem walItem : walItems) {
|
|
|
- DataResponsePackage dataResponsePackage = new DataResponsePackage(walItem.getProbeId(),
|
|
|
- walItem.getPort(), walItem.getDataStrategy(), walItem.getClockSync(),
|
|
|
- walItem.getStamp(), walItem.getRawPacket());
|
|
|
- addToPackageBuffer(dataResponsePackage);
|
|
|
- }
|
|
|
+ log.info("wal文件:{}数目不足Max_Item,直接放入待恢复队列", walId);
|
|
|
+ replayableWalIds.add(walId);
|
|
|
} else {
|
|
|
// 不应到这里
|
|
|
log.warn("[replayWal] walItems.size() > Globals.MAX_LOG_ITEM");
|
|
|
return;
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 重放数目不足MaxItem的wal
|
|
|
+ *
|
|
|
+ * @param walId
|
|
|
+ */
|
|
|
+ public void addWalToBuffer(Long walId) {
|
|
|
+ walFileService.openWalFile(walId);
|
|
|
+ List<WalItem> walItems = walFileService.readWal(walId);
|
|
|
+ for (WalItem walItem : walItems) {
|
|
|
+ DataResponsePackage dataResponsePackage = new DataResponsePackage(walItem.getProbeId(),
|
|
|
+ walItem.getPort(), walItem.getDataStrategy(), walItem.getClockSync(),
|
|
|
+ walItem.getStamp(), walItem.getRawPacket());
|
|
|
+ addToPackageBuffer(dataResponsePackage);
|
|
|
+ }
|
|
|
|
|
|
log.info("恢复{}条wal", walItems.size());
|
|
|
}
|
|
@@ -219,9 +244,14 @@ public class PackageHandler {
|
|
|
log.debug("开始处理探针上传的数据包:{}", dataResponsePackage);
|
|
|
statisticService.incrDataCount();
|
|
|
|
|
|
- if(isJustReceivePacket) return;
|
|
|
+ if (isJustReceivePacket) return;
|
|
|
+
|
|
|
+ if (bufferIndex.get() == null) {
|
|
|
+ log.info("初始化threadLocal");
|
|
|
+ initBuffer();
|
|
|
+ }
|
|
|
|
|
|
- if (enableWal) { // 生成 WAL 文件
|
|
|
+ if (enableWal) { // 生成 WAL 文件
|
|
|
WalItem walItem = new WalItem(dataResponsePackage.getProbeId(), dataResponsePackage.getPort(),
|
|
|
dataResponsePackage.getDataStrategy(), dataResponsePackage.getClockSync(),
|
|
|
dataResponsePackage.getTimestamp(), dataResponsePackage.getRawPacket());
|
|
@@ -232,6 +262,7 @@ public class PackageHandler {
|
|
|
log.error("添加walItem到wal文件失败,walItem:{}", walItem, e);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
addToPackageBuffer(dataResponsePackage);
|
|
|
}
|
|
|
|
|
@@ -242,15 +273,19 @@ public class PackageHandler {
|
|
|
*/
|
|
|
public void addToPackageBuffer(DataResponsePackage dataResponsePackage) {
|
|
|
// 添加至缓冲区
|
|
|
- packageBuffer.add(dataResponsePackage);
|
|
|
+ packageBuffer.get().add(dataResponsePackage);
|
|
|
+
|
|
|
+ // log.debug("packageBuffer.get()={}", packageBuffer.get().size());
|
|
|
|
|
|
- if (packageBuffer.size() >= Globals.MAX_LOG_ITEM) {
|
|
|
+ if (packageBuffer.get().size() >= Globals.MAX_LOG_ITEM) {
|
|
|
// buffer满,记录满的时间
|
|
|
- bufferFullTimeCalculator.end();
|
|
|
+ bufferFullTimeCalculator.get().end();
|
|
|
|
|
|
// 新建缓冲区
|
|
|
- List<DataResponsePackage> fulledBuffer = packageBuffer;
|
|
|
- packageBuffer = new ArrayList<>(Globals.MAX_LOG_ITEM);
|
|
|
+ List<DataResponsePackage> fulledBuffer = packageBuffer.get();
|
|
|
+ long bufferId = bufferIndex.get();
|
|
|
+
|
|
|
+ packageBuffer.set(new ArrayList<>(Globals.MAX_LOG_ITEM));
|
|
|
Long walId;
|
|
|
if (enableWal) {
|
|
|
walId = walFileService.getAppendableWalId();
|
|
@@ -259,16 +294,23 @@ public class PackageHandler {
|
|
|
walId = -1L;
|
|
|
}
|
|
|
|
|
|
- log.info("[{}]缓冲区满,缓冲区大小:{},缓冲区开始接收数据时间:{}, 缓冲区满花费时间:{}ms", bufferIndex,
|
|
|
+ log.info("[{}]缓冲区满,缓冲区大小:{},缓冲区开始接收数据时间:{}, 缓冲区满花费时间:{}ms", bufferId,
|
|
|
fulledBuffer.size(),
|
|
|
- Util.uTCMilliSecondsToDateStringWithMs(bufferFullTimeCalculator.getStartTime()),
|
|
|
- bufferFullTimeCalculator.getSpendTime());
|
|
|
+ Util.uTCMilliSecondsToDateStringWithMs(bufferFullTimeCalculator.get().getStartTime()),
|
|
|
+ bufferFullTimeCalculator.get().getSpendTime());
|
|
|
|
|
|
// 添加任务
|
|
|
- diskTaskBuffer.put(new DiskTask(fulledBuffer, bufferIndex, walId, System.currentTimeMillis()));
|
|
|
+ diskTaskBuffer.put(new DiskTask(fulledBuffer, bufferId, walId, System.currentTimeMillis()));
|
|
|
|
|
|
- bufferIndex++;
|
|
|
- bufferFullTimeCalculator.begin();
|
|
|
+ bufferIndex.set(bufferId + 1);
|
|
|
+ bufferFullTimeCalculator.get().begin();
|
|
|
+
|
|
|
+ // 查看有无wal待恢复
|
|
|
+ if (enableWal && replayableWalIds.size() > 0) {
|
|
|
+ Long replayableWalId = replayableWalIds.poll();
|
|
|
+ log.info("从队列中取出待恢复wal:{}", replayableWalId);
|
|
|
+ addWalToBuffer(replayableWalId);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -302,7 +344,8 @@ public class PackageHandler {
|
|
|
* @param packages 待保存报文,报文数等于MaxItem数
|
|
|
* @return
|
|
|
*/
|
|
|
- public SavePackageResult saveMultiDataResponsePackage(List<DataResponsePackage> packages, long bufferId, Long walId) {
|
|
|
+ public SavePackageResult saveMultiDataResponsePackage(List<DataResponsePackage> packages, long bufferId,
|
|
|
+ Long walId) {
|
|
|
|
|
|
SavePackageResult savePackageResult = new SavePackageResult();
|
|
|
|
|
@@ -374,10 +417,10 @@ public class PackageHandler {
|
|
|
log.debug("开始处理探针上传的数据包:{}", dataResponsePackage);
|
|
|
statisticService.incrDataCount();
|
|
|
|
|
|
-
|
|
|
log.debug("开始保存至磁盘");
|
|
|
// 封装 LogItem
|
|
|
- LogItem logItem = new LogItem((short) 0x55, dataResponsePackage.getTimestamp(), dataResponsePackage.getRawPacket());
|
|
|
+ LogItem logItem = new LogItem((short) 0x55, dataResponsePackage.getTimestamp(),
|
|
|
+ dataResponsePackage.getRawPacket());
|
|
|
|
|
|
long longIndex;
|
|
|
try {
|
|
@@ -501,7 +544,7 @@ public class PackageHandler {
|
|
|
if (insertRes != -1) {
|
|
|
statisticService.increDataCountTsdbSuccess(insertCount);
|
|
|
saveResult = true;
|
|
|
-// log.debug("插入ResoDB结束,point={},插入{}条", dbVals, insertCount);
|
|
|
+ // log.debug("插入ResoDB结束,point={},插入{}条", dbVals, insertCount);
|
|
|
log.info("插入时序数据库花费时间:{}ms 插入{}条", spentTimeCalculator.getSpendTime(), insertCount);
|
|
|
} else {
|
|
|
log.error("插入时序数据库花费时间:{}ms 插入失败", spentTimeCalculator.getSpendTime());
|
|
@@ -521,7 +564,6 @@ public class PackageHandler {
|
|
|
return saveResult;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
public void handleScanResponsePackage(ScanResponsePackage scanResponsePackage) {
|
|
|
log.debug("开始处理数据包ScanResponsePackage:{}", scanResponsePackage);
|
|
|
|
|
@@ -539,6 +581,4 @@ public class PackageHandler {
|
|
|
log.debug("开始处理数据包ListenResponsePackage: {}", packet);
|
|
|
log.debug("成功处理数据包{}", packet);
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
}
|