后端 Netty SpringCloud中启动Netty服务与WebSocket通信 cmyang 2023-10-30 2024-01-20 1. 背景 消息中心的项目中,需要支持长连接推送。因为目前只是内部使用,所以将长连接服务端和提供HTTP接口的Spring Boot服务放在了同一个应用中。需要完成的需求如下:
在Springboot中启动Netty服务端
将Netty的端口注册到Nacos上,方便前端服务通过网关连接到Netty服务端,同时Nacos本身有负载均衡的能力
Springboot服务启动的时候,同时启动Netty服务,服务关闭时,同时关闭Netty服务
2. 实现方式
与前端长连接交互使用WebSocket通信
服务端使用Netty作为服务端
Springboot启动后,通过实现ApplicationRunner接口来启动服务。ApplicationRunner会在SpringBoot服务启动完成后,执行所实现的run方法。
具体实现:
public class NettyStartServer implements ApplicationRunner
获取Nacos信息,通过手动注册的方式将Netty服务注册到Nacos上
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr());
properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDiscoveryProperties.getNamespace());
properties.setProperty(PropertyKeyConst.USERNAME, nacosDiscoveryProperties.getUsername());
properties.setProperty(PropertyKeyConst.PASSWORD, nacosDiscoveryProperties.getPassword());
NamingService namingService = NamingFactory.createNamingService(properties);
String hostAddress = NetUtil.getLocalhost().getHostAddress();
namingService.registerInstance("message-ws", hostAddress, wsPort);
Springboot关闭时,停止Netty服务:使用Runtime.getRuntime().addShutdownHook(new Thread())方法监听程序的退出动作,程序退出时会执行该线程中的方法。
Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyWsServer.destroy()));
3. 代码实现
3.1 启动类

package com.xxx.web.cloud.platform.message;

import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.net.NetUtil;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.xxx.web.cloud.platform.message.application.netty.server.NettyWsServer;
import io.netty.channel.ChannelFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.Properties;

/**
 * Starts the Netty WebSocket server once the Spring Boot application is up,
 * registers its port in Nacos under the service name {@code message-ws}, and
 * tears both down again from a JVM shutdown hook.
 */
@Component
@Slf4j
public class NettyStartServer implements ApplicationRunner {

    /** Service name under which the WebSocket port is registered in Nacos. */
    private static final String WS_SERVICE_NAME = "message-ws";

    @Autowired
    private NettyWsServer nettyWsServer;

    @Autowired
    private NacosDiscoveryProperties nacosDiscoveryProperties;

    @Value("${netty.ws.port}")
    private Integer wsPort;

    /**
     * Host address used for the Nacos registration. Written before
     * {@link #namingService} so the shutdown hook never sees a client
     * without the matching address.
     */
    private volatile String registeredHost;

    /** Nacos client used for the registration; {@code null} until registration succeeded. */
    private volatile NamingService namingService;

    /**
     * Launches the Netty server on its own thread (the server thread blocks
     * until the channel closes) and installs the shutdown hook.
     *
     * @param args application arguments, unused
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        new Thread(this::startNetty, "netty-ws-server").start();
        Runtime.getRuntime().addShutdownHook(new Thread(this::stopNetty, "netty-ws-shutdown"));
    }

    /**
     * Binds the Netty server, registers it in Nacos and then blocks until the
     * server channel is closed. Failures are logged, not propagated, because
     * this runs on a dedicated thread.
     */
    private void startNetty() {
        log.info("The Netty Server for websocket is beginning to start...[{}]", wsPort);
        try {
            ChannelFuture future = nettyWsServer.start();
            registerToNacos();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owner of this thread can still observe it.
            Thread.currentThread().interrupt();
            log.error("Netty Server start error: {}", ExceptionUtil.stacktraceToString(e));
        } catch (Exception e) {
            log.error("Netty Server start error: {}", ExceptionUtil.stacktraceToString(e));
        }
    }

    /**
     * Manually registers {@code <local host address>:<wsPort>} in Nacos.
     *
     * @throws NacosException if the Nacos client cannot be created or the registration fails
     */
    private void registerToNacos() throws NacosException {
        Properties properties = new Properties();
        putIfPresent(properties, PropertyKeyConst.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr());
        putIfPresent(properties, PropertyKeyConst.NAMESPACE, nacosDiscoveryProperties.getNamespace());
        putIfPresent(properties, PropertyKeyConst.USERNAME, nacosDiscoveryProperties.getUsername());
        putIfPresent(properties, PropertyKeyConst.PASSWORD, nacosDiscoveryProperties.getPassword());

        NamingService naming = NamingFactory.createNamingService(properties);
        String hostAddress = NetUtil.getLocalhost().getHostAddress();
        naming.registerInstance(WS_SERVICE_NAME, hostAddress, wsPort);

        // Publish the host first, the client last: the shutdown hook reads the client first.
        registeredHost = hostAddress;
        namingService = naming;
        log.info("websocket服务已注册到nacos...[{}:{}]", hostAddress, wsPort);
    }

    /**
     * Copies a setting into {@code target} only when it has a value.
     * {@link Properties#setProperty} rejects {@code null}, and namespace,
     * username and password may be left unconfigured.
     */
    private static void putIfPresent(Properties target, String key, String value) {
        if (value != null) {
            target.setProperty(key, value);
        }
    }

    /**
     * Shutdown-hook body: removes the instance from Nacos (if it was
     * registered) and then shuts the Netty server down.
     */
    private void stopNetty() {
        NamingService naming = namingService;
        if (naming != null) {
            try {
                // Deregister explicitly so clients stop being routed to this instance
                // without waiting for Nacos to expire it on its own.
                naming.deregisterInstance(WS_SERVICE_NAME, registeredHost, wsPort);
            } catch (Exception e) {
                // Best effort: a failed deregistration must not prevent the Netty shutdown.
                log.warn("Failed to deregister websocket service from nacos", e);
            }
        }
        nettyWsServer.destroy();
    }
}
3.2 Netty服务端

package com.xxx.web.cloud.platform.message.application.netty.server;

import com.xxx.web.cloud.platform.message.application.netty.handler.WebSocketChannelHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Netty-based WebSocket server. Serves the WebSocket endpoint at
 * {@code /ws} on the port configured by {@code netty.ws.port}.
 */
@Component
@Slf4j
public class NettyWsServer {

    /** URI path on which the WebSocket handshake is accepted. */
    private static final String WS_PATH = "/ws";

    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();

    @Autowired
    private WebSocketChannelHandler webSocketChannelHandler;

    @Value("${netty.ws.port}")
    private Integer wsPort;

    /**
     * Binds the server to the configured port and waits for the bind to finish.
     *
     * @return the future of the completed bind; its channel is the server channel
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    public ChannelFuture start() throws InterruptedException {
        ServerBootstrap server = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WsPipelineInitializer());

        ChannelFuture bound = server.bind(wsPort).sync();
        log.info("Netty Server for websocket start...[{}]", wsPort);
        return bound;
    }

    /** Requests a graceful shutdown of both event loop groups. */
    public void destroy() {
        log.info("Netty Server for websocket destroy...");
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }

    /**
     * Builds the handler chain for every accepted connection: HTTP codec,
     * message aggregation (up to 65536 bytes), WebSocket compression,
     * WebSocket protocol handling on {@link #WS_PATH}, then the business handler.
     */
    private final class WsPipelineInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) {
            ChannelPipeline handlers = ch.pipeline();
            // NOTE(review): the same webSocketChannelHandler instance is added to every
            // channel; presumably it is annotated @ChannelHandler.Sharable — confirm.
            handlers.addLast(
                    new HttpServerCodec(),
                    new HttpObjectAggregator(65536),
                    new WebSocketServerCompressionHandler(),
                    new WebSocketServerProtocolHandler(WS_PATH, null, true),
                    webSocketChannelHandler);
        }
    }
}