Consumer.java
10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package com.trash.casefile.kafka;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.trash.casefile.domain.KafkaCompensation;
import com.trash.casefile.domain.ViolationWarningInformation;
import com.trash.casefile.mapper.KafkaCompensationMapper;
import com.trash.casefile.mapper.ViolationWarningInformationMapper;
import com.trash.casefile.service.IViolationWarningInformationService;
import com.trash.common.config.trashConfig;
import com.trash.common.core.redis.RedisCache;
import com.trash.common.utils.DateUtils;
import com.trash.common.utils.RemoteServerUtils;
import com.trash.common.utils.StringUtils;
import com.trash.common.utils.spring.SpringUtils;
import com.trash.framework.web.service.SysLoginService;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
 * Kafka consumer for the {@code record_process_alarm} topic.
 *
 * <p>For every message it first stores the raw payload as a {@link KafkaCompensation} row
 * (status 0), then turns the alarm into a {@link ViolationWarningInformation} record — either a
 * new one or an extra description line appended to an existing one — and finally sets the
 * compensation row to status 1.
 */
@Component
public class Consumer {
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    @Autowired
    private IViolationWarningInformationService violationWarningInformationService;
    @Autowired
    trashConfig trashConfig;
    @Autowired
    SysLoginService loginService;
    @Autowired
    RedisCache redisCache;

    /**
     * Listener entry point: records the payload for compensation, then processes the alarm.
     *
     * @param data raw message payload; parsed as a JSON object by {@link #autoViolationWarning}
     * @throws InterruptedException declared by {@link #autoViolationWarning}
     */
    @KafkaListener(topics = "record_process_alarm")
    public void consume(@Payload String data) throws InterruptedException {
        log.info("kafka消费数据成功,data:{}", data);
        String id = insertKafkaCompensation(data);
        autoViolationWarning(data, id);
    }

    /**
     * Creates or extends the violation warning described by {@code data} and marks the
     * compensation row {@code id} as consumed (status 1).
     *
     * <p>NOTE(review): {@link #consume} invokes this method on {@code this}; with proxy-based
     * Spring transaction management such a self-invocation would not pass through the
     * transactional proxy — confirm how transactions are configured for this bean.
     *
     * @param data raw alarm payload (JSON object text)
     * @param id   id of the compensation row returned by {@link #insertKafkaCompensation}
     * @throws IllegalArgumentException if the payload is empty or has no {@code alarmName}
     * @throws RuntimeException         if the cached company list or the vehicle info is missing
     * @throws IllegalStateException    if persisting a new warning fails with an I/O error
     * @throws InterruptedException     kept in the signature for existing callers
     */
    @Transactional
    public void autoViolationWarning(String data, String id) throws InterruptedException {
        log.info("开始报警,data:{}", data);
        JSONObject alarm = JSONObject.parseObject(data);
        if (alarm == null) {
            throw new IllegalArgumentException("kafka alarm payload is empty, compensation id:" + id);
        }
        String violationType = alarm.getString("alarmName");
        if (violationType == null) {
            throw new IllegalArgumentException("kafka alarm payload has no alarmName, data:" + data);
        }

        // One timestamp for the whole message keeps the day window, case number and
        // description consistent with each other.
        Date now = new Date();

        // Alarms raised before 08:00 are matched against the window that started the day before.
        int hour = Integer.parseInt(DateFormatUtils.format(now, "HH"));
        Date windowStart = hour < 8 ? DateUtils.addDays(now, -1) : now;
        String nowDate = DateFormatUtils.format(windowStart, "yyyy-MM-dd");
        String tomorrowDate = DateFormatUtils.format(DateUtils.addDays(windowStart, 1), "yyyy-MM-dd");

        if (RemoteServerUtils.remote == null) {
            // Log in to the remote server on first use.
            RemoteServerUtils.remote = trashConfig.getRemotePath();
            loginService.loginByRemote(trashConfig.getToken());
        }

        // Check the cached list first so a missing cache does not cost a remote call.
        JSONArray companies = redisCache.getCacheObject("companyList");
        if (companies == null) {
            throw new RuntimeException("获取公司列表失败!!!");
        }
        JSONObject vehicleInfo = RemoteServerUtils.getBasevehicleInfo(alarm.getString("vehicleId"), trashConfig.getToken());
        if (vehicleInfo == null) {
            throw new RuntimeException("获取公司详情失败!vehicleId:" + alarm.getString("vehicleId"));
        }

        // Add a new warning only when none exists yet for this type / target / day window;
        // otherwise append one more line to the existing description.
        ViolationWarningInformation existing = findExistingWarning(alarm, violationType, nowDate, tomorrowDate);
        if (existing == null) {
            createWarning(alarm, companies, vehicleInfo, now);
        } else {
            String appended = existing.getDescribe() + ";\n"
                    + describeEvent(now, alarm.getString("enterpriseName"), alarm, alarm.getString("alarmName"));
            existing.setDescribe(appended);
            SpringUtils.getBean(ViolationWarningInformationMapper.class).updateViolationWarningInformation(existing);
        }

        // Kafka message consumed successfully.
        KafkaCompensation consumed = new KafkaCompensation();
        consumed.setId(Long.valueOf(id));
        consumed.setStatus(1);
        SpringUtils.getBean(KafkaCompensationMapper.class).updateKafkaCompensation(consumed);
        log.info("报警结束,data:{}", data);
    }

    /**
     * Stores the raw payload as a compensation row with status 0.
     *
     * @param data raw message payload
     * @return the id of the inserted row, as a string
     */
    @Transactional
    public String insertKafkaCompensation(String data) {
        log.info("添加kafka补偿信息,data:{}", data);
        KafkaCompensation pending = new KafkaCompensation();
        pending.setData(data);
        pending.setCreateTime(DateUtils.getNowDate());
        pending.setStatus(0);
        SpringUtils.getBean(KafkaCompensationMapper.class).insertKafkaCompensation(pending);
        return pending.getId().toString();
    }

    /** Looks up the warning this alarm should be appended to, or returns null when a new one is needed. */
    private ViolationWarningInformation findExistingWarning(JSONObject alarm, String violationType,
                                                            String nowDate, String tomorrowDate) {
        // These two types are never accumulated: every alarm gets its own record.
        if ("工地预警-视频设备离线超时报警".equals(violationType) || "消纳场预警-视频设备离线超时报警".equals(violationType)) {
            return null;
        }
        ViolationWarningInformationMapper mapper = SpringUtils.getBean(ViolationWarningInformationMapper.class);
        // These two types are accumulated per company.
        if ("闯禁行驶".equals(violationType) || "失信车辆作业".equals(violationType)) {
            String company = alarm.getString("enterpriseName");
            return mapper.selectViolationWarningInformation(company, null, violationType, nowDate, tomorrowDate);
        }
        // Everything else is accumulated per construction site / disposal site.
        String project = alarm.getString("siteName");
        return mapper.selectViolationWarningInformation(null, project, violationType, nowDate, tomorrowDate);
    }

    /** Builds a new warning record from the alarm and persists it through the service. */
    private void createWarning(JSONObject alarm, JSONArray companies, JSONObject vehicleInfo, Date now) {
        ViolationWarningInformation warning = new ViolationWarningInformation();
        String siteType = alarm.getString("siteType");
        if ("1".equals(siteType)) {
            // Construction site.
            warning.setViolationObjectType("0");
        } else if ("2".equals(siteType)) {
            // Disposal site.
            warning.setViolationObjectType("1");
        }
        // Owning region.
        warning.setOwningRegion(alarm.getString("areaName"));
        // Case number: timestamp down to milliseconds with the first two characters dropped.
        warning.setNumber(DateFormatUtils.format(now, "yyyyMMddHHmmssSSS").substring(2));
        // Company name.
        warning.setCompanyName(alarm.getString("enterpriseName"));
        // Violation type.
        warning.setViolationType(alarm.getString("alarmName"));
        // Sender.
        warning.setCreateBy("长沙市建筑垃圾智慧监管平台");
        // Project name (construction site name or disposal site name).
        warning.setProjectName(alarm.getString("siteName"));

        // Defaults for grade and recipient; only the types below deviate from them.
        warning.setViolationGrade("一般类");
        warning.setSendObject("区管理部门(治)");
        switch (warning.getViolationType()) {
            case "消纳场预警-视频设备离线超时报警":
                warning.setSendObject("区管理部门(消)");
                break;
            case "未按线路行驶":
                warning.setViolationGrade("重点类");
                break;
            case "闯禁行驶":
            case "失信车辆作业":
                warning.setSendObject("运输企业");
                break;
            default:
                break;
        }

        // Company abbreviation: taken from the cached company whose id matches the vehicle's company.
        String companyId = vehicleInfo.getString("companyID");
        if (companyId != null) {
            for (Object entry : companies) {
                JSONObject company = (JSONObject) entry;
                if (companyId.equals(company.getString("id"))) {
                    warning.setAbbreviation(company.getString("abbreviation"));
                }
            }
        }

        warning.setDescribe(describeEvent(now, warning.getCompanyName(), alarm, warning.getViolationType()));

        try {
            violationWarningInformationService.insertViolationWarningInformation(null, warning);
        } catch (IOException e) {
            // Propagate instead of swallowing: otherwise the caller would go on to mark the
            // compensation row as consumed although no warning was stored.
            throw new IllegalStateException("failed to insert violation warning, number:" + warning.getNumber(), e);
        }
    }

    /** Formats one description line: timestamp, company, licence plate, site and violation type. */
    private String describeEvent(Date now, String companyName, JSONObject alarm, String violationType) {
        return DateFormatUtils.format(now, "yyyy/MM/dd HH:mm:ss") + " "
                + companyName + " " + alarm.get("licenseplateNo") + "在"
                + alarm.get("siteName") + "出现" + violationType;
    }
}