Commit 7498ae9a authored by p x's avatar p x
Browse files

22

parent e4c61b56
...@@ -4,7 +4,6 @@ import android.os.Handler ...@@ -4,7 +4,6 @@ import android.os.Handler
import android.os.Looper import android.os.Looper
import com.sd.cavphmi.moudule.NetworkModule import com.sd.cavphmi.moudule.NetworkModule
import okhttp3.ConnectionPool import okhttp3.ConnectionPool
import okhttp3.ConnectionSpec
import okhttp3.OkHttpClient import okhttp3.OkHttpClient
import okhttp3.Request import okhttp3.Request
import okhttp3.RequestBody import okhttp3.RequestBody
...@@ -20,7 +19,7 @@ class SseManager { ...@@ -20,7 +19,7 @@ class SseManager {
// level = HttpLoggingInterceptor.Level.BODY // level = HttpLoggingInterceptor.Level.BODY
// } // }
private constructor() { constructor() {
var sslData = NetworkModule.getSSlSocketFactory() var sslData = NetworkModule.getSSlSocketFactory()
okHttpClient = OkHttpClient.Builder() okHttpClient = OkHttpClient.Builder()
...@@ -28,15 +27,15 @@ class SseManager { ...@@ -28,15 +27,15 @@ class SseManager {
.connectTimeout(30, TimeUnit.SECONDS) .connectTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS) .writeTimeout(30, TimeUnit.SECONDS)
.readTimeout(0, TimeUnit.SECONDS) .readTimeout(0, TimeUnit.SECONDS)
.connectionPool(ConnectionPool(3, 5, TimeUnit.MINUTES)) // 连接池优化(可选) .connectionPool(ConnectionPool(5, 30, TimeUnit.SECONDS)) // 连接池优化(可选)
.pingInterval(20, TimeUnit.SECONDS) // TCP心跳间隔 .pingInterval(20, TimeUnit.SECONDS) // TCP心跳间隔
.retryOnConnectionFailure(true) .retryOnConnectionFailure(true)
.connectionSpecs( /* .connectionSpecs(
listOf( listOf(
ConnectionSpec.CLEARTEXT, ConnectionSpec.CLEARTEXT,
ConnectionSpec.MODERN_TLS ConnectionSpec.MODERN_TLS
) )
) // 支持 HTTP/HTTPS ) // 支持 HTTP/HTTPS*/
.sslSocketFactory(sslData.socketFactory, sslData.trustAllCert) .sslSocketFactory(sslData.socketFactory, sslData.trustAllCert)
.hostnameVerifier { hostname, session -> true } .hostnameVerifier { hostname, session -> true }
...@@ -49,14 +48,18 @@ class SseManager { ...@@ -49,14 +48,18 @@ class SseManager {
private var eventSource: EventSource? = null private var eventSource: EventSource? = null
// private var lastEventId: String? = null // 记录最后一个事件 ID // private var lastEventId: String? = null // 记录最后一个事件 ID
private val listeners = mutableListOf<SseCallback>() // 客户端数据监听器 private val listeners = mutableListOf<SseCallback?>() // 客户端数据监听器
// 重连延迟(指数退避:1s → 2s → 4s → ... → 30s 上限) // 重连延迟(指数退避:1s → 2s → 4s → ... → 30s 上限)
private var retryDelayMillis = 1000L private var retryDelayMillis = 1000L
private val maxRetryDelay = 30 * 1000L // 最大重连延迟 private val maxRetryDelay = 30 * 1000L // 最大重连延迟
// 连接状态回调(给外部使用) // 连接状态回调(给外部使用)
var callback: SseCallback? = null private var callback: SseCallback? = null
fun addListener(callback: SseCallback?) {
listeners.add(callback)
}
// 初始化 SSE 请求 // 初始化 SSE 请求
fun connect(sseUrl: String, body: RequestBody?, headers: Map<String, String> = emptyMap()) { fun connect(sseUrl: String, body: RequestBody?, headers: Map<String, String> = emptyMap()) {
...@@ -88,7 +91,9 @@ class SseManager { ...@@ -88,7 +91,9 @@ class SseManager {
// 连接成功回调 // 连接成功回调
override fun onOpen(eventSource: EventSource, response: Response) { override fun onOpen(eventSource: EventSource, response: Response) {
super.onOpen(eventSource, response) super.onOpen(eventSource, response)
callback?.onConnected() listeners.forEach {
it?.onConnected()
}
retryDelayMillis = 1000L // 重置重连延迟(连接成功后恢复初始值) retryDelayMillis = 1000L // 重置重连延迟(连接成功后恢复初始值)
} }
...@@ -100,13 +105,17 @@ class SseManager { ...@@ -100,13 +105,17 @@ class SseManager {
data: String data: String
) { ) {
super.onEvent(eventSource, id, type, data) super.onEvent(eventSource, id, type, data)
callback?.onMessageReceived(data, type ?: "message") // type 是服务器定义的事件类型 listeners.forEach {
it?.onMessageReceived(data, type ?: "message")// type 是服务器定义的事件类型
}
} }
// 连接断开回调(主动/被动断开都会触发) // 连接断开回调(主动/被动断开都会触发)
override fun onClosed(eventSource: EventSource) { override fun onClosed(eventSource: EventSource) {
super.onClosed(eventSource) super.onClosed(eventSource)
callback?.onDisconnected() listeners.forEach {
it?.onDisconnected()
}
// 主动断开时不自动重连(通过 flag 控制) // 主动断开时不自动重连(通过 flag 控制)
if (!isManualDisconnect) { if (!isManualDisconnect) {
scheduleReconnect(sseUrl, body, headers) // 被动断开则重连 scheduleReconnect(sseUrl, body, headers) // 被动断开则重连
...@@ -121,7 +130,9 @@ class SseManager { ...@@ -121,7 +130,9 @@ class SseManager {
) { ) {
super.onFailure(eventSource, t, response) super.onFailure(eventSource, t, response)
val errorMsg = t?.message ?: "未知错误" val errorMsg = t?.message ?: "未知错误"
callback?.onError(errorMsg, t) listeners.forEach {
it?.onError(errorMsg, t)
}
scheduleReconnect(sseUrl, body, headers) // 失败后自动重连 scheduleReconnect(sseUrl, body, headers) // 失败后自动重连
} }
} }
......
...@@ -21,7 +21,7 @@ class AvpDataRepo @Inject constructor( ...@@ -21,7 +21,7 @@ class AvpDataRepo @Inject constructor(
private var retrofitMethod: ClientRetrofitMethod private var retrofitMethod: ClientRetrofitMethod
) { ) {
// private var simpleSSEClient = SimpleSSEClient.instance private var simpleSSEClient = SimpleSSEClient.instance
private var sseManager = SseManager.instance private var sseManager = SseManager.instance
/**获取车辆详情 /**获取车辆详情
...@@ -104,7 +104,7 @@ class AvpDataRepo @Inject constructor( ...@@ -104,7 +104,7 @@ class AvpDataRepo @Inject constructor(
*/ */
fun getAvpStatus(url: String, body: RequestBody, listener: SseCallback?) { fun getAvpStatus(url: String, body: RequestBody, listener: SseCallback?) {
// fun getAvpStatus(url: String, listener: SseCallback?) { // fun getAvpStatus(url: String, listener: SseCallback?) {
sseManager.callback = listener sseManager.addListener(listener)
sseManager.connect(url,body) sseManager.connect(url,body)
// simpleSSEClient.startSSE(url, body, listener) // simpleSSEClient.startSSE(url, body, listener)
} }
...@@ -112,7 +112,7 @@ class AvpDataRepo @Inject constructor( ...@@ -112,7 +112,7 @@ class AvpDataRepo @Inject constructor(
/**获取车辆位姿****/ /**获取车辆位姿****/
// fun getCarPose(url: String, listener: SseCallback?) { // fun getCarPose(url: String, listener: SseCallback?) {
fun getCarPose(url: String,body: RequestBody, listener: SseCallback?) { fun getCarPose(url: String,body: RequestBody, listener: SseCallback?) {
sseManager.callback = listener sseManager.addListener(listener)
sseManager.connect(url,body) sseManager.connect(url,body)
//// try { //// try {
// simpleSSEClient.startSSE(url, body, listener) // simpleSSEClient.startSSE(url, body, listener)
......
...@@ -167,7 +167,7 @@ class MainActivity : BaseActivity<ActivityMainBinding, MyBaseViewModel>() { ...@@ -167,7 +167,7 @@ class MainActivity : BaseActivity<ActivityMainBinding, MyBaseViewModel>() {
getTarget() getTarget()
getV2x() getV2x()
//开启2个HTTP sse //开启2个HTTP sse
getCarVehicle() // getCarVehicle()
getAvpStatus() getAvpStatus()
} }
//获取车位占用情况 //获取车位占用情况
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment