Commit f9ee74ad893be45b6cea5b73b419b45160ea2add

Authored by youxiw2000
2 parents 2c9117a6 41b38d8e

Merge branch 'dev' of http://61.169.120.202:8888/youxiw20000/trash into dev

trash-workFlow/src/main/java/com/trash/casefile/kafka/Consumer.java
@@ -42,6 +42,15 @@ public class Consumer { @@ -42,6 +42,15 @@ public class Consumer {
42 42
43 @KafkaListener(topics = "record_process_alarm") 43 @KafkaListener(topics = "record_process_alarm")
44 public void consume(ConsumerRecord<?, ?> record, Acknowledgment ack) throws InterruptedException, IOException { 44 public void consume(ConsumerRecord<?, ?> record, Acknowledgment ack) throws InterruptedException, IOException {
  45 + if(record==null){
  46 + throw new RuntimeException("kafka record对象为空");
  47 + }
  48 + if(record.value()==null){
  49 + throw new RuntimeException("kafka消费数据为空");
  50 + }
  51 + if(ack==null){
  52 + throw new RuntimeException("kafka ack对象为空");
  53 + }
45 log.info("kafka消费数据成功,offset:"+ record.offset() +",data:" + record.value()); 54 log.info("kafka消费数据成功,offset:"+ record.offset() +",data:" + record.value());
46 String id = insertKafkaCompensation(record.value().toString()); 55 String id = insertKafkaCompensation(record.value().toString());
47 autoViolationWarning(record.value().toString(),id); 56 autoViolationWarning(record.value().toString(),id);