Quellcode durchsuchen

优化时间范围查询和条件查询

wenjie vor 2 Jahren
Ursprung
Commit
bcb6646c18

+ 82 - 17
file-api/src/main/java/db/service/LogFileService.java

@@ -6,6 +6,7 @@ import db.entity.LogItem;
 import db.mapper.LogFileMetaMapper;
 import db.util.ConvertUtils;
 import db.util.FileUtils;
+import db.util.SpentTimeCalculator;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -14,7 +15,10 @@ import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
 
 @Service
 @Data
@@ -36,7 +40,7 @@ public class LogFileService {
     private LogFileMeta logFileMeta; // 当前应写入的文件头信息(用于单logItem写入,废弃)
 
     public byte[] queryLog(long loc) throws IOException {
-        log.debug("读取{}", loc);
+//        log.debug("读取{}", loc);
         int fileIndex = (int) (loc % Globals.MAX_LOG_ITEM);
         long fileId = loc / Globals.MAX_LOG_ITEM;
 
@@ -70,40 +74,99 @@ public class LogFileService {
         return itemContent;
     }
 
-    public List<byte[]> queryLogBetweenTime(LogFileMeta logFileMeta, long startTime, long endTime) throws IOException {
-        List<byte[]> res = new ArrayList<>();
+
+    public ArrayList<byte[]> querySingleLogFile(long fileId, ArrayList<Integer> list) throws IOException {
+
+        ArrayList<byte[]> res = new ArrayList<>();
 
         // 读取fileId文件的头
-        LogFileMeta readLogFile = logFileMetaMapper.getLogFileMetaById(logFileMeta.getId());
+        SpentTimeCalculator spentTimeCalculator = SpentTimeCalculator.create();
+        spentTimeCalculator.begin();
 
-        byte[] headerContent = new byte[Globals.MAX_LOG_ITEM * 4 + 28];
+        LogFileMeta readLogFile = logFileMetaMapper.getLogFileMetaById(fileId);
+
+        spentTimeCalculator.end();
+        log.info("sqlite查询ID:{},耗时:{}", fileId, spentTimeCalculator.getSpendTime());
+
+        spentTimeCalculator.begin();
+        byte[] headerContent = new byte[readLogFile.getHeaderTotalLength()];
         FileUtils.readFile(readLogFile.getFileName(), headerContent, 0);
+
         boolean isReadSuccess = readLogFile.setPayload(headerContent);
         if (!isReadSuccess) {
             log.error("logFile文件完整性校验失败,文件名:{}", readLogFile.getFileName());
             return null;
         }
 
-        // 遍历索引
-        for (int i = 0; i < logFileMeta.getCount(); i++) {
-            int fileOffset = readLogFile.getIndices()[i];
-            // 读取logItem的header
-            byte[] itemHeaderContent = new byte[16];
-            FileUtils.readFile(readLogFile.getFileName(), itemHeaderContent, fileOffset);
+        byte[] allLogItems = new byte[readLogFile.getBodyLength()];
+        FileUtils.readFile(readLogFile.getFileName(), allLogItems, readLogFile.getHeaderTotalLength());
+
+        for (Integer index : list) {
             LogItem logItem = new LogItem();
+            //logItem 在 logFile 中的偏移
+            int fileOffset = readLogFile.getIndices()[index];
+
+            //文件的校验
+            byte[] itemHeaderContent = new byte[16];
+            System.arraycopy(allLogItems, fileOffset - readLogFile.getHeaderTotalLength(),
+                    itemHeaderContent, 0, 16);
             isReadSuccess = logItem.setHeader(itemHeaderContent);
-            ;
             if (!isReadSuccess) {
                 log.error("logItem文件完整性校验失败,文件名:{},文件偏移量:{}", readLogFile.getFileName(), fileOffset);
                 return null;
             }
+
+            byte[] itemContent = new byte[logItem.getLength() - 16];
+            System.arraycopy(allLogItems, fileOffset - readLogFile.getHeaderTotalLength() + 16,
+                    itemContent, 0, logItem.getLength() - 16);
+
+            res.add(itemContent);
+
+        }
+        spentTimeCalculator.end();
+        log.info("收集fileId文件的数据:{},耗时:{}", readLogFile.getFileName(), spentTimeCalculator.getSpendTime());
+
+        return res;
+    }
+
+    public List<byte[]> queryLogBetweenTimeRange(LogFileMeta logFileMeta, long startTime, long endTime) throws IOException {
+
+        List<byte[]> res = new ArrayList<>();
+
+        SpentTimeCalculator spentTimeCalculator = SpentTimeCalculator.create();
+        spentTimeCalculator.begin();
+
+        byte[] headerContent = new byte[logFileMeta.getHeaderTotalLength()];
+        FileUtils.readFile(logFileMeta.getFileName(), headerContent, 0);
+        boolean isReadSuccess = logFileMeta.setPayload(headerContent);
+        if (!isReadSuccess) {
+            log.error("logFile文件完整性校验失败,文件名:{}", logFileMeta.getFileName());
+            return null;
+        }
+
+        byte[] allLogItems = new byte[logFileMeta.getBodyLength()];
+        FileUtils.readFile(logFileMeta.getFileName(), allLogItems, logFileMeta.getHeaderTotalLength());
+
+        int sumOffset = 0;
+
+        for (int i = 0; i < logFileMeta.getCount(); i++) {
+            LogItem logItem = new LogItem();
+            byte[] itemHeaderContent = new byte[16];
+            System.arraycopy(allLogItems, sumOffset, itemHeaderContent, 0, 16);
+            isReadSuccess = logItem.setHeader(itemHeaderContent);
+            if (!isReadSuccess) {
+                log.error("logItem文件完整性校验失败,文件名:{},文件偏移量:{}", logFileMeta.getFileName(), sumOffset);
+                return null;
+            }
             if (logItem.getStamp() >= startTime && logItem.getStamp() <= endTime) {
-                // 读取logItem的body
                 byte[] itemContent = new byte[logItem.getLength() - 16];
-                FileUtils.readFile(readLogFile.getFileName(), itemContent, fileOffset + 16);
+                System.arraycopy(allLogItems, sumOffset + 16, itemContent, 0, logItem.getLength() - 16);
                 res.add(itemContent);
             }
+            sumOffset += logItem.getLength();
         }
+        spentTimeCalculator.end();
+        log.debug("queryByTime,处理logFile:{},时间:{}", logFileMeta.getFileName(), spentTimeCalculator.getSpendTime());
         return res;
     }
 
@@ -147,9 +210,9 @@ public class LogFileService {
      * @throws IOException
      */
     public List<Double> appendLogs(List<LogItem> logItems, int totalRawPacketSize, long walStamp) throws IOException {
-        if(logItems.size() < Globals.MAX_LOG_ITEM) {
+        if (logItems.size() < Globals.MAX_LOG_ITEM) {
             return appendLogsLessThanOneFile(logItems, totalRawPacketSize, walStamp);
-        } else if(logItems.size() == Globals.MAX_LOG_ITEM) {
+        } else if (logItems.size() == Globals.MAX_LOG_ITEM) {
             return appendLogsEqualThanOneFile(logItems, totalRawPacketSize, walStamp);
         } else {
             // 不应该到这里
@@ -160,6 +223,7 @@ public class LogFileService {
 
     /**
      * 插入少于MaxItem条数据
+     *
      * @param logItems
      * @param totalRawPacketSize
      * @return
@@ -261,6 +325,7 @@ public class LogFileService {
 
     /**
      * 插入等于MaxItem条数据
+     *
      * @param logItems
      * @param totalRawPacketSize
      * @return

+ 1 - 2
query-app/src/main/java/db/controller/QueryController.java

@@ -9,7 +9,6 @@ import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
-import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import java.io.*;
@@ -34,7 +33,7 @@ public class QueryController {
             if (map.keySet().size() == 0) {
                 queryRes = queryService.queryByTime(start, end);
             } else {
-                queryRes = queryService.queryByTimeAndCondition(start, end, map);
+                queryRes = queryService.queryByTimeWithCondition(start, end, map);
             }
 
             return queryRes;

+ 61 - 25
query-app/src/main/java/db/service/QueryService.java

@@ -1,8 +1,12 @@
 package db.service;
 
-import db.*;
+import db.DBApiEntry;
+import db.DBVal;
+import db.Point;
 import db.entity.LogFileMeta;
 import db.mapper.LogFileMetaMapper;
+import db.util.ConvertFormat;
+import db.util.SpentTimeCalculator;
 import db.util.Util;
 import lombok.extern.slf4j.Slf4j;
 import org.pcap4j.core.*;
@@ -14,7 +18,10 @@ import org.springframework.stereotype.Service;
 import javax.annotation.PostConstruct;
 import java.io.IOException;
 import java.text.ParseException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 @Service
 @Slf4j
@@ -38,6 +45,18 @@ public class QueryService {
     @Value("${server.port}")
     private String serverPort;
 
+    @Value("${ip.host}")
+    private String hostIP;
+
+    @Value("${system.pcap-files.prefix}")
+    private String pcapPrefix;
+
+    @Value("${system.pcap-files.suffix}")
+    private String pcapSuffix;
+
+    @Value("${system.pcap-files.template}")
+    private String pcapTemplate;
+
     private DBApiEntry entry;
 
     @PostConstruct
@@ -45,7 +64,7 @@ public class QueryService {
         this.entry = DBApiEntry.initApiEntry(host, port);
     }
 
-    public String queryByTimeAndCondition(String start, String end, Map<String, String> tags) throws ParseException,
+    public String queryByTimeWithCondition(String start, String end, Map<String, String> tags) throws ParseException,
             IOException, PcapNativeException, NotOpenException {
 
         Point point = new Point(metricName, tags);
@@ -55,32 +74,41 @@ public class QueryService {
 
         DBApiEntry entry = DBApiEntry.initApiEntry(host, port);
 
+        SpentTimeCalculator spentTimeCalculator = SpentTimeCalculator.create();
+        spentTimeCalculator.begin();
         List<DBVal> lists = entry.getHistRaw(point, startTime, endTime);
+        spentTimeCalculator.end();
         log.info("queryByTimeAndCondition,查询条件为:start:{}, end:{}, point:{}", startTime, endTime, point);
-        log.info("查询ResoDB,查询到:{}", lists);
-        // 根据list查对应报文位置
-        List<Long> locs = new ArrayList<>();
-        for (DBVal dbVal : lists) {
-            locs.add(Double.doubleToRawLongBits(dbVal.getValue()));
+        log.info("查询ResoDB,查询到条目:{},查询耗时:{}", lists.size(), spentTimeCalculator.getSpendTime());
+
+        spentTimeCalculator.begin();
+        HashMap<Long, ArrayList<Integer>> map = new ConvertFormat().convertToLongAndGetIndex(lists);
+        spentTimeCalculator.end();
+        log.info("记录分片耗时:{}",spentTimeCalculator.getSpendTime());
+
+        ArrayList<byte[]> logItems = new ArrayList<>();
+        for (Long key : map.keySet()) {
+            logItems.addAll(logFileService.querySingleLogFile(key, map.get(key)));
         }
-        Collections.sort(locs);
 
-        PcapHandle handle = Pcaps.openOffline("./pcap/template.pcap");
-        String pcapName = "" + startTime + endTime + System.currentTimeMillis();
-        PcapDumper dumper = handle.dumpOpen("./pcap/" + pcapName + ".pcap");
+        spentTimeCalculator.begin();
+        PcapHandle handle = Pcaps.openOffline(pcapTemplate);
+        String pcapName = new StringBuilder().append(System.currentTimeMillis())
+                .append(startTime).append(endTime).toString();
+        PcapDumper dumper = handle.dumpOpen(pcapPrefix + pcapName + pcapSuffix);
 
-        for (Long longIndex : locs) {
-            byte[] logItem = logFileService.queryLog(longIndex);
-            log.info("查询磁盘,查询到:{}", logItem);
+        for (byte[] logItem : logItems) {
             UnknownPacket packet = UnknownPacket.newPacket(logItem, 0, logItem.length);
             dumper.dump(packet);
         }
+        spentTimeCalculator.end();
+        log.info("生成Pcap文件时间:{}", spentTimeCalculator.getSpendTime());
 
         dumper.close();
         handle.close();
 
         StringBuilder returnLink = new StringBuilder();
-        returnLink.append("http://").append("127.0.0.1").append(":")
+        returnLink.append("http://").append(hostIP).append(":")
                 .append(serverPort).append("/download?pcapName=").append(pcapName);
 
         log.info("返回链接:{}", returnLink);
@@ -92,29 +120,37 @@ public class QueryService {
         long startTime = Util.dateStringToUTCMilliSeconds(start);
         long endTime = Util.dateStringToUTCMilliSeconds(end);
 
-        log.info("queryByTime,查询条件为:start:{}, end:{}", startTime, endTime);
+        log.debug("queryByTime,查询条件为:start:{}, end:{}", startTime, endTime);
+
+        SpentTimeCalculator spentTimeCalculator = SpentTimeCalculator.create();
+        spentTimeCalculator.begin();
 
         List<LogFileMeta> logFileMetas = logFileMetaMapper.getValidLogFileMetaBetweenTime(startTime, endTime);
-        log.info("queryByTime,查询B+树,查询到:{}", logFileMetas);
 
-        PcapHandle handle = Pcaps.openOffline("./pcap/udp.pcap");
-        String pcapName = "" + startTime + endTime + System.currentTimeMillis();
-        PcapDumper dumper = handle.dumpOpen("./pcap/" + pcapName + ".pcap");
+        spentTimeCalculator.end();
+        log.debug("queryByTime,查询sqlite,查询条目:{},查询时间:{}", logFileMetas.size(), spentTimeCalculator.getSpendTime());
 
-        for (LogFileMeta logFileMeta : logFileMetas) {
-            List<byte[]> bytes = logFileService.queryLogBetweenTime(logFileMeta, startTime, endTime);
+        PcapHandle handle = Pcaps.openOffline(pcapTemplate);
+        String pcapName = new StringBuilder().append(System.currentTimeMillis())
+                .append(startTime).append(endTime).toString();
+        PcapDumper dumper = handle.dumpOpen(pcapPrefix + pcapName + pcapSuffix);
 
+        spentTimeCalculator.begin();
+        for (LogFileMeta logFileMeta : logFileMetas) {
+            List<byte[]> bytes = logFileService.queryLogBetweenTimeRange(logFileMeta, startTime, endTime);
             for (byte[] item : bytes) {
                 UnknownPacket packet = UnknownPacket.newPacket(item, 0, item.length);
                 dumper.dump(packet);
             }
         }
+        spentTimeCalculator.end();
+        log.info("queryByTime,查询磁盘并生成pcap总时间:{}", spentTimeCalculator.getSpendTime());
+
         dumper.close();
         handle.close();
 
-
         StringBuilder returnLink = new StringBuilder();
-        returnLink.append("http://").append("127.0.0.1").append(":")
+        returnLink.append("http://").append(hostIP).append(":")
                 .append(serverPort).append("/download?pcapName=").append(pcapName);
 
         log.info("返回链接:{}", returnLink);

+ 12 - 4
query-app/src/main/resources/application.yml

@@ -11,20 +11,28 @@ system:
   data-files:
     prefix: "data/"
     suffix: ".data"
-    expire-time: 240 # 小时
+    expire-time: 240 # 数据淘汰时间,小时
+    retry-time: 60000 # 尝试重新写入时间,毫秒
   wal-files:
     prefix: "data/wal/"
     suffix: ".wal"
-  retry-times: 3 # 超时次数
-  retry-stamp: 3000 # 超时时间:毫秒
+  pcap-files:
+    prefix: "pcap/"
+    suffix: ".pcap"
+    template: "pcap/template.pcap"
+  sender:
+    retry-times: 3 # 超时次数
+    retry-stamp: 3000 # 超时时间:毫秒
+
 
 ip:
   broadcast: 192.168.66.255
+  host: 127.0.0.1
 
 reso-db:
   host: 192.168.171.132
   port: 9090
-  metric-name: netProbe
+  metric-name: netProbe2
 
 logging:
   file:

+ 2 - 2
query-app/src/test/java/db/QueryTest.java

@@ -29,8 +29,8 @@ public class QueryTest {
         HashMap<String, String> tags = new HashMap<>();
 //        tags.put("pointName", "pointName_test111");
 //        tags.put("status", "3");
-        log.debug("---queryByTimeAndCondition---");
-        queryService.queryByTimeAndCondition(start, end, tags);
+        log.debug("---queryByTimeWithCondition---");
+        queryService.queryByTimeWithCondition(start, end, tags);
     }
 
     @Test