Commit e40793c6 authored by p x's avatar p x
Browse files

netty 1

parent 8516d494
Pipeline #3224 failed with stages
in 0 seconds
# wsmock
WebSockset 服务用来推送模拟数据
# main 分支
stomp协议是先链接后订阅是2步骤,前端要新增js依赖,修改代码
# ws_netty 分支
netty包实现的 websockset
......@@ -35,6 +35,11 @@ dependencies {
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.jetbrains.kotlin:kotlin-test-junit5")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
// https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2
implementation("com.alibaba.fastjson2:fastjson2:2.0.60")
// https://mvnrepository.com/artifact/io.netty/netty-all
implementation("io.netty:netty-all:4.2.9.Final")
}
kotlin {
......
package com.inzy.wsmock
class Greeting(var content: String) {
}
\ No newline at end of file
package com.inzy.wsmock
import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.messaging.handler.annotation.SendTo
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.util.HtmlUtils
@RestController
class GreetingController{
@MessageMapping("/hello")
@SendTo("/topic/greetings")
@Throws(Exception::class)
fun greeting(message: HelloMessage): Greeting {
Thread.sleep(1000) // simulated delay
return Greeting("Hello, " + HtmlUtils.htmlEscape(message.name) + "!")
}
// 新增开关变量
var pushEnabled = false
@GetMapping("/togglePush")
fun togglePush(@RequestParam enable: Boolean): String {
pushEnabled = enable
return "服务器定时推送已${if (enable) "开启" else "关闭"}"
}
}
\ No newline at end of file
package com.inzy.wsmock
class HelloMessage {
val name: String=""
}
\ No newline at end of file
package com.inzy.wsmock
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler
import io.netty.handler.stream.ChunkedWriteHandler
import jakarta.annotation.PostConstruct
import jakarta.annotation.PreDestroy
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
@Component
class NettyWebSocketServer(
@Value("\${netty.websocket.port:8089}") private val port: Int,
private val webSocketHandler: WebSocketHandler
) {
// Netty主从线程组
private val bossGroup = NioEventLoopGroup(1)
private val workerGroup = NioEventLoopGroup()
/**
* 启动Netty WebSocket服务
*/
@PostConstruct
fun start() {
try {
val bootstrap = ServerBootstrap()
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel::class.java)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(object : ChannelInitializer<SocketChannel>() {
override fun initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline()
// HTTP编解码器
pipeline.addLast(HttpServerCodec())
// 分块写处理器
pipeline.addLast(ChunkedWriteHandler())
// HTTP消息聚合器(最大1024*1024字节)
pipeline.addLast(HttpObjectAggregator(1024 * 1024))
// WebSocket协议处理器(指定WebSocket路径)
pipeline.addLast(WebSocketServerProtocolHandler("/gs-guide-websocket"))
// 自定义WebSocket消息处理器
pipeline.addLast(webSocketHandler)
}
})
// 绑定端口并启动
val channelFuture = bootstrap.bind(port).sync()
println("Netty WebSocket服务启动成功,端口:$port")
// 阻塞直到服务关闭
channelFuture.channel().closeFuture().sync()
} catch (e: Exception) {
e.printStackTrace()
} finally {
// 优雅关闭线程组
bossGroup.shutdownGracefully()
workerGroup.shutdownGracefully()
}
}
/**
* 销毁时关闭Netty服务
*/
@PreDestroy
fun stop() {
bossGroup.shutdownGracefully()
workerGroup.shutdownGracefully()
println("Netty WebSocket服务已关闭")
}
}
\ No newline at end of file
package com.inzy.wsmock
import org.springframework.stereotype.Component
import java.util.concurrent.atomic.AtomicBoolean
@Component
class PushConfig {
// 定时推送开关(默认关闭)
val pushEnabled = AtomicBoolean(false)
// 推送间隔(毫秒,默认5秒)
var pushInterval: Long = 5000
}
\ No newline at end of file
package com.inzy.wsmock
import org.springframework.messaging.simp.SimpMessagingTemplate
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.text.SimpleDateFormat
import java.util.*
/**
* 定时推送消息到 /topic/greetings 主题
*/
@Component
class ScheduledMessageSender(
private val messagingTemplate: SimpMessagingTemplate, // 注入STOMP消息模板
private val messageController: GreetingController // 注入控制层
) {
// 定义定时规则:每5秒推送一次(可根据需求调整,比如 1000=1秒,60000=1分钟)
// cron表达式参考:0/5 * * * * ? (每5秒执行)
@Scheduled(fixedRate = 5000)
fun sendPeriodicMessage() {
if (messageController.pushEnabled) {
// 1. 构造推送内容(可替换为业务数据)
val time = SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Date())
val greetingContent = "定时推送消息 - $time"
// 2. 发送到 /topic/greetings 主题(前端已订阅该主题)
messagingTemplate.convertAndSend("/topic/greetings", Greeting(greetingContent))
println("已推送定时消息:$greetingContent")
}
}
}
\ No newline at end of file
package com.inzy.wsmock
import com.alibaba.fastjson2.JSONObject
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.time.LocalDateTime
@Component
class ScheduledPushTask(
private val webSocketHandler: WebSocketHandler,
private val pushConfig: PushConfig
) {
/**
* 定时推送任务(固定延迟,避免任务叠加)
*/
// @Scheduled(fixedDelayString = "#{@pushConfig.pushInterval}")
@Scheduled(fixedRate = 1000)
fun pushMsgToClients() {
// 开关关闭时跳过
// if (!pushConfig.pushEnabled.get()) {
// return
// }
// 构造推送消息(适配前端格式)
val msgObj = JSONObject()
msgObj["content"] = "定时推送消息 - ${LocalDateTime.now()}"
println("--------msgObj = ${msgObj}")
// 广播给所有客户端
webSocketHandler.broadcastMsg(msgObj.toJSONString())
}
}
\ No newline at end of file
package com.inzy.wsmock
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType
import org.springframework.messaging.converter.MappingJackson2MessageConverter
import org.springframework.messaging.simp.config.ChannelRegistration
import org.springframework.messaging.simp.config.MessageBrokerRegistry
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker
import org.springframework.web.socket.config.annotation.StompEndpointRegistry
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer
/**
* Kotlin 版 STOMP WebSocket 核心配置
* @EnableWebSocketMessageBroker:启用 STOMP 协议 + 消息代理
*/
@Configuration
@EnableWebSocketMessageBroker // 核心注解:启用 STOMP + 消息代理
class StompWebSocketConfig : WebSocketMessageBrokerConfigurer {
/**
* 配置消息代理(广播/订阅的前缀)
*/
override fun configureMessageBroker(config: MessageBrokerRegistry) {
// 1. 启用内存消息代理(生产环境可替换为 RabbitMQ/ActiveMQ)
// 2. 配置代理前缀:客户端订阅以 /topic/ 开头的主题(广播)、/queue/ 开头的队列(点对点)
config.enableSimpleBroker("/topic", "/queue")
// 3. 配置应用前缀:客户端发送消息的前缀(对应 @MessageMapping)
// 客户端发送消息需用 /app/xxx,服务端 @MessageMapping("xxx") 接收
config.setApplicationDestinationPrefixes("/app")
// 4. 配置用户点对点前缀(可选):客户端订阅 /user/queue/xxx 接收单人消息
config.setUserDestinationPrefix("/user")
}
/**
* 注册 STOMP 端点(客户端连接入口)
*/
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
// 1. 注册 STOMP 端点:客户端通过 ws://localhost:8088/kotlin-stomp/ws/stomp 连接
registry.addEndpoint("/gs-guide-websocket")
.setAllowedOrigins("*") // 允许跨域(生产环境指定具体域名)
// 2. 可选:支持 SockJS(兼容低版本浏览器,如不支持 WebSocket 的场景)
registry.addEndpoint("/ws/stomp")
.setAllowedOrigins("*")
.withSockJS()
}
/**
* 可选:配置消息通道拦截器(如权限验证、日志)
*/
override fun configureClientInboundChannel(registration: ChannelRegistration) {
// 可添加拦截器,比如验证客户端发送的消息是否有权限
// registration.interceptors(StompAuthInterceptor())
}
}
package com.inzy.wsmock
import io.netty.channel.Channel
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import io.netty.handler.codec.http.websocketx.WebSocketFrame
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap
/**
* 自定义WebSocket处理器(支持多客户端)
*/
@Sharable
@Component
class WebSocketHandler : SimpleChannelInboundHandler<WebSocketFrame>() {
// 存储在线客户端Channel(线程安全)
private val onlineChannels = ConcurrentHashMap<String, Channel>()
/**
* 客户端连接成功
*/
override fun channelActive(ctx: ChannelHandlerContext) {
val channel = ctx.channel()
val clientId = channel.id().asShortText()
onlineChannels[clientId] = channel
println("客户端连接成功:$clientId,当前在线数:${onlineChannels.size}")
// 欢迎消息
channel.writeAndFlush(TextWebSocketFrame("Welcome! Client ID: $clientId"))
}
/**
* 客户端断开连接
*/
override fun channelInactive(ctx: ChannelHandlerContext) {
val channel = ctx.channel()
val clientId = channel.id().asShortText()
onlineChannels.remove(clientId)
println("客户端断开连接:$clientId,当前在线数:${onlineChannels.size}")
}
/**
* 处理客户端消息
*/
override fun channelRead0(ctx: ChannelHandlerContext, frame: WebSocketFrame) {
if (frame is TextWebSocketFrame) {
val msg = frame.text()
val clientId = ctx.channel().id().asShortText()
println("收到客户端[$clientId]消息:$msg")
// 回复客户端
ctx.writeAndFlush(TextWebSocketFrame("Server received: $msg"))
}
}
/**
* 异常处理
*/
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
val clientId = ctx.channel().id().asShortText()
println("客户端[$clientId]发生异常:${cause.message}")
ctx.close()
onlineChannels.remove(clientId)
}
/**
* 广播消息给所有在线客户端
*/
fun broadcastMsg(msg: String) {
if (onlineChannels.isEmpty()) {
println("无在线客户端,跳过推送")
return
}
val frame = TextWebSocketFrame(msg)
onlineChannels.forEach { (clientId, channel) ->
if (channel.isActive) {
channel.writeAndFlush(frame)
.addListener { future ->
if (!future.isSuccess) {
println("推送消息给客户端[$clientId]失败:${future.cause()?.message}")
}
}
} else {
onlineChannels.remove(clientId)
}
}
println("已向${onlineChannels.size}个客户端推送消息:$msg")
}
}
\ No newline at end of file
......@@ -12,3 +12,9 @@ server:
enabled: true
force: true
# Netty WebSocket配置
netty:
websocket:
port: 8089 # Netty WebSocket端口
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment