SpringCloud中启动Netty服务与WebSocket通信

1. 背景

消息中心的项目中,需要支持长连接推送的需求,因为目前只是内部使用,将长连接服务端和http的springBoot服务放在了一起。需要完成的需求

  • 在Springboot中启动Netty服务端
  • 将Netty的端口注册到Nacos上,方便前端服务通过网关连接到Netty服务端,同时Nacos本身有负载均衡的能力
  • Springboot服务启动的时候,同时启动Netty服务,服务关闭时,同时关闭Netty服务

2. 实现方式

  • 与前端长连接交互使用WebSocket通信

  • 服务端使用Netty作为服务端

  • Springboot启动后,通过实现ApplicationRunner接口来启动服务,ApplicationRunner会在SpringBoot服务启动完成后,执行实现的方法具体实现

    • public class NettyStartServer implements ApplicationRunner
  • 获取Nacos信息,通过手动注册的方式将Netty服务注册到Nacos上

    java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // 将netty服务注册到nacos
    Properties properties = new Properties();
    properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr());
    properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDiscoveryProperties.getNamespace());
    properties.setProperty(PropertyKeyConst.USERNAME, nacosDiscoveryProperties.getUsername());
    properties.setProperty(PropertyKeyConst.PASSWORD, nacosDiscoveryProperties.getPassword());
    NamingService namingService = NamingFactory.createNamingService(properties);
    String hostAddress = NetUtil.getLocalhost().getHostAddress();
    namingService.registerInstance("message-ws", hostAddress, wsPort);
  • Springboot关闭时,停止Netty服务,使用Runtime.getRuntime().addShutdownHook(new Thread())方法,用来监听程序的退出动作,程序退出时执行线程中的方法

    • Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyWsServer.destroy()));

3. 代码实现

3.1 启动类

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.xxx.web.cloud.platform.message;

import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.net.NetUtil;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.xxx.web.cloud.platform.message.application.netty.server.NettyWsServer;
import io.netty.channel.ChannelFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.Properties;

/**
* Netty启动类
* @author iseven.yang
* @date 2023/2/10 10:10
*/
@Component
@Slf4j
public class NettyStartServer implements ApplicationRunner {

@Autowired
private NettyWsServer nettyWsServer;
@Autowired
private NacosDiscoveryProperties nacosDiscoveryProperties;

@Value("${netty.ws.port}")
private Integer wsPort;

@Override
public void run(ApplicationArguments args) throws Exception {
// 需要新开线程启动netty服务,不然会阻塞springboot主进程
new Thread(this::startNetty).start();
// 关闭监听
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
nettyWsServer.destroy();
}));
}

private void startNetty() {
log.info("The Netty Server for websocket is beginning to start...[{}]", wsPort);
try {
// 启动netty服务
ChannelFuture future = nettyWsServer.start();

// 将netty服务注册到nacos
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr());
properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDiscoveryProperties.getNamespace());
properties.setProperty(PropertyKeyConst.USERNAME, nacosDiscoveryProperties.getUsername());
properties.setProperty(PropertyKeyConst.PASSWORD, nacosDiscoveryProperties.getPassword());
NamingService namingService = NamingFactory.createNamingService(properties);
String hostAddress = NetUtil.getLocalhost().getHostAddress();
namingService.registerInstance("message-ws", hostAddress, wsPort);
log.info("websocket服务已注册到nacos...[{}:{}]", hostAddress, wsPort);
future.channel().closeFuture().sync();
} catch (Exception e) {
log.error("Netty Server start error: {}", ExceptionUtil.stacktraceToString(e));
}
}
}

3.2 Netty服务端

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.xxx.web.cloud.platform.message.application.netty.server;

import com.xxx.web.cloud.platform.message.application.netty.handler.WebSocketChannelHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
* 长连接服务端websocket协议
* @author iseven.yang
* @date 2023/2/8 11:18
*/
@Component
@Slf4j
public class NettyWsServer {

@Autowired
private WebSocketChannelHandler webSocketChannelHandler;

@Value("${netty.ws.port}")
private Integer wsPort;

private static final String WS_PATH = "/ws";

//NIO线程组
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();

public ChannelFuture start() throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 处理websocket
// 增加对http的支持
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
// 支持WebSocket应答数据压缩传输
pipeline.addLast(new WebSocketServerCompressionHandler());
// 对整个websocket的通信进行了初始化(发现http报文中有升级为websocket的请求),包括握手,以及以后的一些通信控制
pipeline.addLast(new WebSocketServerProtocolHandler(WS_PATH, null, true));
pipeline.addLast(webSocketChannelHandler);
}
});
ChannelFuture future = bootstrap.bind(wsPort).sync();
log.info("Netty Server for websocket start...[{}]", wsPort);
return future;
// future.channel().closeFuture().sync();
}

public void destroy() {
log.info("Netty Server for websocket destroy...");
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}