|
@@ -9,23 +9,25 @@ import io.netty.channel.epoll.EpollDatagramChannel;
|
|
|
import io.netty.channel.epoll.EpollEventLoopGroup;
|
|
|
import io.netty.channel.nio.NioEventLoopGroup;
|
|
|
import io.netty.channel.socket.nio.NioDatagramChannel;
|
|
|
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
|
|
|
+import io.netty.util.concurrent.EventExecutorGroup;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.context.annotation.DependsOn;
|
|
|
-import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
import javax.annotation.PreDestroy;
|
|
|
import java.util.concurrent.DelayQueue;
|
|
|
-import java.util.concurrent.PriorityBlockingQueue;
|
|
|
|
|
|
@Component
|
|
|
@DependsOn("springUtils")
|
|
|
@Slf4j
|
|
|
public class NettyServer {
|
|
|
|
|
|
- EventLoopGroup group;
|
|
|
+ EventLoopGroup bossGroup;
|
|
|
+
|
|
|
+ EventExecutorGroup workerGroup;
|
|
|
|
|
|
@Value("${netty.port}")
|
|
|
private Integer port;
|
|
@@ -42,8 +44,10 @@ public class NettyServer {
|
|
|
@Value("${system.sender.retry-times}")
|
|
|
private Integer retryTimes;
|
|
|
|
|
|
+ @Value("${system.udp-server.boss-thread-num}")
|
|
|
+ private Integer bossThreadNum;
|
|
|
@Value("${system.udp-server.worker-thread-num}")
|
|
|
- private Integer workThreadNum;
|
|
|
+ private Integer workerThreadNum;
|
|
|
|
|
|
@Value("${system.udp-server.use-multithread}")
|
|
|
private Boolean useMultiThread;
|
|
@@ -67,7 +71,9 @@ public class NettyServer {
|
|
|
public void start() throws InterruptedException {
|
|
|
useMultiThread = useMultiThread && Epoll.isAvailable(); // 必须系统支持
|
|
|
|
|
|
- group = useMultiThread ? new EpollEventLoopGroup() : new NioEventLoopGroup();
|
|
|
+ bossGroup = useMultiThread ? new EpollEventLoopGroup() : new NioEventLoopGroup();
|
|
|
+
|
|
|
+ workerGroup = new DefaultEventExecutorGroup(workerThreadNum);//业务线程池
|
|
|
|
|
|
log.info("开始启动 Netty Server,host:{} port: {}", host, port);
|
|
|
log.info("***Netty-Server isJustReceivePacket:{}", isJustReceivePacket);
|
|
@@ -81,7 +87,7 @@ public class NettyServer {
|
|
|
decoder.setDecode(this.decodePacketOrNot);
|
|
|
log.info("Netty-Server decode showRecvBytes:{} decodePacketOrNot:{}", this.showRecvBytes, this.decodePacketOrNot);
|
|
|
|
|
|
- bootstrap.group(group)
|
|
|
+ bootstrap.group(bossGroup)
|
|
|
// 指定Channel
|
|
|
.channel(useMultiThread ? EpollDatagramChannel.class : NioDatagramChannel.class)
|
|
|
|
|
@@ -93,7 +99,7 @@ public class NettyServer {
|
|
|
public void initChannel(final Channel ch) throws Exception {
|
|
|
ChannelPipeline p = ch.pipeline();
|
|
|
p.addLast(encoder, decoder);
|
|
|
- p.addLast(nettyServerHandler);
|
|
|
+ p.addLast(workerGroup, nettyServerHandler);
|
|
|
}
|
|
|
});
|
|
|
if (useMultiThread) {
|
|
@@ -102,11 +108,11 @@ public class NettyServer {
|
|
|
|
|
|
if (useMultiThread) {
|
|
|
// linux系统下使用SO_REUSEPORT特性,使得多个线程绑定同一个端口
|
|
|
- if(workThreadNum<=0) {
|
|
|
- workThreadNum = Runtime.getRuntime().availableProcessors();
|
|
|
+ if (bossThreadNum <= 0) {
|
|
|
+ bossThreadNum = Runtime.getRuntime().availableProcessors();
|
|
|
}
|
|
|
- log.info("使用 epoll reuseport,启动线程数:{}", workThreadNum);
|
|
|
- for (int i = 0; i < workThreadNum; i++) {
|
|
|
+ log.info("使用 epoll reuseport,启动线程数:{}", bossThreadNum);
|
|
|
+ for (int i = 0; i < bossThreadNum; i++) {
|
|
|
ChannelFuture future = bootstrap.bind(host, port).await();
|
|
|
if (!future.isSuccess()) {
|
|
|
log.error("[多线程-{}]失败启动 Netty Server,host:{} port: {}", i, host, port);
|
|
@@ -166,7 +172,7 @@ public class NettyServer {
|
|
|
|
|
|
@PreDestroy
|
|
|
public void destroy() {
|
|
|
- group.shutdownGracefully();
|
|
|
+ bossGroup.shutdownGracefully();
|
|
|
log.info("关闭Netty");
|
|
|
}
|
|
|
}
|