Browse Source

修改发送报文方式

FutureYu 1 year ago
parent
commit
4e0171c598

+ 16 - 14
insert-app/src/main/java/db/netty/NettyServer.java

@@ -13,6 +13,7 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
+import java.util.concurrent.DelayQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 
 @Component
@@ -39,8 +40,7 @@ public class NettyServer {
     @Value("${system.sender.retry-times}")
     private Integer retryTimes;
 
-    private PriorityBlockingQueue<BasePackage> sendingQueue = new PriorityBlockingQueue<>(30,
-            (o1, o2) -> (int) (o1.getSendStamp() - o2.getSendStamp()));
+    private DelayQueue<BasePackage> sendingQueue = new DelayQueue<>();
 
     /**
      * 启动Netty Server
@@ -82,15 +82,18 @@ public class NettyServer {
         }
     }
 
-    @Scheduled(fixedDelay = 100)
-    private void checkQueue() throws InterruptedException {
-//        log.debug("开始探测发送队列");
-        while (!sendingQueue.isEmpty()) {
-            BasePackage basePackage = sendingQueue.peek();
-            if (basePackage.getSendStamp() <= System.currentTimeMillis()) {
-                // 需要发送
-                sendingQueue.take();
-                log.info("正式发送{}", basePackage);
+    @PostConstruct
+    public void startCheckSendingQueue() {
+        new Thread(() -> {
+            checkSendingQueue();
+        }, "CheckSendingQueueThread").start();
+    }
+
+    private void checkSendingQueue() {
+        while (true) {
+            try {
+                BasePackage basePackage = sendingQueue.take();
+                log.info("发出{}", basePackage);
                 channel.writeAndFlush(basePackage);
                 if (basePackage.getSendTimes() > 1) {
                     // 再次放入队列
@@ -99,9 +102,8 @@ public class NettyServer {
                     send(basePackage);
                     log.info("超时重传机制,重新添加至发送队列{}", basePackage);
                 }
-            } else {
-                // 暂时无待发送
-                return;
+            } catch (InterruptedException e) {
+                log.error("从发送队列队列中取出报文失败", e);
             }
         }
     }

+ 1 - 1
insert-app/src/main/java/db/service/ProbeService.java

@@ -141,7 +141,7 @@ public class ProbeService {
 
         scanRequestPackage.setToHostPort(Globals.BROADCAST_HOST, Globals.BROADCAST_PORT);
 
-        log.info("发送 scanRequestPackage:{}", scanRequestPackage);
+        log.info("广播发送 scanRequestPackage:{}", scanRequestPackage);
         server.send(scanRequestPackage);
     }
 

+ 19 - 1
protocol/src/main/java/db/entity/packages/BasePackage.java

@@ -8,10 +8,12 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 
 import java.text.ParseException;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
 
 @Data
 @NoArgsConstructor
-public abstract class BasePackage {
+public abstract class BasePackage implements Delayed {
 
     private String fromHost;
     private Integer fromPort;
@@ -29,6 +31,22 @@ public abstract class BasePackage {
     private short packageCode;
     private short packageLength;
 
+    @Override
+    public long getDelay(TimeUnit unit) {
+        return sendStamp - System.currentTimeMillis();
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+        BasePackage task = (BasePackage) o;
+        long diff = this.sendStamp - task.sendStamp;
+        if (diff <= 0) {
+            return -1;
+        } else {
+            return 1;
+        }
+    }
+
     public void setFromHostPort(String fromHost, Integer fromPort) {
         this.fromHost = fromHost;
         this.fromPort = fromPort;