package com.ssi.model;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.Maps;
import com.ssi.constant.RedisKey;
import com.ssi.constant.enums.VmsTosOrderStatusEnum;
import com.ssi.entity.dto.Point;
import com.ssi.kafka.listener.VehicleRealTimeInfoListener;
import com.ssi.service.platform.WebSocketDataService;
import com.ssi.utils.*;
import com.ssi.utils.grid.RealTimeVehicleSchedule;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;
import reactor.util.function.Tuple2;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Per-vehicle cache of real-time records that are handed out to the WebSocket push.
 *
 * <p>Each vehicle (keyed by VIN) owns a {@link VehicleDataCachePool}: a bounded
 * {@link EvictingQueue} of records. Incoming records are enriched (GPS-derived speed,
 * projected "next location", optional closed-area processing) before being queued, and
 * {@link VehicleDataCachePool#getData()} hands out one record per call.
 *
 * <p>NOTE(review): the generic type arguments in this file were reconstructed from how the
 * values are used here (String keys, heterogeneous values). Confirm them against the
 * signatures of {@code RedisDataModel}, {@code WebSocketDataService} and
 * {@code VehicleRealTimeInfoListener}.
 *
 * @author LiXiaoCong
 * @version 2019/11/11 11:26
 */
@Slf4j
@Component
@ConditionalOnProperty(name = "spring.websocket.pushInPool.enable", havingValue = "true")
public class VehicleSocketDataCacheQueueModel {

    // Not referenced by the code visible in this file any more; retained because members of
    // this class may be used by the statements flagged as missing further below.
    private static final Object LOCK = new Object();

    /** Capacity of each vehicle's evicting queue. */
    @Value("${spring.websocket.scheduled.cache.max:20}")
    private int maxCacheSize;

    /** Queue size a pool must reach before it starts handing out records. */
    @Value("${spring.websocket.scheduled.push.threshold:1}")
    private int pushThreshold;

    @Value("${spring.websocket.point.degree.threshold:90.0}")
    private double degreeThreshold;

    /** Lower bound applied to a pool's data frequency. */
    @Value("${spring.websocket.scheduled.time:1000}")
    private int pushRate;

    @Value("${vehicle.latestData.redis.preLocationFix:ivccs:vms:vehicle:preLocation}")
    private String preLocationFix;

    @Value("${order.latestOrderKeyPrefix:harbor:command:status}")
    private String latestOrderKeyPrefix;

    @Value("${ivccs.vehicle.latestData.redis.prefix:ivccs:vms:vehicle:latest}")
    private String latestRedisKeyPrefix;

    @Value("${vehicle.latestData.redis.postfix:harbor_D00A}")
    private String realPostfix;

    @Autowired
    private RedisDataModel redisDataModel;

    @Autowired
    private VehicleBaseInfoModel vehicleBaseInfoModel;

    @Autowired
    private VehicleDataUtil vehicleDataUtil;

    @Autowired
    private WebSocketDataService webSocketDataService;

    /** Single worker, at most 10 queued tasks; the oldest queued task is dropped when full. */
    private final ExecutorService executorService = new ThreadPoolExecutor(
            1, 1, 0, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(10),
            new ThreadPoolExecutor.DiscardOldestPolicy());

    /** VIN -> cache pool. */
    private final ConcurrentMap<String, VehicleDataCachePool> cachePoolMaps = Maps.newConcurrentMap();

    /** VIN -> GPS bookkeeping (location, realLocation, collectTime, lastUpdateTime). */
    private final Map<String, JSONObject> gpsSpeedMap = Maps.newConcurrentMap();

    @Autowired
    private VehicleRealTimeInfoListener vehicleRealTimeInfoListener;

    private final AtomicInteger versionNo = new AtomicInteger(0);

    /**
     * Set the latest vehicle data (scheduled).
     *
     * <p>NOTE(review): SOURCE DAMAGE. The text between {@code versionNo.getAndIncrement()}
     * and the name {@code getAllVehicleData} is missing from this file: the remainder of
     * this method, the modifiers/return type of {@code getAllVehicleData}, and possibly
     * further members in between. The damaged line is kept verbatim on purpose. The class
     * does not compile until the missing text is restored from version control; do not
     * patch the line by guesswork, because a version that merely compiles would silently
     * drop whatever this method did next.
     */
    @Scheduled(fixedRateString = "${spring.websocket.scheduled.pool.time:500}")
    private void scheduledSendInfoMage() {
        Map<String, Map<String, Object>> messages = webSocketDataService.realTimeDataAll();
        if (messages != null && messages.size() > 0) {
            vehicleRealTimeInfoListener.clearOutVersionVehicle(versionNo.get());
            messages.forEach((vin, record) -> {
                // >>> damaged line, kept verbatim (see the method comment) <<<
                record.put("versionNo",versionNo.getAndIncrement()> getAllVehicleData() {
        // Body of getAllVehicleData(): collect at most one pending record per vehicle.
        Map<String, Map<String, Object>> messages = Maps.newHashMap();
        this.cachePoolMaps.forEach((vin, pool) -> {
            Map<String, Object> data = pool.getData();
            if (data != null) {
                messages.put(vin, data);
            }
        });
        return messages;
    }

    /**
     * Returns the cache pool of the given vehicle, creating it on first use.
     *
     * <p>Fix: the previous implementation re-tested its own local variable inside the
     * {@code synchronized} block instead of re-reading the map, so two threads could each
     * create a pool and the later one replaced the earlier one together with its queued
     * records. {@code computeIfAbsent} on the concurrent map creates the pool at most once.
     *
     * @param vin vehicle identifier; must not be null
     * @return the pool registered for {@code vin}
     */
    private VehicleDataCachePool getCachePool(String vin) {
        return this.cachePoolMaps.computeIfAbsent(vin, key -> new VehicleDataCachePool(key));
    }

    /**
     * Bounded record queue and enrichment logic for a single vehicle.
     *
     * <p>NOTE(review): {@link EvictingQueue} is documented by Guava as not thread-safe. If
     * {@link #pushData} and {@link #getData} are invoked from different threads (the callers
     * are not visible in this file), access to the queue needs external synchronization.
     */
    private class VehicleDataCachePool {

        private final String vin;
        private final EvictingQueue<Map<String, Object>> evictingQueue;
        private final int dataFrequency;
        private final AtomicBoolean canPush = new AtomicBoolean(false);

        /** Last record accepted by {@link #isValid}; also carries the "validCount" counter. */
        private Map<String, Object> preMap;

        /** Position cached while the speed is non-zero. */
        private Map<String, Object> preWithSpeedMap;

        private VehicleDataCachePool(String vin) {
            this.vin = vin;
            // The per-vehicle "dataFrequency" lookup via vehicleBaseInfoModel is disabled
            // ("Temporarily not find infomation of vehicle"); a fixed 1000 is used instead,
            // raised to pushRate when pushRate is larger.
            this.dataFrequency = Math.max(1000, pushRate);
            this.evictingQueue = EvictingQueue.create(maxCacheSize);
        }

        /**
         * Enriches a record and appends it to the queue.
         *
         * @param map the incoming record; it is modified in place
         */
        private void pushData(Map<String, Object> map) {
            // The result is not consumed here (the addLastLocationInfo step that used it is
            // disabled), but the call maintains preMap / "validCount", which checkNetError reads.
            isValid(map);
            // Heading-based projection of the next location.
            calculateNextLocationByDirection(map);
            if ("ON".equals(redisDataModel.get(RedisKey.DYNAMIC_FENCE_SWITCH.getKeyPrefix()))) {
                // Handle closed-area conflicts.
                vehicleRealTimeInfoListener.processVehicleCloseArea(map);
            }
            // Error-message enrichment (processVehcileErrorMsg) is currently disabled.
            this.evictingQueue.add(map);
            if (!this.canPush.get()) {
                this.canPush.set(size() >= pushThreshold);
            }
        }

        /**
         * Collects V2X, alarm, platform and order information from Redis into
         * {@code record["errorMsg"]} and sets {@code record["vinFlag"] = 1} for the
         * conditions handled below. Not invoked by the code visible in this file.
         *
         * @param record the record to annotate; it is modified in place
         */
        private void processVehcileErrorMsg(Map<String, Object> record) {
            JSONObject jsonObject = new JSONObject();

            // V2X error: only reported while it is younger than 30 s.
            String v2xStr = redisDataModel.get(RedisKey.ERROR_V2X_INFO.getKeyPrefix().concat(vin));
            if (StringUtils.isNotBlank(v2xStr)) {
                JSONObject v2xObj = JSON.parseObject(v2xStr);
                // Guard against a payload without "time" (previously an unboxing NPE).
                Long reportedAt = v2xObj == null ? null : v2xObj.getLong("time");
                if (reportedAt != null && System.currentTimeMillis() - reportedAt < 30000) {
                    record.put("vinFlag", 1);
                    jsonObject.put("v2xError", v2xObj.getString("description"));
                }
            }

            // Level-3 alarm.
            // NOTE(review): this writes to the same "v2xError" key as the V2X branch above and
            // therefore overwrites it — confirm whether a separate key was intended.
            String tosStr = redisDataModel.get(RedisKey.ERROR_TOS_INFO.getKeyPrefix().concat(vin));
            if (StringUtils.isNotBlank(tosStr)) {
                JSONObject tosObj = JSON.parseObject(tosStr);
                jsonObject.put("v2xError", tosObj.getString("faultName"));
            }

            // Platform errors.
            StringBuilder sb = new StringBuilder();
            Map<String, Object> vehicleEmergencyRecord =
                    redisDataModel.getJson2Map(RedisKey.EMERGENCY_PARKING.getKeyPrefix().concat(vin));
            if (MapUtils.isNotEmpty(vehicleEmergencyRecord)
                    && Integer.valueOf(1).equals(vehicleEmergencyRecord.get("emergencyStatus"))) {
                record.put("vinFlag", 1);
                sb.append("车辆紧急急停 ");
            }
            if (Integer.valueOf(1).equals(record.get("stopExceed"))) {
                record.put("vinFlag", 1);
                sb.append("车辆停止超时 ");
            }
            Object dynamicFenceAlert = SpringUtils.getValue("DYNAMIC_FENCE_ALERT", vin);
            if (Objects.nonNull(dynamicFenceAlert)) {
                record.put("vinFlag", 1);
                sb.append("封闭区触发任务暂停 ");
            }
            jsonObject.put("platError", sb.toString());

            // Latest order information.
            Map<String, Object> json2Map =
                    redisDataModel.getJson2Map(RedisKey.VMS_TOS_ORDER.getKeyPrefix().concat(vin));
            if (MapUtils.isNotEmpty(json2Map)) {
                JSONObject lastedOrder = new JSONObject();
                lastedOrder.put("startTime", json2Map.get("collectTime"));
                lastedOrder.put("lockLabel", json2Map.get("lockLabel"));
                lastedOrder.put("portCode", json2Map.get("portCode"));
                lastedOrder.put("containerSize", json2Map.get("containerSize"));
                lastedOrder.put("containerPosition", json2Map.get("containerPosition"));
                Integer subTaskType = (Integer) json2Map.get("subTaskType");
                String description = vehicleDataUtil.generateOrderDescription(json2Map);
                // subTaskType 0 -> buffer area; 1 or 2 -> twist-lock station;
                // anything else -> the generated description.
                if (subTaskType != null && subTaskType == 0) {
                    lastedOrder.put("orderDescription", "去缓冲区");
                } else if (subTaskType != null && (subTaskType == 1 || subTaskType == 2)) {
                    lastedOrder.put("orderDescription", "去扭锁站");
                } else {
                    lastedOrder.put("orderDescription", description);
                }
                jsonObject.put("tosError", lastedOrder);
            }
            record.put("errorMsg", jsonObject);
        }

        /**
         * Adds {@code "nextLocation"} (a point 3 distance units ahead along the heading, as
         * computed by {@code GeoPosTransformUtil}) and {@code "decLabel"} to the record.
         *
         * <p>When the reported speed is not positive, the speed is first re-derived from
         * consecutive GPS fixes. If it is still not positive, a fixed heading is used
         * (168.96 with decLabel 1, or 77.43 with decLabel 2 when
         * {@code FieldUtil.isPortraitField} is true) and the record's location is replaced
         * by the last "realLocation" kept for this vehicle.
         *
         * <p>Records without {@code "direction"} are left untouched. Any failure is logged
         * and the record is left without the two keys (best effort).
         *
         * @param map the record to enrich; it is modified in place
         */
        private synchronized void calculateNextLocationByDirection(Map<String, Object> map) {
            if (Objects.isNull(map.get("direction"))) {
                return;
            }
            try {
                // The heading is divided by 100 before use.
                double direction = Double.parseDouble(String.valueOf(map.get("direction"))) / 100;
                double speed = Double.parseDouble(String.valueOf(map.get("speed")));
                double[] location = (double[]) map.get("location");
                int decLabel = 0;
                if (speed <= 0) {
                    fixSpeedByGps(map);
                    double gpsSpeed = Double.parseDouble(String.valueOf(map.get("speed")));
                    if (gpsSpeed <= 0) {
                        direction = 168.96;
                        decLabel = 1;
                        if (FieldUtil.isPortraitField(new Point(location[0], location[1]))) {
                            direction = 77.43;
                            decLabel = 2;
                        }
                    }
                    JSONObject old = gpsSpeedMap.get(vin);
                    if (Objects.nonNull(old)) {
                        location = (double[]) old.get("realLocation");
                        map.put("location", old.get("realLocation"));
                    }
                } else {
                    gpsSpeedMap.remove(vin);
                }
                // Project the point along the heading.
                String[] strings = GeoPosTransformUtil.calLocationByDistanceAndLocationAndDirection(
                        direction, location[0], location[1], 3);
                map.put("nextLocation", strings);
                map.put("decLabel", decLabel);
            } catch (Exception e) {
                // Previously only e.getMessage() was logged, which is "null" for an NPE.
                // No stack trace on purpose: this runs once per incoming record.
                log.warn("calculateNextLocationByDirection failed for vin {}: {}", vin, e.toString());
            }
        }

        /**
         * Re-derives the speed from two consecutive GPS fixes when the reported speed is
         * not positive, and sets {@code "stopExceed" = 1} once the vehicle has not moved
         * for more than five minutes while the record carries order data.
         *
         * <p>State is kept per vehicle in {@code gpsSpeedMap}, keyed by this pool's VIN —
         * the same key {@link #calculateNextLocationByDirection} reads and removes
         * (previously the key was taken from the record's "vin" field).
         *
         * @param map the record to update; it is modified in place
         */
        private void fixSpeedByGps(Map<String, Object> map) {
            try {
                double[] location = (double[]) map.get("location");
                // Wall-clock receive time is used, not the record's "collectTime".
                long collectTime = System.currentTimeMillis();
                JSONObject old = gpsSpeedMap.get(vin);
                if (Objects.isNull(old)) {
                    JSONObject newObj = new JSONObject();
                    newObj.put("realLocation", location);
                    newObj.put("location", location);
                    newObj.put("collectTime", collectTime);
                    newObj.put("lastUpdateTime", collectTime);
                    gpsSpeedMap.put(vin, newObj);
                    return;
                }
                Long oldCollectTime = old.getLong("collectTime");
                Long lastUpdateTime = old.getLong("lastUpdateTime");
                double[] oldLocation = (double[]) old.get("location");
                BigDecimal bigDecimal = null;
                boolean moved = location[0] != oldLocation[0] || location[1] != oldLocation[1];
                if (collectTime - oldCollectTime < 600 && moved) {
                    long deltTime = collectTime - oldCollectTime;
                    double distance = GpsUtil.getDistance(
                            oldLocation[0], oldLocation[1], location[0], location[1]);
                    if (distance > 0.0003 && deltTime > 0) {
                        // Distance per hour. NOTE(review): presumably km and km/h — confirm
                        // the unit returned by GpsUtil.getDistance.
                        // BigDecimal.valueOf rounds the decimal value of the double;
                        // new BigDecimal(double) would round its exact binary expansion.
                        bigDecimal = BigDecimal.valueOf(distance / (deltTime / 1000.0 / 3600.0))
                                .setScale(2, RoundingMode.HALF_UP);
                        log.info("vin:{}====>{},{},speed:{}", vin, distance, deltTime / 1000.0 / 3600.0, bigDecimal.doubleValue());
                        map.put("speed", bigDecimal.doubleValue());
                        old.put("lastUpdateTime", collectTime);
                        old.put("realLocation", location);
                    }
                }
                // No movement detected: check for a stop timeout.
                if (Objects.isNull(bigDecimal) && (collectTime - lastUpdateTime) > 5 * 60 * 1000) {
                    // Fix: String.valueOf(null) is the text "null", which is never blank, so
                    // the previous check passed even when the record had no order data.
                    Object orderData = map.get("orderData");
                    if (orderData != null && StringUtils.isNotBlank(orderData.toString())) {
                        map.put("stopExceed", 1);
                    }
                }
                old.put("location", location);
                old.put("collectTime", collectTime);
            } catch (Exception e) {
                log.error("fixSpeedByGps occur error:", e);
            }
        }

        /**
         * Removes and returns the oldest queued record once the pool is allowed to push.
         *
         * <p>The returned record gets {@code "poolSize"} (queue size after removal) and is
         * passed through {@link #checkNetError}. When the queue is found empty, pushing is
         * disabled again until {@link #pushData} re-enables it.
         *
         * @return the next record, or {@code null} when nothing is available
         */
        private Map<String, Object> getData() {
            Map<String, Object> poll = null;
            if (this.canPush.get()) {
                try {
                    if (!this.evictingQueue.isEmpty()) {
                        poll = this.evictingQueue.poll();
                        if (poll != null) {
                            poll.put("poolSize", size());
                            checkNetError(poll);
                        }
                    } else {
                        this.canPush.set(false);
                    }
                } catch (Exception e) {
                    log.error(String.format("获取%s数据失败。", vin), e);
                }
            }
            return poll;
        }

        private int size() {
            return this.evictingQueue.size();
        }

        /**
         * Adds order status, heading and previous-location information to the record,
         * using Redis as the store for the last moving position. Not invoked by the code
         * visible in this file.
         *
         * @param map   the record to enrich; it is modified in place
         * @param valid result of {@link #isValid} for this record
         */
        private void addLastLocationInfo(Map<String, Object> map, boolean valid) {
            String preLocationKey = String.format("%s:%s", preLocationFix, map.get("vin"));
            // Coordinates of the previous point, as stored by an earlier call.
            String location = redisDataModel.get(preLocationKey);
            Map<String, Object> status = redisDataModel.getJson2Map(
                    String.format("%s:%s", latestOrderKeyPrefix, map.get("vin")));
            Map<String, Object> realMap = redisDataModel.getJson2Map(
                    String.format("%s:%s-%s", latestRedisKeyPrefix, map.get("vin"), realPostfix));
            map.put("status", describeOrderStatus(status));
            map.put("direction", realMap == null ? "-" : realMap.get("direction"));

            // Previous point.
            Double lastLongitude = null;
            Double lastLatitude = null;
            if (StringUtils.isNotBlank(location)) {
                JSONObject locationMap = JSON.parseObject(location);
                lastLongitude = locationMap.getDouble("longitude");
                lastLatitude = locationMap.getDouble("latitude");
            }

            // Current point.
            double speed = Double.parseDouble(map.getOrDefault("speed", "0").toString());
            Double longitude = ConfigUtils.getAsDoubleWithDefault(map, "longitude", null);
            Double latitude = ConfigUtils.getAsDoubleWithDefault(map, "latitude", null);

            if (speed != 0 && valid) {
                if (longitude != null && latitude != null) {
                    Map<String, Object> preWithSpeedMapTemp = new HashMap<>();
                    preWithSpeedMapTemp.put("longitude", longitude);
                    preWithSpeedMapTemp.put("latitude", latitude);
                    preWithSpeedMapTemp.put("lastLongitude", lastLongitude);
                    preWithSpeedMapTemp.put("lastLatitude", lastLatitude);
                    redisDataModel.set(preLocationKey, JSON.toJSONString(preWithSpeedMapTemp));
                    preWithSpeedMap = preWithSpeedMapTemp;
                    // Fix: for the first point of a vehicle there is no previous location;
                    // building the array then failed with an unboxing NullPointerException.
                    if (lastLongitude != null && lastLatitude != null) {
                        map.put("preLocation", new double[]{lastLongitude, lastLatitude});
                    }
                }
            } else if (preWithSpeedMap != null) {
                // Stationary or invalid: pin the record to the last moving position.
                Double longitude1 = ConfigUtils.getAsDoubleWithDefault(preWithSpeedMap, "longitude", null);
                Double latitude1 = ConfigUtils.getAsDoubleWithDefault(preWithSpeedMap, "latitude", null);
                if (longitude1 != null && latitude1 != null) {
                    map.put("location", new double[]{longitude1, latitude1});
                    // Fix: realMap (or its heading) may be absent; previously this threw.
                    Object heading = realMap == null ? null : realMap.get("direction");
                    if (heading != null) {
                        map.put("preLocation", getEndPointByTrigonometric(
                                Double.valueOf(heading + ""), new double[]{longitude1, latitude1}, 0.0001));
                    }
                }
            }
        }

        /**
         * Maps the Redis order-status record to the value stored under {@code "status"}:
         * {@code "-"} when there is no record, the enum description when the status code is
         * known, and {@code ""} otherwise (including a missing or non-numeric code, which
         * previously raised a NumberFormatException).
         */
        private Object describeOrderStatus(Map<String, Object> status) {
            if (status == null) {
                return "-";
            }
            Object code = status.get("status");
            if (code == null) {
                return "";
            }
            try {
                VmsTosOrderStatusEnum found = VmsTosOrderStatusEnum.find(Integer.valueOf(code + ""));
                return found == null ? "" : found.getDescription();
            } catch (NumberFormatException e) {
                return "";
            }
        }

        /**
         * Checks a record against the last accepted one: a record is invalid when it
         * reports the same latitude/longitude as the previous valid record while its speed
         * is non-zero. Valid records become the new reference; for invalid ones the time
         * since the reference record is added to the reference's {@code "validCount"}.
         *
         * <p>NOTE(review): each invalid record adds (its time - reference time), so
         * consecutive invalid records count overlapping intervals — confirm this is intended.
         *
         * @param map the incoming record
         * @return {@code true} when the record is accepted; {@code false} when it is
         *         rejected or the check itself failed
         */
        private boolean isValid(Map<String, Object> map) {
            boolean valid = true;
            try {
                if (preMap != null) {
                    // Compare position and speed.
                    String prelocation = String.format("%s,%s", preMap.get("latitude"), preMap.get("longitude"));
                    String location = String.format("%s,%s", map.get("latitude"), map.get("longitude"));
                    double speed = Double.parseDouble(map.getOrDefault("speed", "0").toString());
                    valid = !(prelocation.equals(location) && speed != 0);
                    if (!valid) {
                        long currentTime = (long) map.get("collectTime");
                        long preTime = (long) preMap.get("collectTime");
                        // Accumulate the duration covered by invalid data.
                        preMap.put("validCount",
                                (long) preMap.getOrDefault("validCount", 0L) + (currentTime - preTime));
                        preMap.put("speed", speed);
                    }
                }
                if (valid) {
                    preMap = map;
                }
            } catch (Exception e) {
                log.error("数据检查异常。", e);
                valid = false;
            }
            return valid;
        }

        /**
         * Flags {@code current["netError"] = true} when a gap is detected: either the next
         * queued record is at least {@code dataFrequency * 1.2 + validCount} later than the
         * current one, or the queue is empty, no invalid time has been accumulated and the
         * current speed is positive.
         *
         * @param current the record about to be handed out
         */
        private void checkNetError(Map<String, Object> current) {
            Map<String, Object> next = this.evictingQueue.peek();
            try {
                long validCount = 0;
                if (preMap != null) {
                    validCount = (long) preMap.getOrDefault("validCount", 0L);
                }
                if (next != null) {
                    // Check for dropped packets / network delay.
                    long currentTime = (long) current.get("collectTime");
                    long nextTime = (long) next.get("collectTime");
                    if ((nextTime - currentTime) >= this.dataFrequency * 1.2 + validCount) {
                        current.put("netError", true);
                    }
                } else if (validCount <= 0) {
                    // Skipped while invalid data keeps arriving, to avoid false positives.
                    double speed = Double.parseDouble(current.getOrDefault("speed", "0").toString());
                    if (speed > 0) {
                        current.put("netError", true);
                    }
                }
            } catch (Exception e) {
                log.error(String.format("检查%s掉包行为失败...", vin), e);
            }
        }
    }

    /**
     * Returns the point reached from {@code startPoint} by moving {@code distance} along
     * {@code angle}, using plain trigonometry on the raw coordinates
     * ({@code [0] += distance * cos}, {@code [1] += distance * sin}).
     *
     * @param angle      angle in degrees
     * @param startPoint two-element start coordinate; not modified
     * @param distance   offset in the same unit as the coordinates
     * @return a new two-element array with the end coordinate
     */
    private static double[] getEndPointByTrigonometric(double angle, double[] startPoint, double distance) {
        double[] endPoint = new double[2];
        // Degrees to radians.
        double radian = (angle * Math.PI) / 180;
        endPoint[0] = startPoint[0] + distance * Math.cos(radian);
        endPoint[1] = startPoint[1] + distance * Math.sin(radian);
        return endPoint;
    }

    /**
     * Wraps the routing-conflict processing of the listener in a {@link Runnable}.
     *
     * @param messages records keyed by VIN
     * @return a task that calls {@code processRoutingConflict(messages)} when run
     */
    private Runnable asyncProcessRoutingConflict(Map<String, Map<String, Object>> messages) {
        return () -> vehicleRealTimeInfoListener.processRoutingConflict(messages);
    }
}