Consumer.java 10.5 KB
package com.trash.casefile.kafka;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.trash.casefile.domain.KafkaCompensation;
import com.trash.casefile.domain.ViolationWarningInformation;
import com.trash.casefile.mapper.KafkaCompensationMapper;
import com.trash.casefile.mapper.ViolationWarningInformationMapper;
import com.trash.casefile.service.IViolationWarningInformationService;
import com.trash.common.config.trashConfig;
import com.trash.common.core.redis.RedisCache;
import com.trash.common.utils.DateUtils;
import com.trash.common.utils.RemoteServerUtils;
import com.trash.common.utils.StringUtils;
import com.trash.common.utils.spring.SpringUtils;
import com.trash.framework.web.service.SysLoginService;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;

@Component
public class Consumer {
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    @Autowired
    private IViolationWarningInformationService violationWarningInformationService;

    @KafkaListener(topics = "record_process_alarm")
    public void consume(@Payload String data) throws InterruptedException {
        log.info("kafka消费数据成功,data:" + data);
        String id = insertKafkaCompensation(data);
        autoViolationWarning(data,id);
    }

    @Autowired
    trashConfig trashConfig;

    @Autowired
    SysLoginService loginService;

    @Autowired
    RedisCache redisCache;

    @Transactional
    public void autoViolationWarning(String data,String id) throws InterruptedException {
        log.info("开始报警,data:" + data);

        String[] code = {"44030020=工地预警-未报开工作业", "44030021=工地预警-视频设备离线超时报警", "44030022=工地预警-三无车辆进入工地", "44030023=工地预警未按时间作业",
                "44030024=消纳场预警-未报开工作业", "44030025=消纳场预警-视频设备离线超时报警", "44030026=消纳场预警-三无车辆进入消纳场", "44030027=消纳场预警-未到指定的消纳场作业",
                "44030028=离线运输报警(工)", "44030029=离线运输报警(消)", "44030030=未激活车辆作业", "44030031=未核准作业车辆作业", "44030032=未按线路行驶",
                "44030033=闯禁行驶", "44030034=失信车辆作业"};
        JSONObject jsonObject = JSONObject.parseObject(data);

        String nowHour = DateFormatUtils.format(new Date(), "HH");
        String nowDate = null;
        String tomorrowDate = null;
        if(Integer.valueOf(nowHour)<8){
            nowDate = DateFormatUtils.format(DateUtils.addDays(new Date(), -1), "yyyy-MM-dd");
            tomorrowDate = DateFormatUtils.format(new Date(), "yyyy-MM-dd");
        }else{
            nowDate = DateFormatUtils.format(new Date(), "yyyy-MM-dd");
            tomorrowDate = DateFormatUtils.format(DateUtils.addDays(new Date(), 1), "yyyy-MM-dd");
        }



        String violationType = jsonObject.getString("alarmName");
        if(RemoteServerUtils.remote==null){
            //登录
            RemoteServerUtils.remote = trashConfig.getRemotePath();
            loginService.loginByRemote(trashConfig.getToken());
        }
        ViolationWarningInformation violationWarningInformation1 = null;
        JSONArray company1 = redisCache.getCacheObject("companyList");
        JSONObject basevehicleInfo = RemoteServerUtils.getBasevehicleInfo(jsonObject.getString("vehicleId"),trashConfig.getToken());
        if(company1==null){
            throw new RuntimeException("获取公司列表失败!!!");
        }

        if(basevehicleInfo==null){
            throw new RuntimeException("获取公司详情失败!vehicleId:"+jsonObject.getString("vehicleId"));
        }
        //这两种类型无需累加
        if(violationType.equals("工地预警-视频设备离线超时报警") || violationType.equals("消纳场预警-视频设备离线超时报警")){
            violationWarningInformation1 = null;
        }else if(violationType.equals("闯禁行驶")||violationType.equals("失信车辆作业")){//这两种类型根据公司累加
            String company = jsonObject.getString("enterpriseName");
            violationWarningInformation1 = SpringUtils.getBean(ViolationWarningInformationMapper.class).selectViolationWarningInformation(company,null, violationType, nowDate, tomorrowDate);
        }else{//其他都是根据工地/消纳站累加
            String project = jsonObject.getString("siteName");
            violationWarningInformation1 = SpringUtils.getBean(ViolationWarningInformationMapper.class).selectViolationWarningInformation(null,project, violationType, nowDate, tomorrowDate);
        }
        //判断该类型,该工地,该时间段是否已经存在报警信息,如果存在则不再新增,如果不存在则新增
        if (violationWarningInformation1 == null) {
            ViolationWarningInformation violationWarningInformation = new ViolationWarningInformation();
            String siteType = jsonObject.getString("siteType");
            if ("1".equals(siteType)) {//工地
                violationWarningInformation.setViolationObjectType("0");
            } else if ("2".equals(siteType)) {//消纳场
                violationWarningInformation.setViolationObjectType("1");
            }
            //所属区域
            violationWarningInformation.setOwningRegion(jsonObject.getString("areaName"));
            //案卷编码
            String number = DateFormatUtils.format(new Date(), "yyyyMMddHHmmssSSS");
            violationWarningInformation.setNumber(number.substring(2));
            //公司名称
            violationWarningInformation.setCompanyName(jsonObject.getString("enterpriseName"));
            //违规类型
            violationWarningInformation.setViolationType(jsonObject.getString("alarmName"));
            //发送人
            violationWarningInformation.setCreateBy("长沙市建筑垃圾智慧监管平台");
            //推送对象
            violationWarningInformation.setViolationGrade("一般类");
            //项目名称(工地名称,消纳场名称)
            violationWarningInformation.setProjectName(jsonObject.getString("siteName"));
            violationWarningInformation.setSendObject("区管理部门(治)");
            switch (violationWarningInformation.getViolationType()) {
                case "工地预警-未报开工作业":
                case "消纳场预警-未到指定的消纳场作业":
                case "消纳场预警-三无车辆进入消纳场":
                case "消纳场预警-未报开工作业":
                case "工地预警-未按时间作业":
                case "工地预警-三无车辆进入工地":
                case "工地预警-视频设备离线超时报警":
                case "离线运输报警(消)":
                case "离线运输报警(工)":
                case "未激活车辆作业":
                case "未核准作业车辆作业":
                    violationWarningInformation.setSendObject("区管理部门(治)");
                    break;
                case "消纳场预警-视频设备离线超时报警":
                    violationWarningInformation.setSendObject("区管理部门(消)");
                    break;
                case "未按线路行驶":
                    violationWarningInformation.setViolationGrade("重点类");
                    violationWarningInformation.setSendObject("区管理部门(治)");
                    break;
                case "闯禁行驶":
                case "失信车辆作业":
                    violationWarningInformation.setSendObject("运输企业");
                    break;
            }

            String describe = DateFormatUtils.format(new Date(), "yyyy/MM/dd HH:mm:ss") + " "
                    + violationWarningInformation.getCompanyName() + " " + jsonObject.get("licenseplateNo") + "在" +
                    jsonObject.get("siteName") + "出现" + violationWarningInformation.getViolationType();
            //设置公司简称
            for(Object o:company1){
                JSONObject jo = (JSONObject) o;
                if(basevehicleInfo.getString("companyID").equals(jo.getString("id"))){
                    violationWarningInformation.setAbbreviation(jo.getString("abbreviation"));
                }
            }
            violationWarningInformation.setDescribe(describe);
            // 业务逻辑
            try {
                violationWarningInformationService.insertViolationWarningInformation(null, violationWarningInformation);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            String describe = violationWarningInformation1.getDescribe() + ";\n" + DateFormatUtils.format(new Date(), "yyyy/MM/dd HH:mm:ss") + " "
                    + jsonObject.getString("enterpriseName") + " " + jsonObject.get("licenseplateNo") + "在" +
                    jsonObject.get("siteName") + "出现" + jsonObject.getString("alarmName");
            violationWarningInformation1.setDescribe(describe);
            SpringUtils.getBean(ViolationWarningInformationMapper.class).updateViolationWarningInformation(violationWarningInformation1);
        }
        //kafka消费成功
        KafkaCompensation kafkaCompensation = new KafkaCompensation();
        kafkaCompensation.setId(Long.valueOf(id));
        kafkaCompensation.setStatus(1);
        SpringUtils.getBean(KafkaCompensationMapper.class).updateKafkaCompensation(kafkaCompensation);
        log.info("报警结束,data:" + data);
    }

    @Transactional
    public String insertKafkaCompensation(String data){
        log.info("添加kafka补偿信息,data:" + data);
        KafkaCompensation kafkaCompensation = new KafkaCompensation();
        kafkaCompensation.setData(data);
        kafkaCompensation.setCreateTime(DateUtils.getNowDate());
        kafkaCompensation.setStatus(0);
        SpringUtils.getBean(KafkaCompensationMapper.class).insertKafkaCompensation(kafkaCompensation);
        return kafkaCompensation.getId().toString();
    }
}