Commit f2f64488 authored by p x's avatar p x
Browse files

完善http sse

parent 7498ae9a
package com.sd.cavphmi.net
import com.sd.cavphmi.bean.WarningBean
import com.sd.cavphmi.moudule.NetworkModule.getSSlSocketFactory
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import okhttp3.Call
import okhttp3.Callback
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.RequestBody
import okhttp3.Response
import okhttp3.ResponseBody
import okhttp3.internal.closeQuietly
import java.io.IOException
import java.util.concurrent.TimeUnit
class SimpleSSEClient {
companion object {
val instance: SimpleSSEClient by lazy { SimpleSSEClient() }
}
// private var logging = HttpLoggingInterceptor()
// private val client = OkHttpClient()
private var client: OkHttpClient
private var responseBodys = mutableListOf<ResponseBody>()
private var calls = mutableListOf<Call>()
private constructor() {
var sslData = getSSlSocketFactory()
client = OkHttpClient.Builder()
// .addInterceptor(logging)
.readTimeout(0, TimeUnit.SECONDS)
.sslSocketFactory(sslData.socketFactory, sslData.trustAllCert)
.hostnameVerifier { hostname, session -> true }
.build()
}
fun startSSE(url: String, body: RequestBody?, listener: SSESimpleListener?) {
val build = Request.Builder()
.url(url)
.header("Accept", "text/event-stream")
.header("Cache-Control", "no-cache")
if (body != null) {
build.post(body)
}
val request = build.build()
client.newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
listener?.onError(e)
}
override fun onResponse(call: Call, response: Response) {
if (!response.isSuccessful) {
listener?.onError(IOException("HTTP ${response.code} ${response.message}"))
return
}
calls.add(call)
try {
response.body?.let { body ->
// println("----------- let SimpleSSEClient.onResponse")
responseBodys.add(body)
body.source().use { source ->
if (source == null)
return
listener?.onOpen()
while (true) {
val line = source.readUtf8Line() ?: break
when {
line.startsWith("data:") -> {
val data = line.substring(5).trim()
listener?.onEvent(data)
}
line.isEmpty() -> {
// 事件分隔符
}
}
}
listener?.onClosed()
}
}
} catch (e: IOException) {
e.printStackTrace()
} finally {
// response.closeQuietly()
}
}
})
}
interface SSESimpleListener {
fun onOpen()
fun onEvent(data: String)
fun onError(throwable: Throwable)
fun onClosed()
}
/**释放连接***/
fun cancelContect() {
CoroutineScope(Dispatchers.Default).launch {
for (call in calls) {
call.cancel()
}
calls.clear()
for (body in responseBodys) {
body.closeQuietly()
}
responseBodys.clear()
}
}
}
\ No newline at end of file
...@@ -48,16 +48,16 @@ class SseManager { ...@@ -48,16 +48,16 @@ class SseManager {
private var eventSource: EventSource? = null private var eventSource: EventSource? = null
// private var lastEventId: String? = null // 记录最后一个事件 ID // private var lastEventId: String? = null // 记录最后一个事件 ID
private val listeners = mutableListOf<SseCallback?>() // 客户端数据监听器 private val listeners = mutableListOf<SseCallback2?>() // 客户端数据监听器
// 重连延迟(指数退避:1s → 2s → 4s → ... → 30s 上限) // 重连延迟(指数退避:1s → 2s → 4s → ... → 30s 上限)
private var retryDelayMillis = 1000L private var retryDelayMillis = 1000L
private val maxRetryDelay = 30 * 1000L // 最大重连延迟 private val maxRetryDelay = 30 * 1000L // 最大重连延迟
// 连接状态回调(给外部使用) // 连接状态回调(给外部使用)
private var callback: SseCallback? = null private var callback: SseCallback2? = null
fun addListener(callback: SseCallback?) { fun addListener(callback: SseCallback2) {
listeners.add(callback) listeners.add(callback)
} }
...@@ -190,7 +190,7 @@ class SseManager { ...@@ -190,7 +190,7 @@ class SseManager {
} }
// SSE 状态回调接口(外部实现) // SSE 状态回调接口(外部实现)
interface SseCallback { interface SseCallback2 {
fun onConnected() // 连接成功 fun onConnected() // 连接成功
fun onDisconnected() // 连接断开 fun onDisconnected() // 连接断开
fun onMessageReceived(data: String, eventType: String) // 接收消息 fun onMessageReceived(data: String, eventType: String) // 接收消息
......
package com.sd.cavphmi.net
import android.os.Handler
import android.os.Looper
import com.sd.cavphmi.moudule.NetworkModule
import okhttp3.ConnectionSpec
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.RequestBody
import okhttp3.sse.EventSource
import okhttp3.sse.EventSourceListener
import okhttp3.sse.EventSources
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
/**
* SSE 连接实体类:封装单个连接的核心信息
* @param connId 连接唯一标识(如 "chat", "notice", "status")
* @param sseUrl 连接的服务器地址
* @param eventSource SSE 连接实例(OkHttp 提供)
* @param callback 连接的回调接口(接收消息、状态)
*/
data class SseConnection(
val connId: String,
val sseUrl: String,
val eventSource: EventSource,
val callback: SseCallback
)
/**
* SSE 回调接口:每个连接的独立回调
*/
interface SseCallback {
// 连接成功
fun onConnected(connId: String)
// 接收自定义事件(服务器指定 event 字段)
fun onEventReceived(connId: String, eventType: String?, data: String)
// 连接失败
fun onFailed(connId: String, errorMsg: String)
// 连接关闭
fun onClosed(connId: String)
fun onReconnecting(delayMillis: Long) // 重连中(可选)
}
class SseMultiConnectionManager {
companion object {
// 是否主动断开(控制重连逻辑)
private var isManualDisconnect = false
val instance: SseMultiConnectionManager by lazy { SseMultiConnectionManager() }
}
var sslData = NetworkModule.getSSlSocketFactory()
// OkHttp 客户端(全局复用,避免重复创建;配置长连接适配 SSE)
private val okHttpClient = OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(0, TimeUnit.SECONDS) // 长连接延长读取超时
.writeTimeout(30, TimeUnit.SECONDS)
// .connectionPool(ConnectionPool(5, 30, TimeUnit.SECONDS)) // 连接池优化(可选)
// .pingInterval(20, TimeUnit.SECONDS) // TCP心跳间隔
.retryOnConnectionFailure(false) // 多连接场景下禁用自动重试(避免冲突)
.connectionSpecs(
listOf(
ConnectionSpec.CLEARTEXT,
ConnectionSpec.MODERN_TLS
)
) // 支持 HTTP/HTTPS
.sslSocketFactory(sslData.socketFactory, sslData.trustAllCert)
.hostnameVerifier { hostname, session -> true }
.build()
// 存储所有活跃连接:key = connId(唯一标识),value = SseConnection
// 使用 ConcurrentHashMap 保证线程安全(支持并发读写)
private val activeConnections = ConcurrentHashMap<String, SseConnection>()
// 重入锁:确保启动/关闭连接时的原子操作(避免并发问题)
private val connectionLock = ReentrantLock()
// 重连延迟(指数退避:1s → 2s → 4s → ... → 30s 上限)
private var retryDelayMillis = 1000L
private val maxRetryDelay = 30 * 1000L // 最大重连延迟
/**
* 启动一个新的 SSE 连接
* @param connId 连接唯一标识(如 "chat_123", "system_notice")
* @param sseUrl 连接的服务器地址
* @param callback 该连接的独立回调(处理消息和状态)
*/
fun startConnection(
connId: String,
sseUrl: String,
body: RequestBody,
callback: SseCallback
) {
connectionLock.withLock { // 加锁确保原子操作
// 1. 先关闭同名连接(避免重复创建)
if (activeConnections.containsKey(connId)) {
stopConnection(connId)
callback.onClosed(connId)
}
// 2. 构建当前连接的 SSE 请求(必须是 GET + text/event-stream 头)
val request = Request.Builder()
.url(sseUrl)
.post(body)
.header("Accept", "text/event-stream") // 核心:SSE 格式标识
.header("Cache-Control", "no-cache") // 禁用缓存
.header("Connection", "keep-alive") // 保持长连接
// 可选:添加当前连接的独立头(如不同 Token、用户ID)
// .header("Authorization", "Bearer ${getTokenForConn(connId)}")
.build()
// 3. 创建当前连接的 EventSourceListener(绑定回调)
val listener = object : EventSourceListener() {
override fun onOpen(eventSource: EventSource, response: okhttp3.Response) {
super.onOpen(eventSource, response)
// println("-------hashCode = ${eventSource.hashCode()} ${eventSource.request().url}")
// 回调到当前连接的 callback(主线程)
callback.onConnected(connId)
}
override fun onEvent(
eventSource: EventSource,
id: String?,
type: String?,
data: String
) {
super.onEvent(eventSource, id, type, data)
// 自定义事件回调(服务器指定 event 字段)
callback.onEventReceived(connId, type, data)
}
override fun onFailure(
eventSource: EventSource,
t: Throwable?,
response: okhttp3.Response?
) {
super.onFailure(eventSource, t, response)
// 失败后移除连接(避免内存泄漏)
activeConnections.remove(connId)
// 失败回调(错误信息拼接)
val errorMsg = t?.message ?: "Unknown error"
callback.onFailed(connId, errorMsg)
// 主动断开时不自动重连(通过 flag 控制)
if (!isManualDisconnect) {
scheduleReconnect(connId, sseUrl, body, callback) // 被动断开则重连
}
}
override fun onClosed(eventSource: EventSource) {
super.onClosed(eventSource)
callback.onClosed(connId)
// 关闭后移除连接
activeConnections.remove(connId)
}
}
// 4. 创建 EventSource 实例并存储到集合
val eventSource = EventSources.createFactory(okHttpClient)
.newEventSource(request, listener)
// 5. 存储连接信息到 ConcurrentHashMap
activeConnections[connId] = SseConnection(
connId = connId,
sseUrl = sseUrl,
eventSource = eventSource,
callback = callback
)
}
}
// 调度重连(指数退避策略)
private fun scheduleReconnect(
connId: String,
sseUrl: String,
body: RequestBody,
sseCallback:SseCallback
) {
sseCallback.onReconnecting(retryDelayMillis)
// 使用 Handler 延迟重连(避免主线程阻塞)
Handler(Looper.getMainLooper()).postDelayed({
if (!isManualDisconnect) {
startConnection(connId,sseUrl, body,sseCallback) // 重连
// 指数退避:延迟翻倍,不超过最大值
retryDelayMillis = minOf(retryDelayMillis * 2, maxRetryDelay)
}
}, retryDelayMillis)
}
/**
* 关闭指定 ID 的 SSE 连接
* @param connId 连接唯一标识
*/
fun stopConnection(connId: String) {
isManualDisconnect = true
connectionLock.withLock {
val connection = activeConnections.remove(connId)
connection?.eventSource?.cancel() // 关闭连接
}
}
/**
* 关闭所有 SSE 连接(如 App 退出时)
*/
fun stopAllConnections() {
isManualDisconnect = true
connectionLock.withLock {
activeConnections.values.forEach { it.eventSource.cancel() }
activeConnections.clear()
}
}
/**
* 检查连接是否活跃
* @param connId 连接唯一标识
* @return true = 活跃,false = 已关闭/未创建
*/
fun isConnectionActive(connId: String): Boolean {
return activeConnections.containsKey(connId)
}
/**
* 获取所有活跃连接的 ID
*/
fun getActiveConnIds(): List<String> {
return activeConnections.keys.toList()
}
}
...@@ -6,10 +6,10 @@ import com.sd.cavphmi.bean.VehDetailBean ...@@ -6,10 +6,10 @@ import com.sd.cavphmi.bean.VehDetailBean
import com.sd.cavphmi.bean.req.SpaceInfo import com.sd.cavphmi.bean.req.SpaceInfo
import com.sd.cavphmi.net.MyResult import com.sd.cavphmi.net.MyResult
import com.sd.cavphmi.net.RequestBodyUtil import com.sd.cavphmi.net.RequestBodyUtil
import com.sd.cavphmi.net.SimpleSSEClient import com.sd.cavphmi.net.SseCallback
import com.sd.cavphmi.net.SimpleSSEClient.SSESimpleListener
import com.sd.cavphmi.net.SseManager import com.sd.cavphmi.net.SseManager
import com.sd.cavphmi.net.SseManager.SseCallback import com.sd.cavphmi.net.SseManager.SseCallback2
import com.sd.cavphmi.net.SseMultiConnectionManager
import com.sd.cavphmi.net.httpmothod.ClientRetrofitMethod import com.sd.cavphmi.net.httpmothod.ClientRetrofitMethod
import com.sd.cavphmi.utils.MyContants import com.sd.cavphmi.utils.MyContants
import okhttp3.RequestBody import okhttp3.RequestBody
...@@ -21,8 +21,14 @@ class AvpDataRepo @Inject constructor( ...@@ -21,8 +21,14 @@ class AvpDataRepo @Inject constructor(
private var retrofitMethod: ClientRetrofitMethod private var retrofitMethod: ClientRetrofitMethod
) { ) {
private var simpleSSEClient = SimpleSSEClient.instance // private var simpleSSEClient = SimpleSSEClient.instance
private var sseManager = SseManager.instance private var sseManager2 = SseManager.instance
// 初始化多连接管理器(全局单例更佳,可通过依赖注入)
private val sseManager = SseMultiConnectionManager.instance
// 连接 ID 定义(唯一标识每个连接)
private val CONN_ID_CAR = "CAR" // 聊天连接
private val CONN_ID_AVPSTATU = "AVP_STATU" // 系统通知连接
/**获取车辆详情 /**获取车辆详情
* @param id 正常应该是传场地ID,但是亦庄这个和太和桥车是一样的 * @param id 正常应该是传场地ID,但是亦庄这个和太和桥车是一样的
...@@ -102,24 +108,34 @@ class AvpDataRepo @Inject constructor( ...@@ -102,24 +108,34 @@ class AvpDataRepo @Inject constructor(
* 我们车辆位姿数据用的是 车俩基础信息的id 可绑车辆接口 返回的是 avp车俩id * 我们车辆位姿数据用的是 车俩基础信息的id 可绑车辆接口 返回的是 avp车俩id
* 所以这里的id 是绑定车辆列表返回的id * 所以这里的id 是绑定车辆列表返回的id
*/ */
fun getAvpStatus(url: String, body: RequestBody, listener: SseCallback?) { fun getAvpStatus(url: String, body: RequestBody, sseCallback: SseCallback) {
// fun getAvpStatus(url: String, listener: SseCallback?) { sseManager.startConnection(CONN_ID_AVPSTATU, url, body, sseCallback)
sseManager.addListener(listener)
sseManager.connect(url,body) /* sseManager2.addListener(object : SseCallback2 {
// simpleSSEClient.startSSE(url, body, listener) override fun onConnected() {
println("------------getAvpStatus onConnected")
}
override fun onDisconnected() {
}
override fun onMessageReceived(data: String, eventType: String) {
println("----------getAvpStatus = ${data}")
}
override fun onError(errorMsg: String, throwable: Throwable?) {
println("------------getAvpStatus errorMsg")
}
override fun onReconnecting(delayMillis: Long) {
}
})
sseManager2.connect(url, body)*/
} }
/**获取车辆位姿****/ /**获取车辆位姿****/
// fun getCarPose(url: String, listener: SseCallback?) { fun getCarPose(url: String, body: RequestBody, sseCallback: SseCallback) {
fun getCarPose(url: String,body: RequestBody, listener: SseCallback?) { sseManager.startConnection(CONN_ID_CAR, url, body, sseCallback)
sseManager.addListener(listener)
sseManager.connect(url,body)
//// try {
// simpleSSEClient.startSSE(url, body, listener)
//// } catch (e: HttpException) {
////// println("e.message = ${e.message}")
//// } catch (e: Exception) {
//// }
} }
......
...@@ -167,7 +167,7 @@ class MainActivity : BaseActivity<ActivityMainBinding, MyBaseViewModel>() { ...@@ -167,7 +167,7 @@ class MainActivity : BaseActivity<ActivityMainBinding, MyBaseViewModel>() {
getTarget() getTarget()
getV2x() getV2x()
//开启2个HTTP sse //开启2个HTTP sse
// getCarVehicle() getCarVehicle()
getAvpStatus() getAvpStatus()
} }
//获取车位占用情况 //获取车位占用情况
......
...@@ -30,8 +30,8 @@ import com.sd.cavphmi.highmap.WarnPtc ...@@ -30,8 +30,8 @@ import com.sd.cavphmi.highmap.WarnPtc
import com.sd.cavphmi.intfaces.OnWebSocketCb import com.sd.cavphmi.intfaces.OnWebSocketCb
import com.sd.cavphmi.net.MyResult import com.sd.cavphmi.net.MyResult
import com.sd.cavphmi.net.RequestBodyUtil import com.sd.cavphmi.net.RequestBodyUtil
import com.sd.cavphmi.net.SimpleSSEClient import com.sd.cavphmi.net.SseCallback
import com.sd.cavphmi.net.SseManager.SseCallback import com.sd.cavphmi.net.SseMultiConnectionManager
import com.sd.cavphmi.repositorys.AvpDataRepo import com.sd.cavphmi.repositorys.AvpDataRepo
import com.sd.cavphmi.utils.FileIoUtils import com.sd.cavphmi.utils.FileIoUtils
//import com.sd.cavphmi.utils.FileIoUtils //import com.sd.cavphmi.utils.FileIoUtils
...@@ -150,7 +150,7 @@ class MainVm @Inject constructor( ...@@ -150,7 +150,7 @@ class MainVm @Inject constructor(
it.close() it.close()
} }
//关闭Http sse 长连接 //关闭Http sse 长连接
SimpleSSEClient.instance.cancelContect() SseMultiConnectionManager.instance.stopAllConnections()
} }
...@@ -210,20 +210,23 @@ class MainVm @Inject constructor( ...@@ -210,20 +210,23 @@ class MainVm @Inject constructor(
private var avpCb = object : SseCallback { private var avpCb = object : SseCallback {
var url = "" var url = ""
override fun onConnected() { override fun onConnected(connId: String) {
println("-------- AVP状态 Sse open url = ${url}") println("-------- AVP状态 Sse open url = ${url} connId = ${connId}")
} }
override fun onDisconnected() { override fun onEventReceived(
} connId: String,
eventType: String?,
override fun onMessageReceived(data: String, eventType: String) { data: String
) {
viewModelScope.launch { viewModelScope.launch {
if (data.isNotEmpty()) { if (data.isNotEmpty()) {
// println("-------AVP状态 = ${data}") // println("-------AVP状态 = ${data}")
FileSdCardUtils.writeFileToDownload(data, "avp_status.txt")
try { try {
var result = gson.fromJson<AvpStatuBean>(data, AvpStatuBean::class.java) var result = gson.fromJson<AvpStatuBean>(data, AvpStatuBean::class.java)
if (result.haulingStageState != null) {
FileSdCardUtils.writeFileToDownload(data, "avp_status.txt")
}
avpStatu.emit(result) avpStatu.emit(result)
} catch (e: JsonSyntaxException) { } catch (e: JsonSyntaxException) {
e.printStackTrace() e.printStackTrace()
...@@ -232,11 +235,16 @@ class MainVm @Inject constructor( ...@@ -232,11 +235,16 @@ class MainVm @Inject constructor(
} }
} }
override fun onError(errorMsg: String, throwable: Throwable?) { override fun onFailed(connId: String, errorMsg: String) {
println("---------AVP 状态连接失败 connId = ${connId} errorMsg = ${errorMsg}")
}
override fun onClosed(connId: String) {
println("---------AVP 状态连接关闭 connId = ${connId}")
} }
override fun onReconnecting(delayMillis: Long) { override fun onReconnecting(delayMillis: Long) {
println("------AVP状态断开 ${delayMillis / 1000}秒后重连") println("------ AVP ${delayMillis / 1000}秒后开始重连")
} }
} }
...@@ -262,20 +270,23 @@ class MainVm @Inject constructor( ...@@ -262,20 +270,23 @@ class MainVm @Inject constructor(
//车辆位姿回调 //车辆位姿回调
private var carCb = object : SseCallback { private var carCb = object : SseCallback {
var url = "" var url = ""
override fun onConnected() { override fun onConnected(connId: String) {
println("--------车辆位姿 Sse open url = ${url}") println("--------车辆位姿 Sse open url = ${url} connId = ${connId}")
} }
override fun onDisconnected() { override fun onEventReceived(
} connId: String,
eventType: String?,
override fun onMessageReceived(data: String, eventType: String) { data: String
) {
viewModelScope.launch { viewModelScope.launch {
if (data.isNotEmpty()) { if (data.isNotEmpty()) {
// println("-----车辆位姿 = ${data}") // println("-----车辆位姿 = ${data}")
FileSdCardUtils.writeFileToDownload(data, "CarVehicle.txt")
try { try {
var result = gson.fromJson<CarVehicle>(data, CarVehicle::class.java) var result = gson.fromJson<CarVehicle>(data, CarVehicle::class.java)
if (result.businessStatus!=null){
FileSdCardUtils.writeFileToDownload(data, "CarVehicle.txt")
}
carVehicle.value = result carVehicle.value = result
} catch (e: JsonSyntaxException) { } catch (e: JsonSyntaxException) {
e.printStackTrace() e.printStackTrace()
...@@ -284,12 +295,18 @@ class MainVm @Inject constructor( ...@@ -284,12 +295,18 @@ class MainVm @Inject constructor(
} }
} }
override fun onError(errorMsg: String, throwable: Throwable?) { override fun onFailed(connId: String, errorMsg: String) {
println("---------车辆位置 连接失败 connId = ${connId} errorMsg = ${errorMsg}")
}
override fun onClosed(connId: String) {
println("---------车辆位置 状态连接失败 connId = ${connId}")
} }
override fun onReconnecting(delayMillis: Long) { override fun onReconnecting(delayMillis: Long) {
println("------车辆链接断开 ${delayMillis / 1000}秒后重连") println("------ 车辆位置 ${delayMillis / 1000}秒后开始重连")
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment