Commit 3850fbd309fd4f34c5416b36256cb738f331f5ab

Authored by 王通
1 parent fab5a228

1.加入kafka传电子路单至数据中台

src/main/java/com/bsth/server_rs/schedule/real/ScheduleRealService.java
... ... @@ -69,6 +69,8 @@ public class ScheduleRealService implements InitializingBean {
69 69  
70 70 private volatile long timestamp;
71 71  
  72 + private final static int KAFKA_BATCH_SIZE = 100;
  73 +
72 74 static {
73 75 secretKey = ConfigUtil.get("http.control.secret.key");
74 76 url = ConfigUtil.get("http.control.service_data_url");// + "/execSchList?secretKey=" + secretKey;
... ... @@ -302,22 +304,35 @@ public class ScheduleRealService implements InitializingBean {
302 304 List<ScheduleRealInfo> scheduleRealInfos = scheduleRealInfoRepository.findByDates(dates, new Date(timestamp));
303 305 timestamp = dateTime.getMillis();
304 306  
305   - List<ScheduleRealInfoVo> scheduleRealInfoVos = new ArrayList<>();
  307 + Queue<ScheduleRealInfoVo> queue = new ConcurrentLinkedDeque<>();
306 308 for (ScheduleRealInfo sch : scheduleRealInfos) {
307 309 ScheduleRealInfoVo scheduleRealInfoVo = new ScheduleRealInfoVo();
308 310 BeanUtils.copyProperties(sch, scheduleRealInfoVo);
309 311 Car car = CarBufferData.findOne(scheduleRealInfoVo.getClZbh());
310 312 scheduleRealInfoVo.setCarPlate(car == null ? "" : car.getCarPlate());
311   - scheduleRealInfoVos.add(scheduleRealInfoVo);
  313 + queue.add(scheduleRealInfoVo);
  314 + }
  315 +
  316 + int size = scheduleRealInfos.size();
  317 + for (int i = 0, len = size % KAFKA_BATCH_SIZE == 0 ? size / KAFKA_BATCH_SIZE : size / KAFKA_BATCH_SIZE + 1;i < len;i++) {
  318 + List<ScheduleRealInfoVo> scheduleRealInfoVos = new ArrayList<>();
  319 + for (int j = 0; j < KAFKA_BATCH_SIZE;j++) {
  320 + ScheduleRealInfoVo scheduleRealInfoVo = queue.poll();
  321 + if (scheduleRealInfoVo == null) {
  322 + break;
  323 + }
  324 + scheduleRealInfoVos.add(scheduleRealInfoVo);
  325 + }
  326 +
  327 + Map<String, Object> data = new HashMap<>();
  328 + data.put("datatype", "waybill");
  329 + data.put("datas", scheduleRealInfoVos);
  330 +
  331 + ObjectMapper mapper = new ObjectMapper();
  332 + String json = mapper.writeValueAsString(data);
  333 + logger.info(json);
  334 + kafkaTemplate.send("PDGJ_JQDD", json);
312 335 }
313   - Map<String, Object> data = new HashMap<>();
314   - data.put("datatype", "waybill");
315   - data.put("datas", scheduleRealInfoVos);
316   -
317   - ObjectMapper mapper = new ObjectMapper();
318   - String json = mapper.writeValueAsString(data);
319   - logger.info(json);
320   - kafkaTemplate.send("PDGJ_JQDD", json);
321 336 } catch (Exception e) {
322 337 logger.error("kafka发电子路单调度异常", e);
323 338 }
... ...