Commit 3e80defb authored by p x's avatar p x
Browse files

调通netty websocket

parent e40793c6
......@@ -27,7 +27,7 @@ repositories {
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-websocket")
// implementation("org.springframework.boot:spring-boot-starter-websocket")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
compileOnly("org.projectlombok:lombok")
......@@ -40,6 +40,7 @@ dependencies {
implementation("com.alibaba.fastjson2:fastjson2:2.0.60")
// https://mvnrepository.com/artifact/io.netty/netty-all
implementation("io.netty:netty-all:4.2.9.Final")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1")
}
kotlin {
......
This diff is collapsed.
......@@ -13,6 +13,10 @@ import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler
import io.netty.handler.stream.ChunkedWriteHandler
import jakarta.annotation.PostConstruct
import jakarta.annotation.PreDestroy
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
......@@ -21,15 +25,22 @@ class NettyWebSocketServer(
@Value("\${netty.websocket.port:8089}") private val port: Int,
private val webSocketHandler: WebSocketHandler
) {
private val logger = LoggerFactory.getLogger(javaClass)
// Netty主从线程组
private val bossGroup = NioEventLoopGroup(1)
private val workerGroup = NioEventLoopGroup()
// 保存Netty服务的ChannelFuture,用于优雅关闭
private var serverChannelFuture: io.netty.channel.ChannelFuture? = null
/**
* 启动Netty WebSocket服务
* 启动Netty WebSocket服务(异步执行,避免阻塞Spring主线程)
*/
@PostConstruct
fun start() {
// 🔥 关键修改:使用新线程启动Netty,不阻塞Spring初始化
CoroutineScope(Dispatchers.IO).launch {
try {
val bootstrap = ServerBootstrap()
bootstrap.group(bossGroup, workerGroup)
......@@ -39,29 +50,27 @@ class NettyWebSocketServer(
.childHandler(object : ChannelInitializer<SocketChannel>() {
override fun initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline()
// HTTP编解码器
pipeline.addLast(HttpServerCodec())
// 分块写处理器
pipeline.addLast(ChunkedWriteHandler())
// HTTP消息聚合器(最大1024*1024字节)
pipeline.addLast(HttpObjectAggregator(1024 * 1024))
// WebSocket协议处理器(指定WebSocket路径)
pipeline.addLast(WebSocketServerProtocolHandler("/gs-guide-websocket"))
// 自定义WebSocket消息处理器
pipeline.addLast(webSocketHandler)
}
})
// 绑定端口并启动
val channelFuture = bootstrap.bind(port).sync()
println("Netty WebSocket服务启动成功,端口:$port")
// 阻塞直到服务关闭
channelFuture.channel().closeFuture().sync()
// 绑定端口并启动(非阻塞?不,这里sync()只阻塞当前新线程,不阻塞Spring主线程)
serverChannelFuture = bootstrap.bind(port).sync()
logger.info("Netty WebSocket服务启动成功,端口:$port")
// 阻塞当前新线程(而非Spring主线程)直到服务关闭
serverChannelFuture?.channel()?.closeFuture()?.sync()
} catch (e: Exception) {
e.printStackTrace()
logger.error("Netty服务启动失败", e)
} finally {
// 优雅关闭线程组
bossGroup.shutdownGracefully()
workerGroup.shutdownGracefully()
logger.info("Netty线程组已关闭")
}
}
}
......@@ -70,8 +79,10 @@ class NettyWebSocketServer(
*/
@PreDestroy
fun stop() {
// 主动关闭Netty服务,解除closeFuture的阻塞
serverChannelFuture?.channel()?.close()?.sync()
bossGroup.shutdownGracefully()
workerGroup.shutdownGracefully()
println("Netty WebSocket服务已关闭")
logger.info("Netty WebSocket服务已关闭")
}
}
\ No newline at end of file
......@@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicBoolean
@Component
class PushConfig {
// 定时推送开关(默认关闭)
val pushEnabled = AtomicBoolean(false)
val pushEnabled = AtomicBoolean(true)
// 推送间隔(毫秒,默认5秒)
var pushInterval: Long = 5000
}
\ No newline at end of file
......@@ -2,21 +2,28 @@ package com.inzy.wsmock
import com.alibaba.fastjson2.JSONObject
import lombok.extern.slf4j.Slf4j
import org.slf4j.LoggerFactory
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.time.LocalDateTime
@Slf4j
@Component
class ScheduledPushTask(
private val webSocketHandler: WebSocketHandler,
private val pushConfig: PushConfig
) {
private val logger = LoggerFactory.getLogger(javaClass)
/**
* 定时推送任务(固定延迟,避免任务叠加)
*/
// @Scheduled(fixedDelayString = "#{@pushConfig.pushInterval}")
@Scheduled(fixedRate = 1000)
@Scheduled(fixedDelayString = "#{@pushConfig.pushInterval}")
fun pushMsgToClients() {
// 增加日志,确认函数是否执行
// logger.info("定时推送任务开始执行 - ${LocalDateTime.now()}")
// 开关关闭时跳过
// if (!pushConfig.pushEnabled.get()) {
// return
......@@ -24,7 +31,7 @@ class ScheduledPushTask(
// 构造推送消息(适配前端格式)
val msgObj = JSONObject()
msgObj["content"] = "定时推送消息 - ${LocalDateTime.now()}"
println("--------msgObj = ${msgObj}")
logger.debug("msgObj = ${msgObj}")
// 广播给所有客户端
webSocketHandler.broadcastMsg(msgObj.toJSONString())
}
......
......@@ -2,6 +2,7 @@ package com.inzy.wsmock
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.ComponentScan
import org.springframework.scheduling.annotation.EnableScheduling
@SpringBootApplication
......
......@@ -13,6 +13,34 @@ server:
force: true
logging:
# 全局日志级别
level:
root: INFO
io.netty: WARN
org.springframework: WARN
# 包级别日志
# com.inzy.wsmock: DEBUG
# org.springframework: WARN
# org.hibernate: ERROR
# 类级别日志
# com.example.service.UserService: TRACE
# 日志文件配置
file:
name: logs/wsmock.log
# 日志格式
pattern:
console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
file: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36}.%method - %msg%n"
logback:
rollingpolicy:
max-file-size: 10MB
max-history: 30
total-size-cap: 1GB
file-name-pattern: logs/wsmock-.%d{yyyy-MM-dd}.%i.log
# Netty WebSocket配置
netty:
websocket:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment