Commit 41b38d8e5c5cec860650df3c853cdf380dfb8258
1 parent
28f7c958
kafka异常处理
Showing
1 changed file
with
9 additions
and
0 deletions
trash-workFlow/src/main/java/com/trash/casefile/kafka/Consumer.java
| @@ -42,6 +42,15 @@ public class Consumer { | @@ -42,6 +42,15 @@ public class Consumer { | ||
| 42 | 42 | ||
| 43 | @KafkaListener(topics = "record_process_alarm") | 43 | @KafkaListener(topics = "record_process_alarm") |
| 44 | public void consume(ConsumerRecord<?, ?> record, Acknowledgment ack) throws InterruptedException, IOException { | 44 | public void consume(ConsumerRecord<?, ?> record, Acknowledgment ack) throws InterruptedException, IOException { |
| 45 | + if(record==null){ | ||
| 46 | + throw new RuntimeException("kafka record对象为空"); | ||
| 47 | + } | ||
| 48 | + if(record.value()==null){ | ||
| 49 | + throw new RuntimeException("kafka消费数据为空"); | ||
| 50 | + } | ||
| 51 | + if(ack==null){ | ||
| 52 | + throw new RuntimeException("kafka ack对象为空"); | ||
| 53 | + } | ||
| 45 | log.info("kafka消费数据成功,offset:"+ record.offset() +",data:" + record.value()); | 54 | log.info("kafka消费数据成功,offset:"+ record.offset() +",data:" + record.value()); |
| 46 | String id = insertKafkaCompensation(record.value().toString()); | 55 | String id = insertKafkaCompensation(record.value().toString()); |
| 47 | autoViolationWarning(record.value().toString(),id); | 56 | autoViolationWarning(record.value().toString(),id); |