Commit b2eded42 authored by kang.nie@inzymeits.com's avatar kang.nie@inzymeits.com
Browse files

初始化代码

parent 12156d65
Pipeline #3109 failed with stages
in 0 seconds
package com.cusc.nirvana.user.rnr.notice.dao;
import com.cusc.nirvana.user.rnr.notice.dao.entity.RnrNoticeConfigPO;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* 实名通知配置(同一个主体和状态支持多个通知方)(RnrNoticeConfig)表数据库访问层
*
* @author yuy336
* @since 2022-03-31 15:04:49
*/
public interface RnrNoticeConfigDao extends BaseMapper<RnrNoticeConfigPO> {
}
package com.cusc.nirvana.user.rnr.notice.dao;
import com.cusc.nirvana.user.rnr.notice.dao.entity.RnrNoticeContentPO;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* 实名通知内容(RnrNoticeContent)表数据库访问层
*
* @author yuy336
* @since 2022-03-31 15:05:11
*/
public interface RnrNoticeContentDao extends BaseMapper<RnrNoticeContentPO> {
}
package com.cusc.nirvana.user.rnr.notice.dao;
import com.cusc.nirvana.user.rnr.notice.dao.entity.RnrNoticeContentLogPO;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* 实名通知内容日志(RnrNoticeContentLog)表数据库访问层
*
* @author yuy336
* @since 2022-03-31 15:05:28
*/
public interface RnrNoticeContentLogDao extends BaseMapper<RnrNoticeContentLogPO> {
}
package com.cusc.nirvana.user.rnr.notice.dao.entity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.cusc.nirvana.user.rnr.common.po.BaseRnrPO;
import lombok.Data;
/**
* 实名通知配置(同一个主体和状态支持多个通知方)(RnrNoticeConfig)实体类
*
* @author yuy336
* @since 2022-03-31 15:04:50
*/
@TableName("rnr_notice_config")
@Data
public class RnrNoticeConfigPO extends BaseRnrPO {
private static final long serialVersionUID = 876380460886385145L;
/**
* 业务主键
*/
@TableField("uuid")
private String uuid;
/**
* 租户编号
*/
@TableField("tenant_no")
private String tenantNo;
/**
* 通知业务主体,统一运管为项目编号
*/
@TableField("notice_subject")
private String noticeSubject;
/**
* 业务类型:1 实名, 2 解绑, 3 换件, 4 信息变更,5 重绑
*/
@TableField("rnr_bizz_type")
private String rnrBizzType;
/**
* URL 类型
*/
@TableField("url_type")
private Integer urlType;
/**
* URL
*/
@TableField("url")
private String url;
/**
* 用户名
*/
@TableField("user_name")
private String userName;
/**
* 密码
*/
@TableField("password")
private String password;
/**
* 第三方服务特殊key,某些系统除了用户名和密码还需要一个key
*/
@TableField("license_key")
private String licenseKey;
/**
* 是否签名
*/
@TableField("is_sign")
private Integer isSign;
/**
* 三方系统授权的appid
*/
@TableField("app_id")
private String appId;
/**
* 三方系统授权的appkey
*/
@TableField("app_key")
private String appKey;
/**
* 备注
*/
@TableField("remark")
private String remark;
/**
* 最后一次操作人
*/
@TableField("operator")
private String operator;
}
package com.cusc.nirvana.user.rnr.notice.dao.entity;
import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.*;
import java.util.Date;
/**
* 实名通知内容日志(RnrNoticeContentLog)实体类
*
* @author yuy336
* @since 2022-03-31 15:05:28
*/
@TableName("rnr_notice_content_log")
@Data
public class RnrNoticeContentLogPO implements Serializable {
private static final long serialVersionUID = -44360923018559636L;
/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 创建时间
*/
@TableField(value = "create_time", insertStrategy = FieldStrategy.NEVER, updateStrategy = FieldStrategy.NEVER)
private Date createTime;
/**
* 创建人
*/
@TableField(value = "creator", updateStrategy = FieldStrategy.NEVER)
private String creator;
/**
* 业务主键
*/
@TableField("uuid")
private String uuid;
/**
* 通知业务主体
*/
@TableField("notice_content_id")
private String noticeContentId;
/**
* 业务类型:1 实名, 2 解绑, 3 换件, 4 信息变更,5 重绑
*/
@TableField("rnr_bizz_type")
private String rnrBizzType;
/**
* 链路id
*/
@TableField("trace_id")
private String traceId;
/**
* 请求时间
*/
@TableField("request_time")
private Date requestTime;
/**
* 响应时间
*/
@TableField("response_time")
private Date responseTime;
/**
* 响应描述
*/
@TableField("response_msg")
private String responseMsg;
/**
* 是否成功
*/
@TableField("is_success")
private Integer isSuccess;
}
package com.cusc.nirvana.user.rnr.notice.dao.entity;
import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.cusc.nirvana.user.rnr.common.po.BaseRnrPO;
import lombok.Data;
import java.io.*;
import java.util.Date;
/**
* 实名通知内容(RnrNoticeContent)实体类
*
* @author yuy336
* @since 2022-03-31 15:05:11
*/
@TableName("rnr_notice_content")
@Data
public class RnrNoticeContentPO implements Serializable {
private static final long serialVersionUID = -31304861657579746L;
/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 创建时间
*/
@TableField(value = "create_time", insertStrategy = FieldStrategy.NEVER, updateStrategy = FieldStrategy.NEVER)
private Date createTime;
/**
* 更新时间
*/
@TableField(value = "update_time", insertStrategy = FieldStrategy.NEVER, updateStrategy = FieldStrategy.NEVER)
private Date updateTime;
/**
* 创建人
*/
@TableField(value = "creator", updateStrategy = FieldStrategy.NEVER)
private String creator;
/**
* 业务主键
*/
@TableField("uuid")
private String uuid;
/**
* 通知配置id
*/
@TableField("notice_config_id")
private String noticeConfigId;
/**
* 通知业务主体
*/
@TableField("notice_subject")
private String noticeSubject;
/**
* 业务类型:1 实名, 2 解绑, 3 换件, 4 信息变更,5 重绑
*/
@TableField("rnr_bizz_type")
private String rnrBizzType;
/**
* 通知内容
*/
@TableField("notice_content")
private String noticeContent;
/**
* 通知状态
*/
@TableField("notice_status")
private Integer noticeStatus;
/**
* 第几次通知
*/
@TableField("count")
private Integer count;
/**
* 最后一次操作人
*/
@TableField("operator")
private String operator;
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.cusc.nirvana.user.rnr.notice.dao.RnrNoticeConfigDao">
<resultMap type="com.cusc.nirvana.user.rnr.notice.dao.entity.RnrNoticeConfigPO" id="rnrNoticeConfigMap">
<id property="id" column="id" jdbcType="BIGINT" />
<result property="uuid" column="uuid" jdbcType="VARCHAR"/>
<result property="tenantNo" column="tenant_no" jdbcType="VARCHAR"/>
<result property="noticeSubject" column="notice_subject" jdbcType="VARCHAR"/>
<result property="rnrBizzType" column="rnr_bizz_type" jdbcType="INTEGER"/>
<result property="url" column="url" jdbcType="VARCHAR"/>
<result property="userName" column="user_name" jdbcType="VARCHAR"/>
<result property="password" column="password" jdbcType="VARCHAR"/>
<result property="licenseKey" column="license_key" jdbcType="VARCHAR"/>
<result property="isSign" column="is_sign" jdbcType="INTEGER"/>
<result property="appId" column="app_id" jdbcType="VARCHAR"/>
<result property="appKey" column="app_key" jdbcType="VARCHAR"/>
<result property="remark" column="remark" jdbcType="VARCHAR"/>
<result property="isDelete" column="is_delete" jdbcType="INTEGER" />
<result property="creator" column="creator" jdbcType="VARCHAR" />
<result property="createTime" column="create_time" />
<result property="operator" column="operator" jdbcType="VARCHAR"/>
<result property="updateTime" column="update_time" />
</resultMap>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.cusc.nirvana.user.rnr.notice.dao.RnrNoticeContentLogDao">
<resultMap type="com.cusc.nirvana.user.rnr.notice.dao.entity.RnrNoticeContentLogPO" id="rnrNoticeContentLogMap">
<id property="id" column="id" jdbcType="BIGINT" />
<result property="uuid" column="uuid" jdbcType="VARCHAR"/>
<result property="noticeContentId" column="notice_content_id" jdbcType="VARCHAR"/>
<result property="traceId" column="trace_id" jdbcType="VARCHAR"/>
<result property="requestTime" column="request_time" jdbcType="TIMESTAMP"/>
<result property="responseTime" column="response_time" jdbcType="TIMESTAMP"/>
<result property="responseMsg" column="response_msg" jdbcType="VARCHAR"/>
<result property="isSuccess" column="is_success" jdbcType="INTEGER"/>
<result property="creator" column="creator" jdbcType="VARCHAR" />
<result property="createTime" column="create_time" />
</resultMap>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.cusc.nirvana.user.rnr.notice.dao.RnrNoticeContentDao">
<resultMap type="com.cusc.nirvana.user.rnr.notice.dao.entity.RnrNoticeContentPO" id="rnrNoticeContentMap">
<id property="id" column="id" jdbcType="BIGINT" />
<result property="uuid" column="uuid" jdbcType="VARCHAR"/>
<result property="noticeConfigId" column="notice_config_id" jdbcType="VARCHAR"/>
<result property="noticeSubject" column="notice_subject" jdbcType="VARCHAR"/>
<result property="noticeContent" column="notice_content" jdbcType="VARCHAR"/>
<result property="noticeStatus" column="notice_status" jdbcType="INTEGER"/>
<result property="count" column="count" jdbcType="INTEGER"/>
<result property="creator" column="creator" jdbcType="VARCHAR" />
<result property="createTime" column="create_time" />
<result property="operator" column="operator" jdbcType="VARCHAR"/>
<result property="updateTime" column="update_time" />
</resultMap>
</mapper>
package com.cusc.nirvana.user.rnr.notice.dto;
import com.cusc.nirvana.user.rnr.mg.common.BaseRnrMgPageDTO;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.Date;
/**
* 实名通知配置(同一个主体和状态支持多个通知方)(RnrNoticeConfig)传输对象类
*
* @author yuy336
* @since 2022-03-31 15:04:53
*/
@ApiModel(value = "实名通知配置(同一个主体和状态支持多个通知方)", description = "实名通知配置(同一个主体和状态支持多个通知方)")
@Data
public class RnrNoticeConfigDTO extends BaseRnrMgPageDTO {
private static final long serialVersionUID = 812014839600935383L;
@ApiModelProperty(value = "通知业务主体,统一运管为项目编号")
private String noticeSubject;
@ApiModelProperty(value = "租户编码")
private String tenantNo;
@ApiModelProperty(value = "业务类型:1 实名, 2 解绑, 3 换件, 4 信息变更,5 重绑")
private Integer rnrBizzType;
@ApiModelProperty(value = "URL类型")
private Integer urlType;
@ApiModelProperty(value = "URL")
private String url;
@ApiModelProperty(value = "用户名")
private String userName;
@ApiModelProperty(value = "密码")
private String password;
@ApiModelProperty(value = "第三方服务特殊key,某些系统除了用户名和密码还需要一个key")
private String licenseKey;
@ApiModelProperty(value = "是否签名")
private Integer isSign;
@ApiModelProperty(value = "三方系统授权的appid")
private String appId;
@ApiModelProperty(value = "三方系统授权的appkey")
private String appKey;
@ApiModelProperty(value = "备注")
private String remark;
@ApiModelProperty(value = "更新时间")
private Date updateTime;
@ApiModelProperty(value = "最后一次操作人")
private String operator;
}
package com.cusc.nirvana.user.rnr.notice.dto;
import com.cusc.nirvana.user.rnr.mg.common.BaseRnrMgPageDTO;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.Date;
/**
* 实名通知内容(RnrNoticeContent)传输对象类
*
* @author yuy336
* @since 2022-03-31 15:05:11
*/
@ApiModel(value = "实名通知内容", description = "实名通知内容")
@Data
public class RnrNoticeContentDTO extends BaseRnrMgPageDTO {
private static final long serialVersionUID = -57855867333314594L;
@ApiModelProperty(value = "通知配置id")
private String noticeConfigId;
@ApiModelProperty(value = "通知业务主体")
private String noticeSubject;
@ApiModelProperty(value = "业务类型:1 实名, 2 解绑, 3 换件, 4 信息变更,5 重绑")
private Integer rnrBizzType;
@ApiModelProperty(value = "通知内容")
private String noticeContent;
@ApiModelProperty(value = "通知状态")
private Integer noticeStatus;
@ApiModelProperty(value = "第几次通知")
private Integer count;
@ApiModelProperty(value = "更新时间")
private Date updateTime;
@ApiModelProperty(value = "最后一次操作人")
private String operator;
@ApiModelProperty(value = "数据库主键")
private Long id;
@ApiModelProperty(value = "业务主键")
private String uuid;
@ApiModelProperty(value = "创建时间")
private Date createTime;
@ApiModelProperty(value = "创建人")
private String creator;
}
package com.cusc.nirvana.user.rnr.notice.dto;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.Date;
/**
* 实名通知内容日志(RnrNoticeContentLog)传输对象类
*
* @author yuy336
* @since 2022-03-31 15:05:28
*/
@ApiModel(value = "实名通知内容日志", description = "实名通知内容日志")
@Data
public class RnrNoticeContentLogDTO extends RnrNoticeDTO {
private static final long serialVersionUID = -20975877793614453L;
@ApiModelProperty(value = "通知业务主体")
private String noticeContentId;
@ApiModelProperty(value = "业务类型:1 实名, 2 解绑, 3 换件, 4 信息变更,5 重绑")
private Integer rnrBizzType;
@ApiModelProperty(value = "链路id")
private String traceId;
@ApiModelProperty(value = "请求时间")
private Date requestTime;
@ApiModelProperty(value = "响应时间")
private Date responseTime;
@ApiModelProperty(value = "响应描述")
private String responseMsg;
@ApiModelProperty(value = "是否成功")
private Integer isSuccess;
}
package com.cusc.nirvana.user.rnr.notice.dto;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.*;
import java.util.Date;
/**
* <p>
* notice的PO基类
* </p>
*
* @author yuyi
* @since 2021-10-21
*/
@Data
public class RnrNoticeDTO implements Serializable {
private static final long serialVersionUID = 1L;
@ApiModelProperty(value = "数据库主键")
private Long id;
@ApiModelProperty(value = "业务主键")
private String uuid;
@ApiModelProperty(value = "逻辑删除(0-未删除,1-已删除)")
private Integer isDelete;
@ApiModelProperty(value = "创建时间")
private Date createTime;
@ApiModelProperty(value = "创建人")
private String creator;
}
package com.cusc.nirvana.user.rnr.notice.dto;
import lombok.Data;
/**
* 实名通知请求DTO
*
* @author yuy336
* @since 2022-03-31 15:04:53
*/
@Data
public class RnrNoticeReqDTO extends ThirdPartyNoticeReqDTO {
private static final long serialVersionUID = 812014839600935383L;
/**
* 通知主体
*/
private String noticeSubject;
/**
* 通知类型
*/
private String noticeType;
}
package com.cusc.nirvana.user.rnr.notice.dto;
import lombok.Data;
import java.io.*;
/**
* 实名三方通知请求DTO
*
* @author yuy336
* @since 2022-03-31 15:04:53
*/
@Data
public class ThirdPartyNoticeReqDTO implements Serializable {
private static final long serialVersionUID = 812014839600935383L;
/**
* 车辆vin
*/
private String vin;
/**
* Iccid
*/
private String iccid;
/**
* 实名状态 0:实名成功; 1:实名不成功;
*/
private String rnrStatus;
/**
* 实名描述,当实名状态不为0时,描述失败原因
*/
private String rnrComment;
}
package com.cusc.nirvana.user.rnr.notice.kafka;
import com.cache.CacheFactory;
import com.cache.exception.CacheException;
import com.cache.service.ISortSetService;
import com.cusc.nirvana.user.rnr.mg.constants.RnrMgMqConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
@Configuration
@EnableScheduling
@Slf4j
public class ConsumeDelaySchedule {
@Resource
private CacheFactory cacheFactory;
@Autowired
private KafkaProduct kafkaProduct;
@Autowired
private ThreadPoolTaskScheduler taskScheduler;
private ScheduledFuture<?> future;
@PostConstruct
public void StartCron() {
try {
//kafka定时任务执行
String basetime = "*/3 * * * * * ";
if (future == null) {
future = taskScheduler.schedule(new runnable(), new CronTrigger(basetime));
}
} catch (Exception e) {
log.error("延迟定时任务启动失败:{}", e.getMessage());
}
}
// 模仿消费者 每3秒执行一次
//duled(cron = "*/3 * * * * * ")
public void consume() {
// 取出redis里的消息,小于当前时间戳的进行消费
try {
ISortSetService sortSetService = cacheFactory.getSortSetService();
Map<String, Double> sortSetWithScore =
sortSetService.getSortSetWithScore("CT:USER:" + RnrMgMqConstant.RNR_CARD_OPERATION_DELAY_TOPIC, 1);
sortSetWithScore.forEach((key, value) -> {
if (value <= System.currentTimeMillis()) {
try {
//消息发送成功后删除redis里的member
boolean result = kafkaProduct.sendMessage(RnrMgMqConstant.RNR_CARD_OPERATION_DELAY_TOPIC, key);
if (result) {
sortSetService.delMembers("CT:USER:" + RnrMgMqConstant.RNR_CARD_OPERATION_DELAY_TOPIC, key);
}
} catch (CacheException e) {
e.printStackTrace();
}
}
});
} catch (Exception e) {
log.error("消费redis消息异常:{}", e.getMessage());
}
}
private class runnable implements Runnable {
@Override
public void run() {
try {
consume();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
package com.cusc.nirvana.user.rnr.notice.kafka;
import com.cusc.nirvana.user.util.CuscStringUtils;
import lombok.Data;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Data
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String server;
@Value("${spring.kafka.producer.retries}")
private String producerRetries;
@Value("${spring.kafka.producer.sasl-jaas-config:}")
private String kafkaProducerSASLJaasConfig;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String autoCommit;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private String autoCommitInterval;
@Value("${spring.kafka.consumer.sasl-jaas-config:}")
private String kafkaConsumerSASLJaasConfig;
@Value("${spring.kafka.inner.security-protocol:}")
private String kafkaSecurityProtocol;
@Value("${spring.kafka.inner.sasl-mechanism:}")
private String kafkaSASLMechanism;
/**
* 生产者配置
* @return
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
props.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
if (!CuscStringUtils.isEmpty(kafkaSecurityProtocol) && !CuscStringUtils.isEmpty(kafkaSASLMechanism)
&& !CuscStringUtils.isEmpty(kafkaProducerSASLJaasConfig)) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
props.put(SaslConfigs.SASL_MECHANISM, kafkaSASLMechanism);
props.put("sasl.jaas.config", kafkaProducerSASLJaasConfig);
}
return props;
}
/**
* 消费者配置
* @return
*/
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//设置自动提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
//消费者偏移自动提交给Kafka的频率(以毫秒为单位)、默认值为5000
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
//kafka偏移量设置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
if (!CuscStringUtils.isEmpty(kafkaSecurityProtocol) && !CuscStringUtils.isEmpty(kafkaSASLMechanism)
&& !CuscStringUtils.isEmpty(kafkaConsumerSASLJaasConfig)) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
props.put(SaslConfigs.SASL_MECHANISM, kafkaSASLMechanism);
props.put("sasl.jaas.config", kafkaConsumerSASLJaasConfig);
}
return props;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
package com.cusc.nirvana.user.rnr.notice.kafka;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.lang.Nullable;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;
import org.springframework.web.bind.annotation.RequestParam;
@Configuration
@Slf4j
public class KafkaProduct {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 发送消息到kafka对应的topic
*
* @param topic
* @param message
*/
public boolean sendMessage(@RequestParam("topic") String topic
, @RequestParam("message") String message) {
try {
ListenableFuture<SendResult<String, String>> send =
kafkaTemplate.send(topic, message);
send.addCallback(success -> log.info("KafkaMessageProducer 发送消息成功!topic:{}, message:{}", topic, message),
fail -> log.error("KafkaMessageProducer 发送消息失败topic:{}, message:{}", topic, message));
/*send.addCallback(new SuccessCallback() {
@Override
public void onSuccess(@Nullable Object obj) {
SendResult<String, String> sendResult = (SendResult) obj;
//...
log.info("消息发送成功:{}", sendResult);
}
}, new FailureCallback() {
@Override
public void onFailure(Throwable throwable) {
log.info("消息发送失败,case:{}", throwable);
//失败处理
}
});*/
return true;
} catch (Exception e) {
return false;
}
}
}
package com.cusc.nirvana.user.rnr.notice.kafka;
import com.alibaba.fastjson.JSON;
import com.cusc.nirvana.user.rnr.mg.constants.*;
import com.cusc.nirvana.user.rnr.mg.dto.RnrOrderDTO;
import com.cusc.nirvana.user.rnr.mg.service.IMgRnrCardInfoService;
import com.cusc.nirvana.user.rnr.mg.service.IMgRnrInfoService;
import com.cusc.nirvana.user.rnr.mg.service.IRnrOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.Optional;
/**
* @author fyp
* @since 2022-06-10
*/
@Slf4j
@Service
public class KfKRnrBindExchangeConsumer {
@Autowired
IRnrOrderService orderService;
@Autowired
IMgRnrCardInfoService cardInfoService;
@Resource
private IMgRnrInfoService rnrInfoService;
@KafkaListener(id = RnrMgMqConstant.RNR_CARD_OPERATION_DELAY_GROUP, topics = {RnrMgMqConstant.RNR_CARD_OPERATION_DELAY_TOPIC})
public void onMessage(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
String message = "";
if (kafkaMessage.isPresent()) {
message = (String) kafkaMessage.get();
}
log.info("接收到延迟消息:{}", JSON.toJSONString(message));
//查询工单信息
RnrOrderDTO rnrOrderDTO = new RnrOrderDTO();
rnrOrderDTO.setUuid(message);
rnrOrderDTO = orderService.getByUuid(rnrOrderDTO);
if (null == rnrOrderDTO) {
log.info("接收到延迟消息 未查询到order,uuid = {}",rnrOrderDTO.getUuid());
return;
}
if (RnrOrderType.UNBIND.getCode().equals(rnrOrderDTO.getOrderType())
|| RnrOrderType.TBOX_CHANGE.getCode().equals(rnrOrderDTO.getOrderType())
|| RnrOrderType.ONE_CAR_MORE_CARD_BIND.getCode().equals(rnrOrderDTO.getOrderType())
|| RnrOrderType.ENTERPRISEUNBIND.getCode().equals(rnrOrderDTO.getOrderType())|| RnrOrderType.SEC_UNBIND.getCode().equals(rnrOrderDTO.getOrderType())){
// 原车主解绑 、 换件 、 一车多卡 、 企业解绑 发送短信后order_status是1或者2(待审核) 如果此时收到延迟消息时 还是此状态 作废工单和卡
if (!RnrOrderStatusEnum.notFinished(rnrOrderDTO.getOrderStatus())) {
log.info("接收到延迟消息,不处理,工单类型:{} 工单状态:{} 工单uuid:{}",rnrOrderDTO.getOrderType(),rnrOrderDTO.getOrderStatus(),rnrOrderDTO.getUuid());
return;
}
//还是待审核状态
//工单设置成未通过
orderService.updateOrderStatus(Collections.singletonList(rnrOrderDTO.getUuid()), RnrOrderStatusEnum.CANCEL.getCode());
//卡设置成失败状态
cardInfoService.updateCardStatusByOrderId(rnrOrderDTO.getUuid(), NoticeStatusEnum.NONEED.getCode(), RnrStatus.RNR_FAIL.getCode());
//原车主解绑 、 换件 、 一车多卡 挂在原先的rnr_id 下面 不需要处理超时的rnr_info 企业解绑是新的rnr_info 需要处理超时的情况
if (RnrOrderType.ENTERPRISEUNBIND.getCode().equals(rnrOrderDTO.getOrderType())) {
//rnr_info 设置成为失败
rnrInfoService.updateRnrStatus(rnrOrderDTO.getRnrId(),RnrStatus.RNR_FAIL);
}
} else if(RnrOrderType.NEW_VEHICLE.getCode().equals(rnrOrderDTO.getOrderType())
|| RnrOrderType.SEC_VEHICLE.getCode().equals(rnrOrderDTO.getOrderType())
|| RnrOrderType.COMPANY_NEW_VEHICLE.getCode().equals(rnrOrderDTO.getOrderType())) {
//新车自然人实名 二手车实名 企业新车实名 提交资料后 order_status 是0(初始化状态) 如果此时收到延迟消息时 还是此状态 作废工单和卡和实名信息
if (!RnrOrderStatusEnum.COMMIT.getCode().equals(rnrOrderDTO.getOrderStatus())) {
log.info("接收到延迟消息,不处理,工单类型:{} 工单状态:{} 工单uuid:{}",rnrOrderDTO.getOrderType(),rnrOrderDTO.getOrderStatus(),rnrOrderDTO.getUuid());
return;
}
//还是初始化状态 需要作废这笔实名信息
//工单设置成未通过
orderService.updateOrderStatus(Collections.singletonList(rnrOrderDTO.getUuid()), RnrOrderStatusEnum.CANCEL.getCode());
//卡设置成失败状态
cardInfoService.updateCardStatusByOrderId(rnrOrderDTO.getUuid(), NoticeStatusEnum.NONEED.getCode(), RnrStatus.RNR_FAIL.getCode());
//rnr_info 设置成为失败
rnrInfoService.updateRnrStatus(rnrOrderDTO.getRnrId(),RnrStatus.RNR_FAIL);
}
}
}
package com.cusc.nirvana.user.rnr.notice.kafka;
import com.alibaba.fastjson.JSON;
import com.cusc.nirvana.user.rnr.mg.constants.RnrMgMqConstant;
import com.cusc.nirvana.user.rnr.mg.dto.MgCardNoticeDTO;
import com.cusc.nirvana.user.rnr.notice.service.NoticeHandler;
import com.cusc.nirvana.user.rnr.notice.service.NoticeHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.Objects;
import java.util.Optional;
/**
* 绑定、解绑 队列消费
*
* @author huzl
*/
@Slf4j
@Service
public class KfKRnrNoticeConsumer {
@Autowired
NoticeHandlerContext handlerContext;
@KafkaListener(id = RnrMgMqConstant.RNR_CARD_OPERATION_GROUP, topics = {RnrMgMqConstant.RNR_CARD_OPERATION_TOPIC})
public void onMessage(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
MgCardNoticeDTO bean = null;
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
bean = JSON.parseObject(message.toString(), MgCardNoticeDTO.class);
}
log.info("RnrNoticeConsumer onMessage: {}", JSON.toJSONString(bean));
if (Objects.isNull(bean)) {
return;
}
for (NoticeHandler noticeHandler : handlerContext.getHandlerList()) {
noticeHandler.handleNotice(bean);
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment