Browse Source

增加多线程netty udp server

sensordb2 1 year ago
parent
commit
bd606453a9

+ 4 - 0
insert-app/pom.xml

@@ -57,6 +57,10 @@
             <version>0.0.1-SNAPSHOT</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport-native-epoll</artifactId>
+        </dependency>
 
     </dependencies>
 

+ 75 - 7
insert-app/src/main/java/db/netty/NettyServer.java

@@ -2,26 +2,28 @@ package db.netty;
 
 import db.entity.packages.BasePackage;
 import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.*;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollChannelOption;
+import io.netty.channel.epoll.EpollDatagramChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioDatagramChannel;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.DependsOn;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.util.concurrent.DelayQueue;
-import java.util.concurrent.PriorityBlockingQueue;
 
 @Component
 @DependsOn("springUtils")
 @Slf4j
 public class NettyServer {
-
-    EventLoopGroup group = new NioEventLoopGroup();
+    private EventLoopGroup bossGroup;
 
     @Value("${netty.port}")
     private Integer port;
@@ -40,6 +42,12 @@ public class NettyServer {
     @Value("${system.sender.retry-times}")
     private Integer retryTimes;
 
+    @Value("${system.udp-server.worker-thread-num}")
+    private Integer workThreadNum;
+
+    @Value("${system.udp-server.use-multithread}")
+    private Boolean useMultiThread;
+
     private DelayQueue<BasePackage> sendingQueue = new DelayQueue<>();
 
     /**
@@ -49,15 +57,25 @@ public class NettyServer {
      */
     @PostConstruct
     public void start() throws InterruptedException {
+        if(useMultiThread) startMT();
+        else startST();
+    }
+
+    /**
+     * single thread
+     */
+    protected void startST() throws InterruptedException {
         log.info("开始启动 Netty Server,host:{} port: {}", host, port);
         Bootstrap bootstrap = new Bootstrap();
+        bossGroup = new NioEventLoopGroup();
+
         NettyServerHandler nettyServerHandler = new NettyServerHandler();
         nettyServerHandler.setUseSeperateThread(useSeperateThread);
 
         Decoder decoder = new Decoder();
         decoder.setShowRecvBytes(this.showRecvBytes);
 
-        bootstrap.group(group)
+        bootstrap.group(bossGroup)
                 // 指定Channel
                 .channel(NioDatagramChannel.class)
 
@@ -72,7 +90,6 @@ public class NettyServer {
                     }
                 });
 
-
         ChannelFuture future = bootstrap.bind(host, port).sync();
         if (future.isSuccess()) {
             log.info("成功启动 Netty Server,host:{} port: {}", host, port);
@@ -82,6 +99,57 @@ public class NettyServer {
         }
     }
 
+    /**
+     * multithread
+     */
+    protected void startMT() throws InterruptedException {
+        log.info("开始启动 Netty Server,host:{} port: {}", host, port);
+        Bootstrap bootstrap = new Bootstrap();
+        NettyServerHandler nettyServerHandler = new NettyServerHandler();
+        nettyServerHandler.setUseSeperateThread(useSeperateThread);
+        Decoder decoder = new Decoder();
+        decoder.setShowRecvBytes(this.showRecvBytes);
+
+        // linux平台下支持SO_REUSEPORT特性以提高性能
+        if (Epoll.isAvailable()) {
+            log.info("***Netty reuse port is activated");
+            bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
+        } else {
+            log.info("***Netty reuse port is not supported");
+        }
+
+        bossGroup = new EpollEventLoopGroup(workThreadNum);
+        bootstrap.group(bossGroup)
+                // 指定Channel
+                .channel(EpollDatagramChannel.class)
+//                .option(ChannelOption.SO_BROADCAST, true)
+                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+                .handler(new ChannelInitializer<Channel>() {
+                    @Override
+                    public void initChannel(final Channel ch) throws Exception {
+                        ChannelPipeline p = ch.pipeline();
+                        p.addLast(new Encoder(), decoder);
+                        p.addLast(nettyServerHandler);
+                    }
+                });
+
+        ChannelFuture future;
+        for(int i = 0; i < workThreadNum; ++i) {
+//            future = bootstrap.bind(host, port).await();
+            future = bootstrap.bind(port).await();
+            if(!future.isSuccess()) {
+                log.info("[{}]成功启动 Netty Server,host:{} port: {}", i+1, host, port);
+                channel = future.channel();
+            } else {
+                if (future.cause() != null) {
+                    future.cause().printStackTrace();
+                }
+                log.error(String.format("[%d]失败启动 Netty Server,host:%s port: %d error:%s", i+1, host, port,
+                        future.cause()!=null?future.cause():""));
+            }
+        }
+    }
+
     @PostConstruct
     public void startCheckSendingQueue() {
         int i = 0;
@@ -122,7 +190,7 @@ public class NettyServer {
 
     @PreDestroy
     public void destroy() {
-        group.shutdownGracefully();
+        bossGroup.shutdownGracefully();
         log.info("关闭Netty");
     }
 }

+ 3 - 1
insert-app/src/main/java/db/netty/NettyServerHandler.java

@@ -6,6 +6,7 @@ import db.entity.packages.ListenResponsePackage;
 import db.entity.packages.ScanResponsePackage;
 import db.handler.PackageHandler;
 import db.util.SpringUtils;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import lombok.Data;
@@ -16,6 +17,7 @@ import java.io.IOException;
 
 @Slf4j
 @Data
+@ChannelHandler.Sharable
 public class NettyServerHandler extends SimpleChannelInboundHandler<BasePackage> {
     private boolean useSeperateThread = false;
 
@@ -51,7 +53,7 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<BasePackage>
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-        log.info("NettyServerHandler exceptionCaught:{}", cause);
+        log.error("NettyServerHandler exceptionCaught:{}", cause);
         cause.printStackTrace();
         ctx.fireExceptionCaught(cause);
     }

+ 3 - 0
insert-app/src/main/resources/application.yml

@@ -25,6 +25,9 @@ system:
   sender:
     retry-times: 3 # 超时次数
     retry-stamp: 3000 # 超时时间:毫秒
+  udp-server:
+    worker-thread-num: 10
+    use-multithread: true
 
 reso-db:
   host: 192.168.233.153

+ 1 - 0
package.sh

@@ -16,6 +16,7 @@ cp -rf $ROOT/probe/target/probe-0.0.1-SNAPSHOT-jar-with-dependencies.jar $ROOT/t
 cp -rf $ROOT/init/meta.db $ROOT/tmp/
 cp -rf $ROOT/init $ROOT/tmp/
 cp -rf $ROOT/tools $ROOT/tmp/
+rm -rf $ROOT/tmp/tools/localDebug
 cp -rf $ROOT/conf $ROOT/tmp/
 cp -rf $ROOT/shell $ROOT/tmp/
 chmod -R +x $ROOT/shell/

+ 2 - 0
protocol/src/main/java/db/netty/Decoder.java

@@ -4,6 +4,7 @@ import db.Globals;
 import db.entity.packages.*;
 import db.util.ConvertUtils;
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.socket.DatagramPacket;
 import io.netty.handler.codec.MessageToMessageDecoder;
@@ -12,6 +13,7 @@ import lombok.extern.slf4j.Slf4j;
 import java.util.List;
 
 @Slf4j
+@ChannelHandler.Sharable
 public class Decoder extends MessageToMessageDecoder<DatagramPacket> {
     private boolean showRecvBytes = true;
 

+ 2 - 0
protocol/src/main/java/db/netty/Encoder.java

@@ -4,6 +4,7 @@ import db.entity.packages.BasePackage;
 import db.util.ConvertUtils;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.socket.DatagramPacket;
 import io.netty.handler.codec.MessageToMessageEncoder;
@@ -15,6 +16,7 @@ import java.util.List;
 
 // 在这里编码包
 @Slf4j
+@ChannelHandler.Sharable
 public class Encoder extends MessageToMessageEncoder<BasePackage> {
 
    @Override

+ 0 - 0
shell/insert/forceStop.sh


+ 0 - 0
shell/query/forceStop.sh