Commit 16e196a6 authored by p x's avatar p x
Browse files

车辆位姿干感知物

parent 1d079da7
...@@ -6,9 +6,14 @@ plugins { ...@@ -6,9 +6,14 @@ plugins {
} }
group = "com.inzy" group = "com.inzy"
version = "0.0.1-SNAPSHOT" version = "1.0"
description = "ws 模拟" description = "ws 模拟"
// 禁用普通 JAR 打包
tasks.jar {
enabled = false
}
java { java {
toolchain { toolchain {
languageVersion = JavaLanguageVersion.of(17) languageVersion = JavaLanguageVersion.of(17)
......
This diff is collapsed.
package com.inzy.wsmock
import com.alibaba.fastjson2.JSONObject
import com.inzy.wsmock.RequestParamHandler.Companion.PARAM_TYPE_KEY
import com.inzy.wsmock.RequestParamHandler.Companion.PARAM_TYPE_VALUE_1
import com.inzy.wsmock.RequestParamHandler.Companion.PARAM_TYPE_VALUE_2
import com.inzy.wsmock.RequestParamHandler.Companion.REQUEST_PARAMS_KEY
import com.inzy.wsmock.RequestParamHandler.Companion.REQUEST_PATH_KEY
import com.inzy.wsmock.utils.FileIoUtil
import io.netty.channel.Channel
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import jakarta.annotation.PostConstruct
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.slf4j.LoggerFactory
import org.springframework.core.io.ResourceLoader
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.time.LocalDateTime
@Component
class AdasPushTask(
// 注入Spring资源加载器(Spring自动装配)
private val resourceLoader: ResourceLoader
) {
private val logger = LoggerFactory.getLogger(javaClass)
// 注入ChannelManager单例
private val channelManager = ChannelManager.instance
//车辆位姿
private var carDst = mutableListOf<String>()
//感知物
private var preDst = mutableListOf<String>()
@PostConstruct
fun readMockFile() {
var classpathResource = resourceLoader.getResource("classpath:adasm/CarVehicle_fz.txt")
// 读取文件内容(Kotlin简化写法)
CoroutineScope(Dispatchers.IO).launch {
FileIoUtil.getMockToList(classpathResource.inputStream, carDst)
// println("读取到文件内容:${dst.count()}")
classpathResource = resourceLoader.getResource("classpath:adasm/PerTarget.txt")
FileIoUtil.getMockToList(classpathResource.inputStream, preDst)
}
}
/**根据路径推送***/
fun pushMsgFormPath(channel: Channel){
var path: String = ""
if (channel.hasAttr(REQUEST_PATH_KEY)) {
path = channel.attr(REQUEST_PATH_KEY).get()
}
when (path) {
"/ws/adas" -> {
pushMsgFromTypeAdas(channel)
}
}
}
/**根据type推送**/
private fun pushMsgFromTypeAdas(channel: Channel) {
var type: String? = ""
if (channel.hasAttr(REQUEST_PARAMS_KEY)) {
val params = channel.attr(REQUEST_PARAMS_KEY).get()
type = params.get(PARAM_TYPE_KEY)
}
when (type) {
PARAM_TYPE_VALUE_1 -> {//车辆位姿
if (carDst.isNotEmpty()){
CoroutineScope(Dispatchers.Default).launch {
delay(1000)
carDst.forEachIndexed { index, string ->
// println("index = ${index}")
sendMsg(channel,string)
delay(200)
}
}
}
}
PARAM_TYPE_VALUE_2 -> {//感知物
if (preDst.isNotEmpty()){
CoroutineScope(Dispatchers.Default).launch {
delay(2000)
preDst.forEachIndexed { index, string ->
// println("index = ${index}")
sendMsg(channel,string)
delay(500)
}
}
}
}
else -> {
val msg = JSONObject()
msg.put("type", "adas")
msg.put("data", "adas")
sendMsg(channel, msg.toJSONString())
}
}
}
private fun sendMsg(channel: Channel, msg: String) {
if (!channel.isActive()) {
// println("无在线客户端,跳过推送")
channelManager.removeChannel(channel)
return
}
val clientId = channel.id().asShortText()
val frame = TextWebSocketFrame(msg)
channel.writeAndFlush(frame)
.addListener { future ->
if (!future.isSuccess) {
println("推送消息给客户端[$clientId]失败:${future.cause()?.message}")
channelManager.removeChannel(channel) // 推送失败移除失效Channel
}
}
}
}
\ No newline at end of file
...@@ -4,6 +4,7 @@ package com.inzy.wsmock ...@@ -4,6 +4,7 @@ package com.inzy.wsmock
import io.netty.channel.Channel import io.netty.channel.Channel
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import io.netty.util.AttributeKey import io.netty.util.AttributeKey
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
/** /**
...@@ -29,7 +30,7 @@ class ChannelManager private constructor() { ...@@ -29,7 +30,7 @@ class ChannelManager private constructor() {
fun removeChannel(channel: Channel) { fun removeChannel(channel: Channel) {
val clientId = channel.id().asShortText() val clientId = channel.id().asShortText()
onlineChannels.remove(clientId) onlineChannels.remove(clientId)
println("Channel移除成功:$clientId,当前在线数:${onlineChannels.size}") // println("Channel移除成功:$clientId,当前在线数:${onlineChannels.size}")
} }
/** /**
...@@ -67,14 +68,14 @@ class ChannelManager private constructor() { ...@@ -67,14 +68,14 @@ class ChannelManager private constructor() {
/***发送过滤好的通道**/ /***发送过滤好的通道**/
fun sendMsgFromType(typeChannels: Map<String, Channel>,msg: String) { fun sendMsgFromType(typeChannels: Map<String, Channel>, msg: String) {
if (typeChannels.isEmpty()) { if (typeChannels.isEmpty()) {
// println("无在线客户端,跳过推送") // println("无在线客户端,跳过推送")
return return
} }
val frame = TextWebSocketFrame(msg)
typeChannels.forEach { (clientId, channel) -> typeChannels.forEach { (clientId, channel) ->
if (channel.isActive) { if (channel.isActive) {
val frame = TextWebSocketFrame(msg)
channel.writeAndFlush(frame) channel.writeAndFlush(frame)
.addListener { future -> .addListener { future ->
if (!future.isSuccess) { if (!future.isSuccess) {
...@@ -120,5 +121,6 @@ class ChannelManager private constructor() { ...@@ -120,5 +121,6 @@ class ChannelManager private constructor() {
*/ */
companion object { companion object {
val instance: ChannelManager = ChannelManager() val instance: ChannelManager = ChannelManager()
// val instance: ChannelManager by lazy { ChannelManager() }
} }
} }
\ No newline at end of file
...@@ -28,13 +28,13 @@ import java.net.ServerSocket ...@@ -28,13 +28,13 @@ import java.net.ServerSocket
@Slf4j @Slf4j
@Component @Component
class NettyWebSocketServer( class NettyWebSocketServer(
@Value("\${netty.websocket.port:8089}") private val port: Int @Value("\${netty.websocket.port:8089}") private val port: Int,
// private val webSocketHandler: WebSocketHandler private val webSocketHandler: WebSocketHandler
) { ) {
private val logger = LoggerFactory.getLogger(javaClass) private val logger = LoggerFactory.getLogger(javaClass)
private val websocketPath = "/gs-guide-websocket" private val websocketPath = "/ws"
// Netty主从线程组 // Netty主从线程组
private val bossGroup = NioEventLoopGroup(1) private val bossGroup = NioEventLoopGroup(1)
...@@ -65,7 +65,7 @@ class NettyWebSocketServer( ...@@ -65,7 +65,7 @@ class NettyWebSocketServer(
pipeline.addLast(HttpObjectAggregator(1024 * 1024)) pipeline.addLast(HttpObjectAggregator(1024 * 1024))
pipeline.addLast(RequestParamHandler()) // 添加HTTP请求拦截器 pipeline.addLast(RequestParamHandler()) // 添加HTTP请求拦截器
pipeline.addLast(WebSocketServerProtocolHandler(websocketPath)) pipeline.addLast(WebSocketServerProtocolHandler(websocketPath))
pipeline.addLast(WebSocketHandler()) pipeline.addLast(webSocketHandler)
// 调试:打印处理器链顺序,确认RequestParamHandler在正确位置 // 调试:打印处理器链顺序,确认RequestParamHandler在正确位置
// logger.info("ChannelPipeline顺序:${pipeline.map { it.javaClass.simpleName }}") // logger.info("ChannelPipeline顺序:${pipeline.map { it.javaClass.simpleName }}")
} }
......
...@@ -36,7 +36,7 @@ class RequestParamHandler : SimpleChannelInboundHandler<TextWebSocketFrame?>() { ...@@ -36,7 +36,7 @@ class RequestParamHandler : SimpleChannelInboundHandler<TextWebSocketFrame?>() {
ctx.channel().attr(REQUEST_PARAMS_KEY).set(params) ctx.channel().attr(REQUEST_PARAMS_KEY).set(params)
// 4. 可选:去除查询参数后更新请求URI(不影响后续处理器解析路径) // 4. 可选:去除查询参数后更新请求URI(不影响后续处理器解析路径)
request.uri = path request.uri = path.substring(0..2)
// 去除查询参数后更新URI(可选) // 去除查询参数后更新URI(可选)
/* if (uri.contains("?")) { /* if (uri.contains("?")) {
......
...@@ -5,18 +5,25 @@ import com.inzy.wsmock.RequestParamHandler.Companion.PARAM_TYPE_KEY ...@@ -5,18 +5,25 @@ import com.inzy.wsmock.RequestParamHandler.Companion.PARAM_TYPE_KEY
import com.inzy.wsmock.RequestParamHandler.Companion.PARAM_TYPE_VALUE_1 import com.inzy.wsmock.RequestParamHandler.Companion.PARAM_TYPE_VALUE_1
import com.inzy.wsmock.RequestParamHandler.Companion.PARAM_TYPE_VALUE_2 import com.inzy.wsmock.RequestParamHandler.Companion.PARAM_TYPE_VALUE_2
import com.inzy.wsmock.RequestParamHandler.Companion.REQUEST_PARAMS_KEY import com.inzy.wsmock.RequestParamHandler.Companion.REQUEST_PARAMS_KEY
import com.inzy.wsmock.utils.FileIoUtil
import io.netty.channel.Channel import io.netty.channel.Channel
import lombok.extern.slf4j.Slf4j import jakarta.annotation.PostConstruct
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import org.springframework.core.io.ResourceLoader
import org.springframework.scheduling.annotation.Scheduled import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component import org.springframework.stereotype.Component
import java.time.LocalDateTime import java.time.LocalDateTime
@Component @Component
class ScheduledPushTask( class ScheduledAdasPushTask(
// private val webSocketHandler: WebSocketHandler, // private val webSocketHandler: WebSocketHandler,
// private val pushConfig: PushConfig // private val pushConfig: PushConfig,
// 注入Spring资源加载器(Spring自动装配)
private val resourceLoader: ResourceLoader
) { ) {
private val logger = LoggerFactory.getLogger(javaClass) private val logger = LoggerFactory.getLogger(javaClass)
...@@ -24,29 +31,50 @@ class ScheduledPushTask( ...@@ -24,29 +31,50 @@ class ScheduledPushTask(
// 注入ChannelManager单例 // 注入ChannelManager单例
private val channelManager = ChannelManager.instance private val channelManager = ChannelManager.instance
//车辆位姿
private var carDst = mutableListOf<String>()
private var carDstIndex = 0
// @PostConstruct
fun readMockFile() {
val classpathResource = resourceLoader.getResource("classpath:adasm/CarVehicle_fz.txt")
// 读取文件内容(Kotlin简化写法)
CoroutineScope(Dispatchers.Default).launch {
// val content = classpathResource.inputStream.bufferedReader()
FileIoUtil.getMockToList(classpathResource.inputStream, carDst)
// println("读取到文件内容:${dst.count()}")
}
}
/** /**
* 定时推送任务(type=1) * 定时推送任务(type=1) 车辆位姿
*/ */
@Scheduled(fixedDelayString = "#{@pushConfig.pushInterval}") // @Scheduled(fixedDelayString = "#{@pushConfig.pushInterval}")
// @Scheduled(fixedDelayString = "#{@pushConfig.pushInterval}")
fun pushMsgToType1() { fun pushMsgToType1() {
// val onlineChannels = channelManager.getAllChannels() val onlineChannels = channelManager.getAllChannels()
//得到设置了type属性的channel //得到设置了type属性的channel
val typeChannels=filterTypeChannels(PARAM_TYPE_VALUE_1) val typeChannels = filterTypeChannels(PARAM_TYPE_VALUE_1)
// println("onlineChannels.size = ${onlineChannels.size} typeChannels.size = ${typeChannels.size}") // println("onlineChannels.size = ${onlineChannels.size} typeChannels.size = ${typeChannels.size}")
val msgObj = JSONObject() // val msgObj = JSONObject()
msgObj["content"] = "定时推送消息 type=1 ${LocalDateTime.now()}" // msgObj["content"] = "定时推送消息 type=1 ${LocalDateTime.now()}"
channelManager.sendMsgFromType(typeChannels,msgObj.toJSONString()) if (carDst.isNotEmpty()){
println("-------------carDstIndex = ${carDstIndex}")
channelManager.sendMsgFromType(typeChannels, carDst.get(carDstIndex))
carDstIndex = (carDstIndex + 1) % carDst.size // 循环索引
}
} }
@Scheduled(fixedDelayString = "200") // @Scheduled(fixedDelayString = "200")
fun pushMsgToType2() { fun pushMsgToType2() {
//得到设置了type属性的channel //得到设置了type属性的channel
val typeChannels=filterTypeChannels(PARAM_TYPE_VALUE_2) val typeChannels = filterTypeChannels(PARAM_TYPE_VALUE_2)
// println("onlineChannels.size = ${onlineChannels.size} typeChannels.size = ${typeChannels.size}") // println("onlineChannels.size = ${onlineChannels.size} typeChannels.size = ${typeChannels.size}")
val msgObj = JSONObject() val msgObj = JSONObject()
msgObj["content"] = "定时推送消息 type=2 ${LocalDateTime.now()}" msgObj["content"] = "定时推送消息 type=2 ${LocalDateTime.now()}"
channelManager.sendMsgFromType(typeChannels,msgObj.toJSONString()) channelManager.sendMsgFromType(typeChannels, msgObj.toJSONString())
} }
...@@ -68,7 +96,6 @@ class ScheduledPushTask( ...@@ -68,7 +96,6 @@ class ScheduledPushTask(
} }
/** /**
* 定时推送任务(固定延迟,避免任务叠加) * 定时推送任务(固定延迟,避免任务叠加)
*/ */
......
...@@ -7,13 +7,14 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame ...@@ -7,13 +7,14 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import io.netty.handler.codec.http.websocketx.WebSocketFrame import io.netty.handler.codec.http.websocketx.WebSocketFrame
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
/** /**
* 自定义WebSocket处理器(专注处理消息交互,Channel管理交给ChannelManager) * 自定义WebSocket处理器(专注处理消息交互,Channel管理交给ChannelManager)
*/ */
@Sharable @Sharable
//@Component @Component
class WebSocketHandler : SimpleChannelInboundHandler<WebSocketFrame>() { class WebSocketHandler(private val adasPushTask: AdasPushTask) : SimpleChannelInboundHandler<WebSocketFrame>() {
// private val logger = LoggerFactory.getLogger(javaClass) // private val logger = LoggerFactory.getLogger(javaClass)
...@@ -26,12 +27,14 @@ class WebSocketHandler : SimpleChannelInboundHandler<WebSocketFrame>() { ...@@ -26,12 +27,14 @@ class WebSocketHandler : SimpleChannelInboundHandler<WebSocketFrame>() {
val channel = ctx.channel() val channel = ctx.channel()
// 交给ChannelManager管理 // 交给ChannelManager管理
channelManager.addChannel(channel) channelManager.addChannel(channel)
//根据后缀路径推送
adasPushTask.pushMsgFormPath(channel)
// 握手成功后读取参数(此时参数已存储) // 握手成功后读取参数(此时参数已存储)
// val params = ctx.channel().attr(RequestParamHandler.REQUEST_PARAMS_KEY).get() // val params = ctx.channel().attr(RequestParamHandler.REQUEST_PARAMS_KEY).get()
// val path = ctx.channel().attr(RequestParamHandler.REQUEST_PATH_KEY).get() // val path = ctx.channel().attr(RequestParamHandler.REQUEST_PATH_KEY).get()
// logger.info("WS握手成功,客户端查询参数:$params 请求路径:$path") // logger.info("WS握手成功,客户端查询参数:$params 请求路径:$path")
ctx.writeAndFlush(TextWebSocketFrame("Welcome!")) // ctx.writeAndFlush(TextWebSocketFrame("Welcome!"))
// ctx.writeAndFlush(TextWebSocketFrame("Welcome! Client ID: $clientId, params: $params")) // ctx.writeAndFlush(TextWebSocketFrame("Welcome! Client ID: $clientId, params: $params"))
} }
super.userEventTriggered(ctx, evt) super.userEventTriggered(ctx, evt)
......
package com.inzy.wsmock.utils
import java.io.InputStream
object FileIoUtil {
fun getMockToList(inputStream: InputStream, dst: MutableList<String>) {
// 方式1:手动逐行读取(推荐,Kotlin简洁写法)
inputStream.use { inputStream -> // use 自动关闭流(AutoCloseable)
inputStream.bufferedReader().forEachLine { line ->
// 处理每行内容(示例:添加到列表 + 打印)
dst.add(line)
// println("读取到行 = ${line}")
}
}
}
}
\ No newline at end of file
This diff is collapsed.
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment