package com.ssi.mqtt; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.ssi.constant.VehicleConstant; import com.ssi.entity.dto.VmsVehicleDto; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; @Slf4j public class MQTTPublishClient { //定义MQTT的ID,可以在MQTT服务配置中指定 private String clientid; private String mqttCAPath; private String mqttName; private String mqttPassword; private String mqttUrl; private MqttClient client; @Setter @Getter private MqttTopic mqttTopic; /** * 构造函数 */ public MQTTPublishClient(String host, String serverId, String mqttCAPath, String mqttName, String mqttPassword, String mqttUrl) { log.info("MQTTPublishClient instance"); this.clientid = serverId; this.mqttCAPath = mqttCAPath; this.mqttName = mqttName; this.mqttPassword = mqttPassword; this.mqttUrl = mqttUrl; // MemoryPersistence设置clientid的保存形式,默认为以内存保存 try { if (client == null) { client = new MqttClient(host, clientid, new MemoryPersistence()); } } catch (MqttException e) { e.printStackTrace(); } connect(); } /** * 用来连接服务器 */ private void connect() { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); // 设置超时时间 options.setConnectionTimeout(20); // 设置会话心跳时间 options.setKeepAliveInterval(10); // options.setAutomaticReconnect(true);//设置自动重连 try { options.setSocketFactory(SslUtil.getSocketFactory(mqttCAPath, "/UserProfile/ca/client.crt", "/UserProfile/ca/client.key", "123456")); client.setCallback(new PublishCallback()); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } public void mqttRequest(VmsVehicleDto vehicle, String cntType) { try { //发送mqtt请求 MqttMessage message = new MqttMessage(); JSONObject msgObj = new JSONObject(); msgObj.put("devType", "web"); msgObj.put("cntType", cntType); JSONObject dataObj = new JSONObject(); dataObj.put("name", mqttName); dataObj.put("password", mqttPassword); dataObj.put("sn", vehicle.getVinSSI()); JSONArray cameraArray = new JSONArray(); String vehicleNum = vehicle.getVehicleNum().substring(vehicle.getVehicleNum().length() - 1); if (VehicleConstant.START_PUSHER.equals(cntType)) { for (int i = 0; i < 5; i++) { JSONObject cameraObj = new JSONObject(); cameraObj.put("url", mqttUrl + vehicleNum + "_" + i); cameraObj.put("status", "disable"); if (i == 2) { cameraObj.put("status", "enable"); } cameraArray.add(cameraObj); } dataObj.put("camera", cameraArray); msgObj.put("startPusherData", dataObj); } else { for (int i = 0; i < 5; i++) { JSONObject cameraObj = new JSONObject(); cameraObj.put("url", ""); cameraObj.put("status", "disable"); cameraArray.add(cameraObj); } dataObj.put("camera", cameraArray); msgObj.put("stopPusherData", dataObj); } message.setPayload(msgObj.toJSONString().getBytes("UTF-8")); message.setQos(0); message.setRetained(false); this.publish("ssiweb", message); } catch (Exception e) { log.error(e.getMessage(), e); } } //发送消息并获取回执 void publish(String topic, MqttMessage message) throws MqttPersistenceException, MqttException, InterruptedException { log.info("publish topic: " + topic); mqttTopic = client.getTopic(topic); MqttDeliveryToken token = mqttTopic.publish(message); token.waitForCompletion(); log.info("message is published completely! " + token.isComplete()); log.info("messageId:" + token.getMessageId()); token.getResponse(); if (client.isConnected()) { client.disconnect(10000); } log.info("Disconnected: delivery token \"" + token.hashCode() + "\" received: " + token.isComplete()); } }