Commit 07602a7b authored by kang.nie@inzymeits.com's avatar kang.nie@inzymeits.com
Browse files

提交代码

parent e0c7be76
package com.ssi.websocket;
import com.alibaba.fastjson.JSON;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.springframework.stereotype.Component;
/**
* 定位websocket服务类,针对单个设备/或车辆
* <p>type=1 港机定位</p>
*
* @author 成东
* @since 2022-02-23 9:58
*/
@Component
@ServerEndpoint(value = "/locationWebSocket/crane")
@Slf4j
public class LocationWebSocketService {
private static AtomicInteger onlineCount = new AtomicInteger(0);
private static ConcurrentMap<String, WebSocketSession> webSocketMap = new ConcurrentHashMap<>();
public static boolean isPush() {
for (WebSocketSession w : webSocketMap.values()) {
if (w.getSession().isOpen()) {
return true;
}
}
return false;
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
try {
log.info(String.format("港机定位websocket打开新会话%s 成功", session.getId()));
webSocketMap.put(session.getId(), new WebSocketSession(session));
onlineCount.incrementAndGet();
sendMessage(session, "连接成功");
} catch (Exception e) {
log.error("websocket 打开异常", e);
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session) {
log.info(String.format("港机定位websocket会话:%s 关闭,当剩余前会话数:%d", session.getId(),
onlineCount.decrementAndGet()));
webSocketMap.remove(session.getId());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info(String.format("收到来自会话%s的信息: %s", session.getId(), message));
}
/**
*
*/
@OnError
public void onError(Session session, Throwable error) {
log.error(String.format("会话:%s 发生错误.", session.getId()), error);
}
/**
* 消息广播
*/
public static void messageBroadcast(Object messages) {
webSocketMap.values().parallelStream().forEach(webSocketSession -> {
webSocketSession.sendInfo(messages);
});
}
/**
* 实现服务器主动推送
*/
public void sendMessage(Session session, String message) throws IOException {
session.getBasicRemote().sendText(message);
}
private class WebSocketSession {
@Getter
private final Session session;
private final AtomicInteger printSwitch = new AtomicInteger(0);
private final AtomicInteger successCount = new AtomicInteger(0);
private final AtomicInteger missCount = new AtomicInteger(0);
public WebSocketSession(Session session) {
this.session = session;
}
public void sendInfo(Object messages) {
if (session.isOpen()) {
try {
session.getBasicRemote().sendText(JSON.toJSONString(messages));
successCount.incrementAndGet();
} catch (IOException e) {
log.info(String.format("港机定位websocket推送失败: %s, %s ", session.getId(), messages), e);
missCount.incrementAndGet();
}
}
//定期状态输出
int minuteOfHour = DateTime.now().getMinuteOfHour();
int printMinute = 5;
if (printSwitch.get() < 1 && minuteOfHour % printMinute == 0) {
log.info(String.format("港机定位webSocket: %s 成功推送次数: %s, 推送失败次数:%s", session.getId(),
successCount.get(), missCount.get()));
printSwitch.incrementAndGet();
} else if (minuteOfHour % printMinute != 0) {
printSwitch.set(0);
}
}
}
}
package com.ssi.websocket;
import com.alibaba.fastjson.JSONObject;
import com.ssi.controller.platform.VehicleTrackController;
import com.ssi.response.SSIResponse;
import com.ssi.service.platform.impl.VehicleTrackServiceImpl;
import com.ssi.utils.SpringUtils;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 推动交通灯信息到前端
*/
@ServerEndpoint(value = "/ReplaySocketServer")
@Component
public class ReplaySocketServer {
private static Logger log = LoggerFactory.getLogger(ReplaySocketServer.class);
private static AtomicInteger onlineCount = new AtomicInteger(0);
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet<ReplaySocketServer> webSocketSet = new CopyOnWriteArraySet<>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
private Queue<Map<String, Object>> replayCache = new LinkedBlockingQueue<>();
public static void messageBroadcast(String messages) {
webSocketSet.parallelStream().forEach(webSocketSession -> {
try {
// webSocketSession.session.getBasicRemote().sendText(messages);
webSocketSession.sendMessage(null);
}catch (Exception e){
log.error("ReplaySocketServer服务发送消息失败:",e);
}
});
}
public Queue<Map<String, Object>> getReplayCache() {
return replayCache;
}
public void setReplayCache(List<Map<String, Object>> replayCaches) {
replayCache.clear();
replayCache.addAll(replayCaches);
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) throws Exception {
log.info("ReplaySocketServer服务新增用户连接;{}",session.getId());
this.session=session;
webSocketSet.add(this);
log.info("当前用户数为:{}",onlineCount.incrementAndGet());
// session.getBasicRemote().sendText("连接成功");
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
log.info("用户离线当前用户数为:{}", onlineCount.decrementAndGet());
webSocketSet.remove(this);
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(message);
VehicleTrackServiceImpl vehicleTrackService = SpringUtils.getObject(VehicleTrackServiceImpl.class);
SSIResponse response = vehicleTrackService.smallTrack(jsonObject.getString("vin"), jsonObject.getLong("startTime"), jsonObject.getLong("stopTime"), null);
List<Map<String, Object>> listData = (List<Map<String, Object>>) response.getData();
if(CollectionUtils.isNotEmpty(listData)){
setReplayCache(listData);
}
}
public void sendMessage(String message){
Queue<Map<String, Object>> replayCache = getReplayCache();
if(replayCache.size()<1){
return;
}
Map<String, Object> poll = replayCache.poll();
try {
this.session.getBasicRemote().sendText(JSONObject.toJSONString(poll));
}catch (Exception e){
log.error("ReplaySocketServer服务发送消息失败:",e);
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error(String.format("会话:%s 发生错误.", session.getId()), error);
}
}
package com.ssi.websocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.ssi.model.RedisDataModel;
import com.ssi.utils.ConfigUtils;
import com.ssi.utils.SpringUtils;
import com.ssi.websocket.vo.CraneLocation;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.tomcat.websocket.WsSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 推动交通灯信息到前端
*/
@ServerEndpoint(value = "/Websocket/TrafficLampInfo")
@Component
public class TrafficLampInfoWebSocketServer {
private static Logger log = LoggerFactory.getLogger(TrafficLampInfoWebSocketServer.class);
private static AtomicInteger onlineCount = new AtomicInteger(0);
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet<TrafficLampInfoWebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) throws Exception {
log.info("TrafficLampInfoWebSocketServer服务新增用户连接;{}",session.getId());
this.session=session;
webSocketSet.add(this);
log.info("当前用户数为:{}",onlineCount.incrementAndGet());
// session.getBasicRemote().sendText("连接成功");
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
log.info("用户离线当前用户数为:{}", onlineCount.decrementAndGet());
webSocketSet.remove(this);
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) throws Exception {
}
public static void messageBroadcast(String messages) {
webSocketSet.parallelStream().forEach(webSocketSession -> {
try {
webSocketSession.session.getBasicRemote().sendText(messages);
}catch (Exception e){
log.error("TrafficLampInfoWebSocketServer服务发送消息失败:",e);
}
});
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error(String.format("会话:%s 发生错误.", session.getId()), error);
}
}
package com.ssi.websocket;
import com.alibaba.fastjson.JSON;
import com.ssi.entity.VehicleRoad;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
/**
*
* 车路协调websocket
*/
@ServerEndpoint("/vehicleRoadCoordinationWebSocketServer")
@Component
public class VehicleRoadCoordinationWebSocketServer {
private final static Logger log = LoggerFactory.getLogger(VehicleRoadCoordinationWebSocketServer.class);
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet<VehicleRoadCoordinationWebSocketServer> webSocketSet = new CopyOnWriteArraySet<VehicleRoadCoordinationWebSocketServer>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
this.session = session;
webSocketSet.add(this); //加入set中
addOnlineCount(); //在线数加1
log.info("有新窗口开始监听:" + session.getId() + ",当前在线人数为" + getOnlineCount());
try {
sendMessage("连接成功");
} catch (IOException e) {
log.error("websocket IO异常");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到来自窗口" + session.getId() + "的信息:" + message);
// VehicleRoad vehicleRoad = JSON.parseObject(message, VehicleRoad.class);
}
public void addIgnoreMessage(VehicleRoad vehicleRoad){
// ignoreMap.put(key,new Date());
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误",error);
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
VehicleRoadCoordinationWebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
VehicleRoadCoordinationWebSocketServer.onlineCount--;
}
public static void sendInfo(String message) {
log.info("异常推送websocket收到信息:" + message);
//群发消息
for (VehicleRoadCoordinationWebSocketServer item : webSocketSet) {
try {
item.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void sendMessage(VehicleRoad vehicleRoad) {
//群发消息
for (VehicleRoadCoordinationWebSocketServer item : webSocketSet) {
try {
item.sendMessage(JSON.toJSONString(vehicleRoad));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
\ No newline at end of file
package com.ssi.websocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.ssi.entity.VmsVehicle;
import com.ssi.model.VehicleBaseInfoModel;
import com.ssi.service.WebSocketDataBackService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @description
* 用于脚本数据的回放
*/
@ServerEndpoint(value="/VMSWebBacksocket")
@Component
public class WebBackSocketService {
private static Logger log = LoggerFactory.getLogger(WebBackSocketService.class);
private static AtomicInteger onlineCount = new AtomicInteger(0);
private static Map<String, WebSocketSession> webSocketSet = Maps.newConcurrentMap();
private Session session;
private String token;
private static WebSocketDataBackService webSocketDataBackService;
private static com.ssi.model.VehicleBaseInfoModel VehicleBaseInfoModel;
/**
* 连接建立成功调用的方法
* */
@OnOpen
public void onOpen(Session session) {
try {
this.session = session;
this.token = session.getId();
WebSocketSession webSocketSession = new WebSocketSession(this);
WebBackSocketService.webSocketSet.put(token, webSocketSession);
addOnlineCount();
log.info(String.format("打开数据回放新会话%s成功。", token));
}catch (Exception e){
log.error("打开数据回放websocket 打开异常", e);
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.get(this.token).close();
webSocketSet.remove(this.token);
subOnlineCount();
log.info(String.format("数据回放会话:%s 关闭,当剩余前会话数:%s", token, getOnlineCount()));
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
try {
if(StringUtils.isNotBlank(message)) {
JSONObject messageJson = JSON.parseObject(message);
if (messageJson != null) {
Set<String> vins;
String vinStr = messageJson.getString("vins");
if (StringUtils.isNotBlank(vinStr)) {
vins = Sets.newHashSet(vinStr.split(","));
} else {
vins = getVins();
}
if(!vins.isEmpty()) {
WebSocketSession webSocketSession
= WebBackSocketService.webSocketSet.get(session.getId());
Long startTime = messageJson.getLong("startTime");
Long stopTime = messageJson.getLong("stopTime");
String backorId = messageJson.getString("id");
if(backorId == null){
backorId = token;
}
backorId = String.format("%s,%s,%s,%s,%s", backorId, vinStr, "", startTime, stopTime);
int status = webSocketDataBackService.playBack(session.getId(), backorId, "", vins, startTime, stopTime);
if(status == 0){
webSocketSession.backorIds.add(backorId);
}
}
}
}
} catch (Exception e) {
log.info(String.format("数据回放客户端参数解析失败:%s", message), e);
}
}
/**
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error(String.format("数据回放会话:%s 发生错误.", session.getId()), error);
webSocketSet.get(session.getId()).close();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 消息广播
* */
public static void messageBroadcast(String backorId, Set<String> sessions, String json) {
messageBroadcast(backorId, sessions, json, false);
}
public static void messageBroadcast(String backorId, Set<String> sessions, List<String> recordList) {
messageBroadcast(backorId, sessions, recordList, false);
}
public static void messageBroadcast(String backorId, Set<String> sessions, String json, boolean last) {
for (String session : sessions) {
webSocketSet.get(session).sendInfo(backorId, json, last);
}
}
public static void messageBroadcast(String backorId, Set<String> sessions, List<String> recordList, boolean last) {
for (String session : sessions) {
webSocketSet.get(session).sendInfo(backorId, recordList, last);
}
}
public static int getOnlineCount() {
return WebBackSocketService.onlineCount.get();
}
public static void addOnlineCount() {
WebBackSocketService.onlineCount.incrementAndGet();
}
public static void subOnlineCount() {
WebBackSocketService.onlineCount.decrementAndGet();
}
private class WebSocketSession{
private WebBackSocketService webSocketService;
private Set<String> backorIds = Sets.newConcurrentHashSet();
public WebSocketSession(WebBackSocketService webSocketService) {
this.webSocketService = webSocketService;
}
public synchronized void sendInfo(String backorId, String json, boolean last){
try {
if(json != null) {
boolean complete = last;
if(json.startsWith("#")){
complete = true;
json = json.substring(1);
}
webSocketService.sendMessage(String.format("{\"last\":%s,\"timeStamp\":%s, \"car\":%s}",
complete, System.currentTimeMillis(), json));
}
} catch (IOException e) {
log.info(String.format("%s数据回放推送失败: %s, 数据量:%s", backorId, webSocketService.token, json), e);
}finally {
if(last && backorIds.contains(backorId)){
close(backorId);
}
}
}
public synchronized void sendInfo(String backorId, List<String> recordList, boolean last){
List<Map<String, Object>> resultList = Lists.newArrayList();
try {
if(!recordList.isEmpty()){
recordList.stream().forEach(record -> {
String json = record;
if(json != null) {
boolean complete = last;
if(json.startsWith("#")){
complete = true;
json = json.substring(1);
}
Map<String, Object> res = Maps.newHashMap();
res.put("last", complete);
res.put("timeStamp", System.currentTimeMillis());
res.put("car", JSONObject.parse(json));
resultList.add(res);
}
});
webSocketService.sendMessage(JSONObject.toJSONString(resultList));
}
} catch (IOException e) {
log.info(String.format("%s数据回放推送失败: %s, 数据量:%s", backorId, webSocketService.token, JSONObject.toJSONString(resultList)), e);
}finally {
if(last && backorIds.contains(backorId)){
close(backorId);
}
}
}
private void close(String backorId){
webSocketDataBackService.close(webSocketService.token, backorId);
}
private void close(){
backorIds.forEach(backorId -> {
webSocketDataBackService.close(webSocketService.token, backorId);
});
}
}
public static boolean hasWebSocketSession() {
return !webSocketSet.isEmpty();
}
private Set<String> getVins(){
Set<String> vins = null;
List<VmsVehicle> allVehicleBaseInfo = VehicleBaseInfoModel.getAllVehicleInfo();
if (allVehicleBaseInfo != null && !allVehicleBaseInfo.isEmpty()) {
vins = new HashSet<>();
for (VmsVehicle vehicle : allVehicleBaseInfo) {
vins.add(vehicle.getVin());
}
}
return vins;
}
@Autowired
public void setWebSocketDataBackKafkaModel(WebSocketDataBackService webSocketDataBackService) {
WebBackSocketService.webSocketDataBackService = webSocketDataBackService;
}
@Autowired
public void setCismVehicleBaseInfoModel(VehicleBaseInfoModel VehicleBaseInfoModel) {
WebBackSocketService.VehicleBaseInfoModel = VehicleBaseInfoModel;
}
}
package com.ssi.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* websocket配置类
*/
@Configuration
public class WebSocketConfig {
/**
* 服务器节点
* <p>
* 如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理
*
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
package com.ssi.websocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @description 根据回话推送数据
*/
@ServerEndpoint(value = "/VMSWebsocket")
@Component
public class WebSocketService {
private static Logger log = LoggerFactory.getLogger(WebSocketService.class);
private static AtomicInteger onlineCount = new AtomicInteger(0);
private static Map<String, WebSocketSession> webSocketSet = Maps.newConcurrentMap();
private Session session;
private String token;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
try {
this.session = session;
WebSocketSession webSocketSession = new WebSocketSession();
webSocketSession.webSocketService = this;
this.token = session.getId();
WebSocketService.webSocketSet.put(token, webSocketSession);
addOnlineCount();
log.info(String.format("打开新会话%s成功。", token));
} catch (Exception e) {
log.error("websocket 打开异常", e);
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this.token);
subOnlineCount();
log.info(String.format("会话:%s 关闭,当剩余前会话数:%s", token, getOnlineCount()));
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
try {
if (StringUtils.isNotBlank(message)) {
JSONObject messageJson = JSON.parseObject(message);
if (messageJson != null) {
String vinStr = messageJson.getString("vins");
Set<String> vins;
if (StringUtils.isNotBlank(vinStr)) {
vins = Sets.newHashSet(vinStr.split(","));
} else {
vins = Sets.newHashSet();
}
WebSocketSession webSocketSession = WebSocketService.webSocketSet.get(session.getId());
webSocketSession.setVins(vins);
log.info(String.format("收到来自会话%s的信息: %s, vins: %s", session.getId(), message, vins));
}
}
} catch (Exception e) {
log.info(String.format("客户端参数解析失败:%s", message), e);
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error(String.format("会话:%s 发生错误.", session.getId()), error);
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 消息广播
*/
public static void messageBroadcast(String vin, Map<String, Object> messages) {
webSocketSet.values()
.parallelStream()
.forEach(webSocketSession -> {
webSocketSession.sendInfo(vin, messages);
});
}
public static int getOnlineCount() {
return WebSocketService.onlineCount.get();
}
public static void addOnlineCount() {
WebSocketService.onlineCount.incrementAndGet();
}
public static void subOnlineCount() {
WebSocketService.onlineCount.decrementAndGet();
}
private class WebSocketSession {
private WebSocketService webSocketService;
private Set<String> vins = new HashSet<>();
private Object lock = new Object();
private int printMinute = 5;
private AtomicInteger printSwitch = new AtomicInteger(0);
private AtomicInteger successCount = new AtomicInteger(0);
private AtomicInteger missCount = new AtomicInteger(0);
public void sendInfo(String vin, Map<String, Object> messages) {
synchronized (lock) {
Map<String, Object> res;
if (vins.isEmpty() || vins.contains(vin)) {
res = Maps.newHashMap();
res.put("timeStamp", System.currentTimeMillis());
res.put("car", messages);
try {
webSocketService.sendMessage(JSON.toJSONString(res));
successCount.incrementAndGet();
} catch (IOException e) {
log.info(String.format("推送失败: %s, %s ", webSocketService.token, messages), e);
missCount.incrementAndGet();
}
}
}
//定期状态输出
int minuteOfHour = DateTime.now().getMinuteOfHour();
if (printSwitch.get() < 1 && minuteOfHour % printMinute == 0) {
log.info(String.format("webSocket: %s 成功推送次数: %s, 推送失败次数:%s",
webSocketService.token, successCount.get(), missCount.get()));
printSwitch.incrementAndGet();
} else if (minuteOfHour % printMinute != 0) {
printSwitch.set(0);
}
}
public void setVins(Set<String> vins) {
synchronized (lock) {
this.vins = vins;
}
}
}
public static boolean hasWebSocketSession() {
return !webSocketSet.isEmpty();
}
}
package com.ssi.websocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.ssi.model.TelecontrolModel;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 调试APP使用的websocket
*/
@ServerEndpoint(value = "/debugAppVMSWebsocket")
@Component
public class WebSocketServiceForDebugApp {
private static Logger log = LoggerFactory.getLogger(WebSocketServiceForDebugApp.class);
private static AtomicInteger onlineCount = new AtomicInteger(0);
@Getter
private static Map<String, WebSocketSession> webSocketSet = Maps.newConcurrentMap();
private Session session;
private String token;
@Setter
private static TelecontrolModel telecontrolModel;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
try {
this.session = session;
WebSocketSession webSocketSession = new WebSocketSession();
webSocketSession.webSocketService = this;
this.token = session.getId();
WebSocketServiceForDebugApp.webSocketSet.put(token, webSocketSession);
addOnlineCount();
log.info(String.format("打开新会话%s成功。", token));
} catch (Exception e) {
log.error("websocket 打开异常", e);
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session, CloseReason closeReason) {
CloseReason.CloseCode closeCode = closeReason.getCloseCode();
WebSocketSession webSocketSession = webSocketSet.get(session.getId());
if(CloseReason.CloseCodes.NORMAL_CLOSURE.getCode() == closeCode.getCode()){
telecontrolModel.deleteSocketOvertime(session.getId(),webSocketSession.getControlledVin());
}
webSocketSet.remove(this.token);
subOnlineCount();
log.info(String.format("会话:%s 关闭,当剩余前会话数:%s", token, getOnlineCount()));
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
try {
WebSocketSession webSocketSession = webSocketSet.get(session.getId());
if(webSocketSession.getControlledVin() != null){
telecontrolModel.setOrRefreshSocketOvertime(session.getId(),webSocketSession.getControlledVin());
}
if("1".equals(message)){
log.debug(String.format("收到session%s心跳消息",session.getId()));
return;
}
if (StringUtils.isNotBlank(message)) {
JSONObject messageJson = JSON.parseObject(message);
if (messageJson != null) {
String vinStr = messageJson.getString("vins");
Set<String> vins;
if (StringUtils.isNotBlank(vinStr)) {
vins = Sets.newHashSet(vinStr.split(","));
webSocketSession.setControlledVin(vinStr.split(",")[0]);
} else {
vins = Sets.newHashSet();
}
webSocketSession.setVins(vins);
log.info(String.format("收到来自会话%s的信息: %s, vins: %s", session.getId(), message, vins));
}
}
} catch (Exception e) {
log.info(String.format("客户端参数解析失败:%s", message), e);
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
String id = session.getId();
log.error(String.format("会话:%s 发生错误.", id), error);
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 消息广播
*/
public static void messageBroadcast(String vin, Map<String, Object> messages) {
webSocketSet.values()
.parallelStream()
.forEach(webSocketSession -> {
webSocketSession.sendInfo(vin, messages);
});
}
public static int getOnlineCount() {
return WebSocketServiceForDebugApp.onlineCount.get();
}
public static void addOnlineCount() {
WebSocketServiceForDebugApp.onlineCount.incrementAndGet();
}
public static void subOnlineCount() {
WebSocketServiceForDebugApp.onlineCount.decrementAndGet();
}
private class WebSocketSession {
private WebSocketServiceForDebugApp webSocketService;
@Getter
private Set<String> vins = new HashSet<>();
@Getter
@Setter
private String controlledVin;
private Object lock = new Object();
private int printMinute = 5;
private AtomicInteger printSwitch = new AtomicInteger(0);
private AtomicInteger successCount = new AtomicInteger(0);
private AtomicInteger missCount = new AtomicInteger(0);
public void sendInfo(String vin, Map<String, Object> messages) {
synchronized (lock) {
Map<String, Object> res;
if (vins.isEmpty() || vins.contains(vin)) {
res = Maps.newHashMap();
res.put("timeStamp", System.currentTimeMillis());
res.put("car", messages);
try {
webSocketService.sendMessage(JSON.toJSONString(res));
successCount.incrementAndGet();
} catch (IOException e) {
log.info(String.format("推送失败: %s, %s ", webSocketService.token, messages), e);
missCount.incrementAndGet();
}
}
}
//定期状态输出
int minuteOfHour = DateTime.now().getMinuteOfHour();
if (printSwitch.get() < 1 && minuteOfHour % printMinute == 0) {
log.info(String.format("webSocket: %s 成功推送次数: %s, 推送失败次数:%s",
webSocketService.token, successCount.get(), missCount.get()));
printSwitch.incrementAndGet();
} else if (minuteOfHour % printMinute != 0) {
printSwitch.set(0);
}
}
public void setVins(Set<String> vins) {
synchronized (lock) {
this.vins = vins;
}
}
}
public static boolean hasWebSocketSession() {
return !webSocketSet.isEmpty();
}
public static Set<String> getVinsBySessionId(String sessionId){
WebSocketSession webSocketSession = WebSocketServiceForDebugApp.getWebSocketSet().get(sessionId);
return webSocketSession.getVins();
}
}
package com.ssi.websocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 大屏及web使用的websocket,推送车辆任务信息
*/
@ServerEndpoint(value = "/VMSWebsocketForScreenList")
@Component
public class WebSocketServiceForScreenList {
private static Logger log = LoggerFactory.getLogger(WebSocketServiceForScreenList.class);
private static AtomicInteger onlineCount = new AtomicInteger(0);
private static Map<String, WebSocketSession> webSocketSet = Maps.newConcurrentMap();
private Session session;
private String token;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
try {
this.session = session;
WebSocketSession webSocketSession = new WebSocketSession();
webSocketSession.webSocketService = this;
this.token = session.getId();
WebSocketServiceForScreenList.webSocketSet.put(token, webSocketSession);
addOnlineCount();
log.info(String.format("打开新会话%s成功。", token));
} catch (Exception e) {
log.error("websocket 打开异常", e);
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this.token);
subOnlineCount();
log.info(String.format("会话:%s 关闭,当剩余前会话数:%s", token, getOnlineCount()));
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
try {
if (StringUtils.isNotBlank(message)) {
JSONObject messageJson = JSON.parseObject(message);
if (messageJson != null) {
String vinStr = messageJson.getString("vins");
Set<String> vins;
if (StringUtils.isNotBlank(vinStr)) {
vins = Sets.newHashSet(vinStr.split(","));
} else {
vins = Sets.newHashSet();
}
WebSocketSession webSocketSession = WebSocketServiceForScreenList.webSocketSet.get(session.getId());
webSocketSession.setVins(vins);
log.info(String.format("收到来自会话%s的信息: %s, vins: %s", session.getId(), message, vins));
}
}
} catch (Exception e) {
log.info(String.format("客户端参数解析失败:%s", message), e);
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error(String.format("会话:%s 发生错误.", session.getId()), error);
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 消息广播
*/
public static void messageBroadcast(Collection<Map<String, Object>> messages) {
webSocketSet.values()
.parallelStream()
.forEach(webSocketSession -> {
webSocketSession.sendInfo(messages);
});
}
/**
* 消息广播
*/
public static void messageBroadcast(String message) {
webSocketSet.values().parallelStream().forEach(webSocketSession -> {
webSocketSession.sendInfo(message);
});
}
public static int getOnlineCount() {
return WebSocketServiceForScreenList.onlineCount.get();
}
public static void addOnlineCount() {
WebSocketServiceForScreenList.onlineCount.incrementAndGet();
}
public static void subOnlineCount() {
WebSocketServiceForScreenList.onlineCount.decrementAndGet();
}
private class WebSocketSession {
private WebSocketServiceForScreenList webSocketService;
private Set<String> vins = new HashSet<>();
private Object lock = new Object();
private int printMinute = 5;
private AtomicInteger printSwitch = new AtomicInteger(0);
private AtomicInteger successCount = new AtomicInteger(0);
private AtomicInteger missCount = new AtomicInteger(0);
public void sendInfo(Collection<Map<String, Object>> messages) {
List<Map<String, Object>> resultList = Lists.newArrayList();
Iterator<Map<String, Object>> iter = messages.iterator() ;
while(iter.hasNext())
{
Map<String, Object> res = Maps.newHashMap();
res.put("timeStamp", System.currentTimeMillis());
res.put("car", iter.next());
resultList.add(res);
}
try {
webSocketService.sendMessage(JSON.toJSONString(resultList));
successCount.incrementAndGet();
} catch (IOException e) {
log.info(String.format("推送失败: %s, %s ", webSocketService.token, messages), e);
missCount.incrementAndGet();
}
//定期状态输出
int minuteOfHour = DateTime.now().getMinuteOfHour();
if (printSwitch.get() < 1 && minuteOfHour % printMinute == 0) {
log.info(String.format("大屏list webSocket: %s 成功推送次数: %s, 推送失败次数:%s",
webSocketService.token, successCount.get(), missCount.get()));
printSwitch.incrementAndGet();
} else if (minuteOfHour % printMinute != 0) {
printSwitch.set(0);
}
}
public void setVins(Set<String> vins) {
synchronized (lock) {
this.vins = vins;
}
}
public void sendInfo(String message) {
try {
webSocketService.sendMessage(message);
} catch (IOException e) {
log.error(String.format("推送文本消息失败: %s, %s ", webSocketService.token, message), e);
}
}
}
public static boolean hasWebSocketSession() {
return !webSocketSet.isEmpty();
}
}
package com.ssi.websocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.ssi.websocket.push.task.WebSocketScheduledSendTask;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 大屏及web使用的websocket,推送车辆任务信息
*/
@ServerEndpoint(value = "/WSSForScreenOfflineList")
@Component
public class WebSocketServiceForScreenOfflineList {
private static Logger log = LoggerFactory.getLogger(WebSocketServiceForScreenOfflineList.class);
private static AtomicInteger onlineCount = new AtomicInteger(0);
private static Map<String, WebSocketSession> webSocketSet = Maps.newConcurrentMap();
private Session session;
private String token;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
try {
this.session = session;
WebSocketSession webSocketSession = new WebSocketSession();
webSocketSession.webSocketService = this;
this.token = session.getId();
WebSocketServiceForScreenOfflineList.webSocketSet.put(token, webSocketSession);
addOnlineCount();
log.info(String.format("打开新会话%s成功。", token));
} catch (Exception e) {
log.error("websocket 打开异常", e);
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this.token);
subOnlineCount();
log.info(String.format("会话:%s 关闭,当剩余前会话数:%s", token, getOnlineCount()));
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
try {
if (StringUtils.isNotBlank(message)) {
JSONObject messageJson = JSON.parseObject(message);
if (messageJson != null) {
String vinStr = messageJson.getString("vins");
Set<String> vins;
if (StringUtils.isNotBlank(vinStr)) {
vins = Sets.newHashSet(vinStr.split(","));
} else {
vins = Sets.newHashSet();
}
WebSocketSession webSocketSession = WebSocketServiceForScreenOfflineList.webSocketSet.get(session.getId());
webSocketSession.setVins(vins);
log.info(String.format("收到来自会话%s的信息: %s, vins: %s", session.getId(), message, vins));
}
}
} catch (Exception e) {
log.info(String.format("客户端参数解析失败:%s", message), e);
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error(String.format("会话:%s 发生错误.", session.getId()), error);
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 消息广播
*/
public static void messageBroadcast(Collection<Map<String, Object>> messages) {
webSocketSet.values()
.parallelStream()
.forEach(webSocketSession -> {
webSocketSession.sendInfo(messages);
});
}
/**
* 消息广播
*/
public static void messageBroadcast(String message) {
webSocketSet.values().parallelStream().forEach(webSocketSession -> {
webSocketSession.sendInfo(message);
});
}
public static int getOnlineCount() {
return WebSocketServiceForScreenOfflineList.onlineCount.get();
}
public static void addOnlineCount() {
WebSocketServiceForScreenOfflineList.onlineCount.incrementAndGet();
}
public static void subOnlineCount() {
WebSocketServiceForScreenOfflineList.onlineCount.decrementAndGet();
}
private class WebSocketSession {
private WebSocketServiceForScreenOfflineList webSocketService;
private Set<String> vins = new HashSet<>();
private Object lock = new Object();
private int printMinute = 5;
private AtomicInteger printSwitch = new AtomicInteger(0);
private AtomicInteger successCount = new AtomicInteger(0);
private AtomicInteger missCount = new AtomicInteger(0);
public void sendInfo(Collection<Map<String, Object>> messages) {
List<Map<String, Object>> resultList = Lists.newArrayList();
Iterator<Map<String, Object>> iter = messages.iterator() ;
while(iter.hasNext())
{
Map<String, Object> res = Maps.newHashMap();
res.put("timeStamp", System.currentTimeMillis());
res.put("car", iter.next());
resultList.add(res);
}
try {
webSocketService.sendMessage(JSON.toJSONString(resultList));
successCount.incrementAndGet();
} catch (IOException e) {
log.info(String.format("推送失败: %s, %s ", webSocketService.token, messages), e);
missCount.incrementAndGet();
}
//定期状态输出
int minuteOfHour = DateTime.now().getMinuteOfHour();
if (printSwitch.get() < 1 && minuteOfHour % printMinute == 0) {
log.info(String.format("大屏list webSocket: %s 成功推送次数: %s, 推送失败次数:%s",
webSocketService.token, successCount.get(), missCount.get()));
printSwitch.incrementAndGet();
} else if (minuteOfHour % printMinute != 0) {
printSwitch.set(0);
}
}
public void setVins(Set<String> vins) {
synchronized (lock) {
this.vins = vins;
}
}
public void sendInfo(String message) {
try {
webSocketService.sendMessage(message);
} catch (IOException e) {
log.error(String.format("推送文本消息失败: %s, %s ", webSocketService.token, message), e);
}
}
}
public static boolean hasWebSocketSession() {
return !webSocketSet.isEmpty();
}
}
package com.ssi.websocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 大屏及web使用的websocket,推送车辆任务信息
*/
@ServerEndpoint(value = "/WSSForScreenSomeoneCardList")
@Component
public class WebSocketServiceForScreenSomeoneCardList {
private static Logger log = LoggerFactory.getLogger(WebSocketServiceForScreenSomeoneCardList.class);
private static AtomicInteger onlineCount = new AtomicInteger(0);
private static Map<String, WebSocketSession> webSocketSet = Maps.newConcurrentMap();
private Session session;
private String token;
private Integer pushSwitch;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
try {
this.session = session;
WebSocketSession webSocketSession = new WebSocketSession();
webSocketSession.webSocketService = this;
this.token = session.getId();
WebSocketServiceForScreenSomeoneCardList.webSocketSet.put(token, webSocketSession);
addOnlineCount();
log.info(String.format("打开新会话%s成功。", token));
} catch (Exception e) {
log.error("websocket 打开异常", e);
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this.token);
subOnlineCount();
log.info(String.format("会话:%s 关闭,当剩余前会话数:%s", token, getOnlineCount()));
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
try {
if (StringUtils.isNotBlank(message)) {
JSONObject messageJson = JSON.parseObject(message);
if (messageJson != null) {
pushSwitch = Objects.nonNull(messageJson.getInteger("pushSwitch"))?messageJson.getInteger("pushSwitch"):0;
String vinStr = messageJson.getString("vins");
Set<String> vins;
if (StringUtils.isNotBlank(vinStr)) {
vins = Sets.newHashSet(vinStr.split(","));
} else {
vins = Sets.newHashSet();
}
WebSocketSession webSocketSession = WebSocketServiceForScreenSomeoneCardList.webSocketSet.get(session.getId());
webSocketSession.setVins(vins);
log.info(String.format("收到来自会话%s的信息: %s, vins: %s", session.getId(), message, vins));
}
}
} catch (Exception e) {
log.info(String.format("客户端参数解析失败:%s", message), e);
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error(String.format("会话:%s 发生错误.", session.getId()), error);
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
if(this.pushSwitch==1){
this.session.getBasicRemote().sendText(message);
}else {
JSONArray jsonArray = new JSONArray();
this.session.getBasicRemote().sendText(jsonArray.toJSONString());
}
}
/**
* 消息广播
*/
public static void messageBroadcast(Collection<Map<String, Object>> messages) {
webSocketSet.values()
.parallelStream()
.forEach(webSocketSession -> {
webSocketSession.sendInfo(messages);
});
}
/**
* 消息广播
*/
public static void messageBroadcast(String message) {
webSocketSet.values().parallelStream().forEach(webSocketSession -> {
webSocketSession.sendInfo(message);
});
}
public static int getOnlineCount() {
return WebSocketServiceForScreenSomeoneCardList.onlineCount.get();
}
public static void addOnlineCount() {
WebSocketServiceForScreenSomeoneCardList.onlineCount.incrementAndGet();
}
public static void subOnlineCount() {
WebSocketServiceForScreenSomeoneCardList.onlineCount.decrementAndGet();
}
private class WebSocketSession {
private WebSocketServiceForScreenSomeoneCardList webSocketService;
private Set<String> vins = new HashSet<>();
private Object lock = new Object();
private int printMinute = 5;
private AtomicInteger printSwitch = new AtomicInteger(0);
private AtomicInteger successCount = new AtomicInteger(0);
private AtomicInteger missCount = new AtomicInteger(0);
public void sendInfo(Collection<Map<String, Object>> messages) {
List<Map<String, Object>> resultList = Lists.newArrayList();
Iterator<Map<String, Object>> iter = messages.iterator() ;
while(iter.hasNext())
{
Map<String, Object> res = Maps.newHashMap();
res.put("timeStamp", System.currentTimeMillis());
res.put("car", iter.next());
resultList.add(res);
}
try {
webSocketService.sendMessage(JSON.toJSONString(resultList));
successCount.incrementAndGet();
} catch (IOException e) {
log.info(String.format("推送失败: %s, %s ", webSocketService.token, messages), e);
missCount.incrementAndGet();
}
//定期状态输出
int minuteOfHour = DateTime.now().getMinuteOfHour();
if (printSwitch.get() < 1 && minuteOfHour % printMinute == 0) {
log.info(String.format("大屏list webSocket: %s 成功推送次数: %s, 推送失败次数:%s",
webSocketService.token, successCount.get(), missCount.get()));
printSwitch.incrementAndGet();
} else if (minuteOfHour % printMinute != 0) {
printSwitch.set(0);
}
}
public void setVins(Set<String> vins) {
synchronized (lock) {
this.vins = vins;
}
}
public void sendInfo(String message) {
try {
webSocketService.sendMessage(message);
} catch (IOException e) {
log.error(String.format("推送文本消息失败: %s, %s ", webSocketService.token, message), e);
}
}
}
public static boolean hasWebSocketSession() {
return !webSocketSet.isEmpty();
}
}
package com.ssi.websocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 大屏及web使用的websocket,推送车辆任务信息
*/
@ServerEndpoint(value = "/VMSWebsocketForVideoList")
@Component
public class WebSocketServiceForVideoList {
private static Logger log = LoggerFactory.getLogger(WebSocketServiceForVideoList.class);
private static AtomicInteger onlineCount = new AtomicInteger(0);
private static Map<String, WebSocketSession> webSocketSet = Maps.newConcurrentMap();
private Session session;
private String token;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
try {
this.session = session;
WebSocketSession webSocketSession = new WebSocketSession();
webSocketSession.webSocketService = this;
this.token = session.getId();
WebSocketServiceForVideoList.webSocketSet.put(token, webSocketSession);
addOnlineCount();
log.info(String.format("打开新会话%s成功。", token));
} catch (Exception e) {
log.error("websocket 打开异常", e);
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this.token);
subOnlineCount();
log.info(String.format("会话:%s 关闭,当剩余前会话数:%s", token, getOnlineCount()));
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
try {
if (StringUtils.isNotBlank(message)) {
JSONObject messageJson = JSON.parseObject(message);
if (messageJson != null) {
String vinStr = messageJson.getString("vins");
Set<String> vins;
if (StringUtils.isNotBlank(vinStr)) {
vins = Sets.newHashSet(vinStr.split(","));
} else {
vins = Sets.newHashSet();
}
WebSocketSession webSocketSession = WebSocketServiceForVideoList.webSocketSet.get(session.getId());
webSocketSession.setVins(vins);
log.info(String.format("收到来自会话%s的信息: %s, vins: %s", session.getId(), message, vins));
}
}
} catch (Exception e) {
log.info(String.format("客户端参数解析失败:%s", message), e);
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error(String.format("会话:%s 发生错误.", session.getId()), error);
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 消息广播
*/
public static void messageBroadcast(Collection<Map<String, Object>> messages) {
webSocketSet.values()
.parallelStream()
.forEach(webSocketSession -> {
webSocketSession.sendInfo(messages);
});
}
/**
* 消息广播
*/
public static void messageBroadcast(String message) {
webSocketSet.values().parallelStream().forEach(webSocketSession -> {
webSocketSession.sendInfo(message);
});
}
public static int getOnlineCount() {
return WebSocketServiceForVideoList.onlineCount.get();
}
public static void addOnlineCount() {
WebSocketServiceForVideoList.onlineCount.incrementAndGet();
}
public static void subOnlineCount() {
WebSocketServiceForVideoList.onlineCount.decrementAndGet();
}
private class WebSocketSession {
private WebSocketServiceForVideoList webSocketService;
private Set<String> vins = new HashSet<>();
private Object lock = new Object();
private int printMinute = 5;
private AtomicInteger printSwitch = new AtomicInteger(0);
private AtomicInteger successCount = new AtomicInteger(0);
private AtomicInteger missCount = new AtomicInteger(0);
public void sendInfo(Collection<Map<String, Object>> messages) {
List<Map<String, Object>> resultList = Lists.newArrayList();
Iterator<Map<String, Object>> iter = messages.iterator() ;
while(iter.hasNext())
{
Map<String, Object> res = Maps.newHashMap();
res.put("timeStamp", System.currentTimeMillis());
res.put("car", iter.next());
resultList.add(res);
}
try {
webSocketService.sendMessage(JSON.toJSONString(resultList));
successCount.incrementAndGet();
} catch (IOException e) {
log.info(String.format("推送失败: %s, %s ", webSocketService.token, messages), e);
missCount.incrementAndGet();
}
//定期状态输出
int minuteOfHour = DateTime.now().getMinuteOfHour();
if (printSwitch.get() < 1 && minuteOfHour % printMinute == 0) {
log.info(String.format("视频监控list webSocket: %s 成功推送次数: %s, 推送失败次数:%s",
webSocketService.token, successCount.get(), missCount.get()));
printSwitch.incrementAndGet();
} else if (minuteOfHour % printMinute != 0) {
printSwitch.set(0);
}
}
public void setVins(Set<String> vins) {
synchronized (lock) {
this.vins = vins;
}
}
public void sendInfo(String message) {
try {
webSocketService.sendMessage(message);
} catch (IOException e) {
log.error(String.format("推送文本消息失败: %s, %s ", webSocketService.token, message), e);
}
}
}
public static boolean hasWebSocketSession() {
return !webSocketSet.isEmpty();
}
}
package com.ssi.websocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.ssi.model.RedisDataModel;
import com.ssi.utils.ConfigUtils;
import com.ssi.utils.SpringUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.*;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
/**
* 测试websocket
*/
/*@ServerEndpoint(value = "/VMSWebsocketTest")
@Component*/
public class WebSocketServiceTest {
private static Logger log = LoggerFactory.getLogger(WebSocketServiceTest.class);
private String preLocationFix = "ivccs:vms:vehicle:preLocation";
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) throws Exception {
InputStream inputStream = new FileInputStream("C:\\Users\\lh314\\Documents\\Tencent Files\\863825024\\FileRecv\\1020-23数据记录");
String text = IOUtils.toString(inputStream, "utf8");
JSONObject jsonObject = JSON.parseObject(text);
JSONObject hits = jsonObject.getJSONObject("hits");
JSONArray array = hits.getJSONArray("hits");
long lastCollectTime = 0;
for (int i = 0; i < array.size(); i++) {
JSONObject o = (JSONObject) array.get(i);
JSONObject source = o.getJSONObject("_source");
long collectTime = (long) source.get("collectTime");
if (collectTime == lastCollectTime) {
continue;
}
addLastLocationInfo(source);
ArrayList<Object> list = new ArrayList<>();
HashMap<Object, Object> map = new HashMap<>();
map.put("timeStamp", System.currentTimeMillis());
map.put("car", source);
list.add(map);
Thread.sleep(1000);
System.out.println(JSON.toJSONString(list));
session.getBasicRemote().sendText(JSON.toJSONString(list));
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) throws Exception {
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error(String.format("会话:%s 发生错误.", session.getId()), error);
}
private Map<String, Object> preWithSpeedMap;
private void addLastLocationInfo(Map<String, Object> map) {
RedisDataModel redisDataModel = SpringUtils.getObject(RedisDataModel.class);
String location = redisDataModel.get(String.format("%s:%s", preLocationFix, map.get("vin")));
if (StringUtils.isNotBlank(location)) {
String[] split = location.split(",");
Double lastLongitude = Double.parseDouble(split[0]);
Double lastLatitude = Double.parseDouble(split[1]);
if (lastLatitude != null && lastLongitude != null) {
map.put("preLocation", new double[]{lastLongitude, lastLatitude});
}
}
double speed = Double.parseDouble(map.getOrDefault("speed", "0").toString());
/*Double longitude = ConfigUtils.getAsDoubleWithDefault(map, "longitude", null);
Double latitude = ConfigUtils.getAsDoubleWithDefault(map, "latitude", null);*/
JSONArray location1 = (JSONArray) map.get("location");
BigDecimal longitude = (BigDecimal) location1.get(0);
BigDecimal latitude = (BigDecimal) location1.get(1);
if (speed != 0) {
if (longitude != null && latitude != null) {
redisDataModel.set(String.format("%s:%s", preLocationFix, map.get("vin"))
, String.format("%s,%s", longitude, latitude));
Map<String, Object> preWithSpeedMapTemp = new HashMap<>();
preWithSpeedMapTemp.put("longitude", longitude);
preWithSpeedMapTemp.put("latitude", latitude);
preWithSpeedMap = preWithSpeedMapTemp;
}
} else {
if (preWithSpeedMap != null) {
Double longitude1 = ConfigUtils.getAsDoubleWithDefault(preWithSpeedMap, "longitude", null);
Double latitude1 = ConfigUtils.getAsDoubleWithDefault(preWithSpeedMap, "latitude", null);
if (longitude1 != null && latitude1 != null) {
map.put("location", new double[]{longitude1, latitude1});
}
} else {
Map<String, Object> preWithSpeedMapTemp = new HashMap<>();
preWithSpeedMapTemp.put("longitude", longitude);
preWithSpeedMapTemp.put("latitude", latitude);
preWithSpeedMap = preWithSpeedMapTemp;
}
}
}
}
package com.ssi.websocket.push.task;
import com.alibaba.fastjson.JSONObject;
import com.ssi.entity.VmsCranePadBind;
import com.ssi.entity.VmsVehicle;
import com.ssi.service.VmsCranePadBindService;
import com.ssi.service.VmsVehicleService;
import com.ssi.service.VmsTosOrdersService;
import com.ssi.websocket.CraneAppWebSocketServer;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 吊具APP任务推送任务
* @author ZhangLiYao
* @version 1.0
* @date 2020/8/6 10:00
*/
@Component
@EnableScheduling
public class CraneAppOrdersTask {
@Autowired
private VmsTosOrdersService vmsTosOrdersService;
@Autowired
private VmsVehicleService vmsVehicleService;
@Autowired
private VmsCranePadBindService vmsCranePadBindService;
@Scheduled(fixedRateString = "5000")
private void getOrdersOfTask() {
CopyOnWriteArraySet<CraneAppWebSocketServer> webSocketSet = CraneAppWebSocketServer.getWebSocketSet();
if(webSocketSet.isEmpty()){
return;
}
List<VmsVehicle> vehicleList = vmsVehicleService.list();
Map<String, VmsVehicle> vinMap = vehicleList.stream().collect(Collectors.toMap(VmsVehicle::getVin, Function.identity()));
List<VmsCranePadBind> vmsCranePadBindList = vmsCranePadBindService.list();
Map<String, VmsCranePadBind> padMacMap = vmsCranePadBindList.stream().collect(Collectors.toMap(VmsCranePadBind::getPadMac, Function.identity()));
//群发消息
for (CraneAppWebSocketServer item : webSocketSet) {
try {
if(StringUtils.isBlank(item.getPadMac())){
return;
}
List<Map<String, Object>> resultList = vmsCranePadBindService.getCraneOrderList(vinMap, padMacMap.get(item.getPadMac()));
if(!resultList.isEmpty()){
item.sendMessage(JSONObject.toJSONString(resultList));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package com.ssi.websocket.push.task;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.ssi.entity.CraneInfo;
import com.ssi.entity.VmsVehicle;
import com.ssi.mapper.VmsVehicleMapper;
import com.ssi.model.RedisDataModel;
import com.ssi.service.CraneInfoService;
import com.ssi.websocket.CraneInfoWebSocketServer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
/**
* 推送吊具实时信息定时任务
*/
@Component
@EnableScheduling
public class CraneInfoTask {
@Autowired
private CraneInfoService craneInfoService;
@Autowired
private RedisDataModel redisDataModel;
@Autowired
private VmsVehicleMapper vmsVehicleMapper;
@Value("${order.latestOrderKeyPrefix:harbor:command:status}")
private String latestOrderKeyPrefix;
@Value("${harbor.crane.confirmExpireTime}")
private int confirmExpireTime;
@Scheduled(fixedRateString = "1000")
public void getCraneInfo() throws Exception {
CopyOnWriteArraySet<CraneInfoWebSocketServer> webSocketSet = CraneInfoWebSocketServer.getWebSocketSet();
if (CollectionUtils.isNotEmpty(webSocketSet)) {
for (CraneInfoWebSocketServer craneInfoWebSocketServer : webSocketSet) {
String craneNo = craneInfoWebSocketServer.getCraneNo();
CraneInfo craneInfo = null;
if (StringUtils.isNotBlank(craneNo)) {
craneInfo = new CraneInfo();
craneInfo.setCraneNo(craneNo);
}
List<Object> list = craneInfoService.queryCraneListFromRedis(craneInfo);
if (CollectionUtils.isNotEmpty(list)) {
List<Map> res = list.stream().map(o -> {
Map map = (Map) o;
String portCode = (String) map.get("portCode");
Map craneConfirmInfo = craneInfoService.queryCraneConfirmInfo(portCode);
if (craneConfirmInfo != null) {
//portCode对应的vin craneStatus确认状态
long timestamp = (long)craneConfirmInfo.get("collectTime");
if((System.currentTimeMillis() - timestamp) > confirmExpireTime * 1000){
map.put("craneStatus", 0);//空闲
}else{
String vin = (String) craneConfirmInfo.get("vin");
int craneStatus = (int) craneConfirmInfo.get("craneStatus");
//获取redis任务获取taskType
String orderKey = String.format("%s:%s", latestOrderKeyPrefix, vin);
Map<String, Object> order = redisDataModel.getJson2Map(orderKey);
int taskType = (int) order.get("taskType");
//根据vin获取vehicleNum
String key="harbor:command:vin:info";
String keys = String.format("%s:%s", key, vin);
String vehicleMap = redisDataModel.get(keys);
if(vehicleMap != null){//判断redis是否存在,不存在则查询数据库
JSONObject jsonObject=JSONObject.parseObject(vehicleMap);
String vehicleNum = null;
Integer vehicleType = 0;
if (jsonObject.containsKey("vehicleNum")){//判断redis是否存在车辆编号
vehicleNum = jsonObject.get("vehicleNum").toString();
}else{
VmsVehicle vmsVehicle = vmsVehicleMapper.selectOne(new LambdaQueryWrapper<VmsVehicle>()
.eq(VmsVehicle::getVin, vin));
if (vmsVehicle!=null){
vehicleNum = vmsVehicle.getVehicleNum();
vehicleType=vmsVehicle.getVehicleType()==null ? 0 : vmsVehicle.getVehicleType();
}
}
if (jsonObject.containsKey("vehicleType")){
vehicleType =(Integer) jsonObject.get("vehicleType");
}
//设置车辆编号、装卸箱、吊具确认状态,车辆类型
map.put("vehicleNum", vehicleNum);
map.put("taskType", taskType);
map.put("craneStatus", craneStatus);
map.put("vehicleType",vehicleType);
}else{
VmsVehicle vmsVehicle = vmsVehicleMapper.selectOne(new LambdaQueryWrapper<VmsVehicle>()
.eq(VmsVehicle::getVin, vin));
String vehicleNum = "";
Integer vehicleType = 0;
if (vmsVehicle!=null){
vehicleNum = vmsVehicle.getVehicleNum();
vehicleType=vmsVehicle.getVehicleType()==null ? 0 : vmsVehicle.getVehicleType();
}
//设置车辆编号、装卸箱、吊具确认状态,车辆类型
map.put("vehicleNum", vehicleNum);
map.put("taskType", taskType);
map.put("craneStatus", craneStatus);
map.put("vehicleType",vehicleType);
}
}
//未来时间设置成现在
long craneTime = (long)map.get("collectTime");
if(craneTime > System.currentTimeMillis()){
map.put("collectTime",System.currentTimeMillis());
}
return map;
} else {
return null;
}
}).filter(map -> map != null).collect(Collectors.toList());
craneInfoWebSocketServer.sendMessage(JSON.toJSONString(res));
}
}
}
}
}
package com.ssi.websocket.push.task;
import static com.ssi.constant.RedisKey.CRANE_LOCATION;
import com.alibaba.fastjson.JSONObject;
import com.ssi.model.CraneInfoRedis;
import com.ssi.model.RedisDataModel;
import com.ssi.service.CraneInfoService;
import com.ssi.service.VmsShipsDrawingService;
import com.ssi.service.VmsVehicleService;
import com.ssi.websocket.LocationWebSocketService;
import com.ssi.websocket.vo.CraneLocation;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;
import lombok.val;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.ssi.service.VmsAlertThresholdService;
/**
* @author 成东
* @since 2022-02-23 11:25
*/
@Component
public class PushPortMachineryLocationTask {
@Autowired
private RedisDataModel redisDataModel;
@Autowired
private CraneInfoService craneInfoService;
@Autowired
private VmsShipsDrawingService vmsShipsDrawingService;
@Autowired
private VmsVehicleService vmsVehicleService;
@Value("${vehicle.latestData.redis.protCode:ivccs:vms:portCode}")
private String portCodeKey;
@Value("${vehicle.latestData.redis.cranePos:ivccs:vms:cranePos}")
private String cranePosKey;
//websocket获取桥吊数据
@Scheduled(initialDelay = 2 * 1000, fixedDelay = 2000)
public void getOrdersOfTask() {
if (LocationWebSocketService.isPush()) {
val locationMap = redisDataModel.mgetForMap(
redisDataModel.keys(CRANE_LOCATION.getKeyPrefix() + "*")
);
long time = System.currentTimeMillis();
Map<String, CraneLocation> map1 = new HashMap<>();
locationMap.forEach((key, value) -> {
if (value != null) {
String no = StringUtils.removeStart(key, CRANE_LOCATION.getKeyPrefix());
val p = new CraneLocation();
p.setPortCode(no);
val sp = ((String) value).split(",");
String regex="^2\\d*$";
if (no.matches(regex)){//判断no是否以2开头
//String portCode="ivccs:vms:portCode";
String keys = String.format("%s:%s",portCodeKey,no);
Map<String, Object> map = redisDataModel.getJson2Map(keys);
if (map==null){//先从redis中获取数据,不存在在请求数据库
map=vmsShipsDrawingService.getPileInfo(no);//查询龙门吊的视图
JSONObject jsonObject=new JSONObject();
jsonObject.put("CY_AREA_NO",String.valueOf(map.get("CY_AREA_NO").toString()));
redisDataModel.set(keys, String.valueOf(jsonObject));
}
p.setLatitude(new BigDecimal(sp[0]));
p.setLongitude(new BigDecimal(sp[1]));
String name=String.valueOf(map.get("CY_AREA_NO").toString());
Map map2=getLocaltion(Double.parseDouble(sp[1]),Double.parseDouble(sp[0]),name);
p.setLatitude(new BigDecimal(map2.get("lat").toString()));
p.setLongitude(new BigDecimal(map2.get("lng").toString()));
p.setPortType(2);
p.setTimestamp(time);
map1.put(no, p);
}else {
Map map2=getLocaltion(Double.parseDouble(sp[1]),Double.parseDouble(sp[0]),"Bridge0001");
p.setLatitude(new BigDecimal(map2.get("lat").toString()));
p.setLongitude(new BigDecimal(map2.get("lng").toString()));
p.setTimestamp(time);
p.setPortType(1);
map1.put(no, p);
}
}
});
LocationWebSocketService.messageBroadcast(map1.values());
}
}
//重新映射场吊GPS位置
private Map getLocaltion(double lng3,double lat3,String name) {
Map map = new HashMap();
// String cranePos="ivccs:vms:cranePos";
String keys = String.format("%s:%s",cranePosKey,name);
Map<String, Object> cranePosMap = redisDataModel.getJson2Map(keys);
if (cranePosMap==null){//先从redis中获取数据,不存在在请求数据库
cranePosMap=vmsVehicleService.getVmsCranePosMapInfo(name);
JSONObject jsonObject=new JSONObject();
jsonObject.put("startLng",cranePosMap.get("startLng").toString());//起始位置经度
jsonObject.put("startLat",cranePosMap.get("startLat").toString());//起始位置纬度
jsonObject.put("endLng",cranePosMap.get("endLng").toString());//终点位置经度
jsonObject.put("endLat",cranePosMap.get("endLat").toString());//终点位置纬度
redisDataModel.set(keys, String.valueOf(jsonObject));
}
double lng1=Double.parseDouble(cranePosMap.get("startLng").toString());//起始位置经度
double lat1=Double.parseDouble(cranePosMap.get("startLat").toString());//起始位置纬度
double lng2=Double.parseDouble(cranePosMap.get("endLng").toString());//终点位置经度
double lat2=Double.parseDouble(cranePosMap.get("endLat").toString());//终点位置纬度
double dLng = lng1 - lng2;//相差距离经度
double dLat = lat1 - lat2;//想差距离纬度
double u = (lng3 - lng1) * dLng + (lat3 - lat1) * dLat;
u /= dLng * dLng + dLat * dLat;
double lng = new BigDecimal(lng1 + u * dLng).setScale(8, RoundingMode.HALF_UP).doubleValue();
map.put("lng",lng);
//foot.setLng(new BigDecimal(p1.getLng() + u * dLng).setScale(8, RoundingMode.HALF_UP).doubleValue());
double lat = new BigDecimal(lat1 + u * dLat).setScale(8, RoundingMode.HALF_UP).doubleValue();
map.put("lat",lat);
double d = Math.pow(lng1 - lng2, 2) + Math.pow(lat1 - lat2, 2);
double d1 = Math.pow(lng1 - lng, 2) + Math.pow(lat1 - lat, 2);
double d2 = Math.pow(lng2 - lng, 2) + Math.pow(lat2 - lat, 2);
if (d1 > d || d2 > d) {
if (d1 > d2) {
map.put("lng",Math.pow(lng2 - lng, 2));
map.put("lat",Math.pow(lat2 - lat, 2));
return map;
} else {
map.put("lng",Math.pow(lng1 - lng, 2));
map.put("lat",Math.pow(lat1 - lat, 2));
return map;
}
}
return map;
}
}
package com.ssi.websocket.push.task;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.ssi.entity.CraneInfo;
import com.ssi.entity.VmsVehicle;
import com.ssi.mapper.VmsVehicleMapper;
import com.ssi.model.RedisDataModel;
import com.ssi.service.CraneInfoService;
import com.ssi.websocket.CraneInfoWebSocketServer;
import com.ssi.websocket.ReplaySocketServer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
/**
* 推送吊具实时信息定时任务
*/
@Component
@EnableScheduling
public class ReplayTask {
private static Logger log = LoggerFactory.getLogger(ReplayTask.class);
private static Queue<Map<String, Object>> replayCache = new LinkedBlockingQueue<>();
public static Queue<Map<String, Object>> getReplayCache() {
return replayCache;
}
public static void setReplayCache(List<Map<String, Object>> replayCaches) {
replayCache.clear();
replayCache.addAll(replayCaches);
}
@Scheduled(cron = "0/1 * * * * ? ")
public void getCraneInfo() throws Exception {
Queue<Map<String, Object>> replayCache = getReplayCache();
if(replayCache.size()>1){
Map<String, Object> poll = replayCache.poll();
ReplaySocketServer.messageBroadcast(JSON.toJSONString(poll));
}else{
ReplaySocketServer.messageBroadcast(null);
}
}
}
package com.ssi.websocket.push.task;
import com.alibaba.fastjson.JSONArray;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.ssi.entity.VmsTrafficLightInfo;
import com.ssi.mapper.VmsTrafficLightInfoMapper;
import com.ssi.model.RedisDataModel;
import com.ssi.websocket.TrafficLampInfoWebSocketServer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
* @author SunnyHotz
* @PackageName:com.ssi.websocket.push.task
* @ClassName:SyncTrafficLampInfo
* @Description:
* @date 2022/9/28 13:21
*/
@Component
public class SyncTrafficLampInfo {
@Autowired
private RedisDataModel redisDataModel;
@Autowired
private VmsTrafficLightInfoMapper vmsTrafficLightInfoMapper;
@Scheduled(cron = "0/1 * * * * ?")
public void syncSchedule(){
List<VmsTrafficLightInfo> vmsTrafficLightInfos = vmsTrafficLightInfoMapper.selectList(new LambdaQueryWrapper<>());
if(CollectionUtils.isEmpty(vmsTrafficLightInfos)){
return;
}
JSONArray objects = new JSONArray();
vmsTrafficLightInfos.stream().forEach(lamp ->{
Map<String, Object> map = redisDataModel.hgetJson2Map("ivccs:vms:traffic:light", lamp.getCrossId());
if(MapUtils.isEmpty(map)){
return;
}
map.put("crossId",lamp.getCrossId());
objects.add(map);
});
TrafficLampInfoWebSocketServer.messageBroadcast(objects.toJSONString());
}
}
package com.ssi.websocket.push.task;
import com.alibaba.fastjson.JSONObject;
import com.ssi.entity.VmsCranePadBind;
import com.ssi.entity.VmsVehicle;
import com.ssi.response.SSIResponse;
import com.ssi.service.VmsCranePadBindService;
import com.ssi.service.VmsTosOrdersService;
import com.ssi.service.VmsVehicleService;
import com.ssi.websocket.WebSocketServiceForVideoList;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 吊具APP任务推送任务
* @author ZhangLiYao
* @version 1.0
* @date 2020/8/6 10:00
*/
@Component
@EnableScheduling
public class VideoStatusListTask {
@Autowired
private VmsTosOrdersService vmsTosOrdersService;
@Autowired
private VmsVehicleService vmsVehicleService;
@Autowired
private VmsCranePadBindService vmsCranePadBindService;
@Scheduled(fixedRateString = "10000")
private void getOrdersOfTask() {
if(!WebSocketServiceForVideoList.hasWebSocketSession()){
return;
}
List<VmsVehicle> vehicleList = vmsVehicleService.getNormalVehicle();
SSIResponse ssiResponse = vmsVehicleService.listForMonitor(vehicleList);
//群发消息
WebSocketServiceForVideoList.messageBroadcast(JSONObject.toJSONString(ssiResponse.getData()));
}
}
package com.ssi.websocket.push.task;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.ssi.constant.RedisKey;
import com.ssi.entity.VmsAlertThreshold;
import com.ssi.entity.VmsVehicle;
import com.ssi.kafka.model.KafkaRecordModel;
import com.ssi.model.RedisDataModel;
import com.ssi.model.VehicleSocketDataCacheQueueModel;
import com.ssi.service.VmsAlertThresholdService;
import com.ssi.service.VmsVehicleService;
import com.ssi.service.platform.WebSocketDataService;
import com.ssi.utils.SpringUtils;
import com.ssi.utils.VehicleDataUtil;
import com.ssi.websocket.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import sun.misc.Unsafe;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 目前策略是读取所有数据
* 后期车辆数多, 考虑从CismWebSocketService中获取所有的vin
*/
@Component
@EnableScheduling
@Slf4j
public class WebSocketScheduledSendTask {
@Autowired
private WebSocketDataService webSocketDataService;
@Autowired
private VmsAlertThresholdService vmsAlertThresholdService;
@Autowired
private VmsVehicleService vmsVehicleService;
@Autowired
private RedisDataModel redisDataModel;
@Autowired
private RedisDataModel singleNodeModel;
@Value("${vehicle.cache.key:ivccs:vms:vehicle:cache}")
private String vehicleCacheKey;
@Autowired(required = false)
private VehicleSocketDataCacheQueueModel vehicleSocketDataCacheQueueModel;
@Autowired(required = false)
private KafkaRecordModel kafkaRecordModel;
@Autowired
private VehicleDataUtil vehicleDataUtil;
@Value("${spring.websocket.pushKafka.enable:false}")
private boolean openPushKafka;
@Value("${spring.websocket.pushInPool.enable:false}")
private boolean pushInPool;
@Value("${harbor.command.info_key:harbor:command:vin:info}")
private String infoKey;
@Value("${vehicle.latestData.redis.vehicleTypeSwitch:ivccs:vms:vehicle_type_switch}")
private String typeSwitch;
private Queue<Map<String, Map<String, Object>>> mapQueue = Lists.newLinkedList();
private boolean first=true;
/**
* 缓存车辆信息
*/
@Scheduled(fixedRateString = "${spring.websocket.scheduled.time:60000}")
private void cacheVehicleInfo() {
List<VmsVehicle> list = vmsVehicleService.list();
list.stream().forEach(vehicle -> {
redisDataModel.hset(vehicleCacheKey,vehicle.getVin(), JSONObject.toJSONString(vehicle));
});
}
/**
* 定时读取所有车辆的最新数据(无人集卡)
*/
@Scheduled(fixedRateString = "${spring.websocket.scheduled.time:2000}")
private void scheduledSendInfoMageNoCrad() {
Map<String, Map<String, Object>> messages;//全部
Map<String, Map<String, Object>> messages1 = Maps.newTreeMap();//无人集卡
if (WebSocketService.hasWebSocketSession() || WebSocketServiceForScreenList.hasWebSocketSession() || openPushKafka) {
if (pushInPool) {
messages = vehicleSocketDataCacheQueueModel.getAllVehicleData();
} else {
messages = webSocketDataService.realTimeDataAll();
}
if (!messages.isEmpty()) {
//推送到kafka
try {
mapQueue.offer(messages);
} catch (Exception e) {
log.error("添加推送队列失败。", e);
}
}
if (WebSocketService.hasWebSocketSession()){
if (!messages.isEmpty()) {
//推送至客户端
messages.forEach((vin, record) -> {
WebSocketService.messageBroadcast(vin, record);
});
}
}
if (WebSocketServiceForScreenList.hasWebSocketSession()){
if (!messages.isEmpty()) {
//大屏列表推送
//log.info(String.format("进入无人集卡判断: %s", JSON.toJSONString(messages)));
Collection<Map<String, Object>> values = null;
for (Map.Entry<String, Map<String, Object>> entry:messages.entrySet()){
//String key="harbor:command:vin:info";
String keys = String.format("%s:%s", infoKey, entry.getKey());
String vehicleMap = redisDataModel.get(keys);
if(vehicleMap != null) {//判断redis是否存在,不存在则查询数据库
JSONObject jsonObject = JSONObject.parseObject(vehicleMap);
Integer vehicleType =(Integer) jsonObject.get("vehicleType");//1:X69B ,2:X69C,3:有人集卡两秒
if (vehicleType != 3){
String key = Objects.isNull(entry.getValue().get("vehicleNum"))?entry.getKey():(String) entry.getValue().get("vehicleNum");
//处理异常信息
processVehcileErrorMsg(entry.getValue());
messages1.put(key,entry.getValue());
}
}
}
values=messages1.values();
WebSocketServiceForScreenList.messageBroadcast(values);
}else{
WebSocketServiceForScreenList.messageBroadcast(Lists.newArrayList());
}
}
//debug app车辆实时状态推送
if (WebSocketServiceForDebugApp.hasWebSocketSession()){
messages = webSocketDataService.getVehicleRunData();
if (!messages.isEmpty()) {
//推送至客户端
messages.forEach((vin, record) -> {
WebSocketServiceForDebugApp.messageBroadcast(vin, record);
});
}
}
}
}
/**
* 定时读取所有车辆的最新数据(有人集卡)
*/
@Scheduled(fixedRateString = "${spring.websocket.scheduled.time:2000}")
private void scheduledSendInfoMageYesCrad() {
Map<String, Map<String, Object>> messages;//全部
Map<String, Map<String, Object>> messages1 = Maps.newHashMap();//用于存放有人集卡的数据
String vmsAlertThresholdInfo=redisDataModel.get(typeSwitch);
//用于判断是否开启给有人集卡推送数据的字段
/* String vehicleTypeSwitch="1";
if (vmsAlertThresholdInfo!=null){//redis缓存存在就用redis缓存否则就查询数据库
JSONObject jsonObject = JSONObject.parseObject(vmsAlertThresholdInfo);
vehicleTypeSwitch = (String) jsonObject.get("paramValue");
}else{
VmsAlertThreshold info=new VmsAlertThreshold();
info.setParamKey("vehicle_type_switch");
VmsAlertThreshold current = vmsAlertThresholdService.getOne(new LambdaQueryWrapper<>(info));
//无人集卡开关0关闭,1开启
vehicleTypeSwitch= current.getParamValue();
redisDataModel.set(typeSwitch,JSONObject.toJSONString(current));
}*/
if ((WebSocketService.hasWebSocketSession() || WebSocketServiceForScreenSomeoneCardList.hasWebSocketSession() || openPushKafka)) {
if (pushInPool) {
messages = vehicleSocketDataCacheQueueModel.getAllVehicleData();
} else {
messages = webSocketDataService.realTimeDataAll();
}
/*if (!messages.isEmpty()) {
//推送到kafka
try {
mapQueue.offer(messages);
} catch (Exception e) {
log.error("添加推送队列失败。", e);
}
}
if (WebSocketService.hasWebSocketSession()){
if (!messages.isEmpty()) {
//推送至客户端
messages.forEach((vin, record) -> {
WebSocketService.messageBroadcast(vin, record);
});
}
}*/
if (WebSocketServiceForScreenSomeoneCardList.hasWebSocketSession()){
if (!messages.isEmpty()) {
//前端列表推送
Collection<Map<String, Object>> values = null;
// if(vehicleTypeSwitch.equals("0")){//无人集卡开关0关闭,1开启
for (Map.Entry<String, Map<String, Object>> entry:messages.entrySet()){
String keys = String.format("%s:%s", infoKey, entry.getKey());
String vehicleMap = singleNodeModel.get(keys);
if(vehicleMap != null) {//判断redis是否存在,不存在则查询数据库
JSONObject jsonObject = JSONObject.parseObject(vehicleMap);
Integer vehicleType =jsonObject.getInteger("vehicleType");//1:X69B ,2:X69C,3:有人集卡两秒
if (vehicleType == 3){
messages1.put(entry.getKey(),entry.getValue());
}
}
}
values=messages1.values();
WebSocketServiceForScreenSomeoneCardList.messageBroadcast(values);
// }else{//
// WebSocketServiceForScreenSomeoneCardList.messageBroadcast(Lists.newArrayList());
// }
}else{
WebSocketServiceForScreenSomeoneCardList.messageBroadcast(Lists.newArrayList());
}
}
//debug app车辆实时状态推送
if (WebSocketServiceForDebugApp.hasWebSocketSession()){
messages = webSocketDataService.getVehicleRunData();
if (!messages.isEmpty()) {
//推送至客户端
messages.forEach((vin, record) -> {
WebSocketServiceForDebugApp.messageBroadcast(vin, record);
});
}
}
}else{
WebSocketServiceForScreenSomeoneCardList.messageBroadcast(Lists.newArrayList());
}
}
/**
* 将队列中的数据推送到kafka
*/
@Scheduled(fixedRateString = "${spring.websocket.scheduled.pushKafka.time:500}")
private void scheduledSendInfoMageToKafka() {
List<Map<String, Map<String, Object>>> res = Lists.newLinkedList();
int n = 0;
Map<String, Map<String, Object>> poll = Maps.newHashMap();
int size = 0;
while (n <= 100 && poll != null) {
poll = mapQueue.poll();
if (poll != null && !poll.isEmpty()) {
size += poll.size();
res.add(poll);
}
}
//批量推kafka, 避免积压
if (!res.isEmpty()) {
long l = System.currentTimeMillis();
kafkaRecordModel.messageBroadcast(res);
log.debug(String.format("推送数据到kafka完成,推送数据记录:%s条, 积压数据批次:%s, 耗时:%s",
size, mapQueue.size(), (System.currentTimeMillis() - l)));
}
}
private void processVehcileErrorMsg(Map<String, Object> record) {
try {
JSONObject jsonObject = new JSONObject();
// Double speed = (Double) record.get("speed");
//V2X异常
String v2xStr = redisDataModel.get(RedisKey.ERROR_V2X_INFO.getKeyPrefix().concat((String) record.get("vin")));
if (StringUtils.isNotBlank(v2xStr)) {
JSONObject v2xObj = JSON.parseObject(v2xStr);
if (System.currentTimeMillis() - v2xObj.getLong("time") < 30000) {
record.put("vinFlag", 1);
jsonObject.put("v2xError", v2xObj.getString("description"));
}
/* if(StringUtils.isNotBlank(v2xObj.getString("description"))){
jsonObject.put("v2xError",v2xObj.getString("description"));
}*/
}
//三级告警
String tosStr = redisDataModel.get(RedisKey.ERROR_TOS_INFO.getKeyPrefix().concat((String) record.get("vin")));
if (StringUtils.isNotBlank(tosStr)) {
JSONObject tosObj = JSON.parseObject(tosStr);
jsonObject.put("v2xError", tosObj.getString("faultName"));
}
//平台异常
StringBuilder sb = new StringBuilder("");
Map<String, Object> vehicleEmergencyRecord = redisDataModel.getJson2Map(RedisKey.EMERGENCY_PARKING.getKeyPrefix().concat((String) record.get("vin")));
if (MapUtils.isNotEmpty(vehicleEmergencyRecord)) {
Integer emergencyStatus = (Integer) vehicleEmergencyRecord.get("emergencyStatus");
if (1 == emergencyStatus) {
record.put("vinFlag", 1);
sb.append("车辆紧急急停 ");
}
}
Object stopExceed = record.get("stopExceed");
if (Objects.nonNull(stopExceed) && 1 == (Integer) stopExceed) {
record.put("vinFlag", 1);
sb.append("车辆停止超时 ");
}
Object dynamicFenceAlert = SpringUtils.getValue(RedisKey.DYNAMIC_FENCE_ALERT.getKeyPrefix(), (String) record.get("vin"));
if (Objects.nonNull(dynamicFenceAlert)) {
record.put("vinFlag", 1);
sb.append("封闭区触发任务暂停 ");
}
Object fenceOrderError = SpringUtils.getValue(RedisKey.DYNAMIC_FENCE_ORDER_ERROR.getKeyPrefix(), (String) record.get("vin"));
if (Objects.nonNull(dynamicFenceAlert)) {
sb.append(fenceOrderError+" ");
}
Object chargingAction = SpringUtils.getValue(RedisKey.VMS_CHARGING_ACTION.getKeyPrefix(), (String) record.get("vin"));
if (Objects.nonNull(chargingAction)&&chargingAction instanceof JSONObject) {
JSONObject actionObj = (JSONObject) chargingAction;
if(actionObj.getInteger("chargingAction")==2 && actionObj.getLong("reciveTime")<(System.currentTimeMillis()-300000)){
record.put("vinFlag", 1);
sb.append("请检查车辆充电接口 ");
}
}
jsonObject.put("platError", sb.toString());
//TOS异常
Map<String, Object> json2Map = redisDataModel.getJson2Map(RedisKey.VMS_TOS_ORDER.getKeyPrefix().concat((String) record.get("vin")));
if (MapUtils.isNotEmpty(json2Map)) {
JSONObject lastedOrder = new JSONObject();
lastedOrder.put("startTime", json2Map.get("collectTime"));
lastedOrder.put("lockLabel", json2Map.get("lockLabel"));
lastedOrder.put("portCode", json2Map.get("portCode"));
lastedOrder.put("containerSize", json2Map.get("containerSize"));
lastedOrder.put("containerPosition", json2Map.get("containerPosition"));
Integer subTaskType = Objects.isNull(json2Map.get("subTaskType")) ? null : (Integer) json2Map.get("subTaskType");
String description = vehicleDataUtil.generateOrderDescription(json2Map);
//判断subTaskType:0去缓冲区1,2 去扭锁站 其他用查询出来的
if (Objects.nonNull(subTaskType) && subTaskType == 0) {
lastedOrder.put("orderDescription", "去缓冲区");
} else if (Objects.nonNull(subTaskType) && (subTaskType == 1 || subTaskType == 2)) {
lastedOrder.put("orderDescription", "去扭锁站");
} else {
lastedOrder.put("orderDescription", description);
}
jsonObject.put("tosError", lastedOrder);
}
record.put("errorMsg", jsonObject);
}catch (Exception e){
log.error("告警信息出现异常",e);
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment