Commit 78137fe045e5434310b5cd5340741fbcd8e571fa

Authored by yiming
1 parent aefacb21

修改的定时器时间

src/main/java/com/example/demo/model/TJRL.java
@@ -7,7 +7,7 @@ import java.lang.reflect.Field; @@ -7,7 +7,7 @@ import java.lang.reflect.Field;
7 import java.lang.reflect.Method; 7 import java.lang.reflect.Method;
8 8
9 public class TJRL implements Cloneable { 9 public class TJRL implements Cloneable {
10 - //@JsonProperty 10 +
11 private String TJRLCARDNO;// NUMBER Y 交通卡号 11 private String TJRLCARDNO;// NUMBER Y 交通卡号
12 private String TJRLINSID; // NUMBER Y 行业 12 private String TJRLINSID; // NUMBER Y 行业
13 private String TJRLCDKIND;// NUMBER Y 卡类型 13 private String TJRLCDKIND;// NUMBER Y 卡类型
src/main/java/com/example/demo/service/Contrast.java
1 package com.example.demo.service; 1 package com.example.demo.service;
2 2
  3 +import cn.hutool.json.JSON;
3 import com.alibaba.fastjson.JSONArray; 4 import com.alibaba.fastjson.JSONArray;
4 import com.alibaba.fastjson.JSONObject; 5 import com.alibaba.fastjson.JSONObject;
5 import com.bsth.util.HttpUtils; 6 import com.bsth.util.HttpUtils;
@@ -7,8 +8,6 @@ import com.example.demo.SaticScheduleTask; @@ -7,8 +8,6 @@ import com.example.demo.SaticScheduleTask;
7 import com.example.demo.mapper.db1.SiteMapper1; 8 import com.example.demo.mapper.db1.SiteMapper1;
8 import com.example.demo.mapper.db2.SiteMapper2; 9 import com.example.demo.mapper.db2.SiteMapper2;
9 import com.example.demo.model.*; 10 import com.example.demo.model.*;
10 -import com.fasterxml.jackson.core.JsonProcessingException;  
11 -import com.fasterxml.jackson.databind.ObjectMapper;  
12 import org.slf4j.Logger; 11 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory; 12 import org.slf4j.LoggerFactory;
14 import org.springframework.beans.factory.annotation.Autowired; 13 import org.springframework.beans.factory.annotation.Autowired;
@@ -313,15 +312,8 @@ public class Contrast { @@ -313,15 +312,8 @@ public class Contrast {
313 data2.add(tjrl); 312 data2.add(tjrl);
314 } 313 }
315 map.put("datas", data2); 314 map.put("datas", data2);
316 -  
317 - ObjectMapper mapper = new ObjectMapper();  
318 - String json = null;  
319 - try {  
320 - json = mapper.writeValueAsString(map);  
321 - } catch (JsonProcessingException e) {  
322 - e.printStackTrace();  
323 - }  
324 - 315 + JSON j=new cn.hutool.json.JSONObject(map);
  316 + String json =j.toString();
325 ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("PDGJ", json); 317 ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("PDGJ", json);
326 future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { 318 future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
327 @Override 319 @Override
@@ -343,154 +335,7 @@ public class Contrast { @@ -343,154 +335,7 @@ public class Contrast {
343 } 335 }
344 336
345 337
346 - /**  
347 - * @Description: 根据carid 获取票务数据  
348 - * @Param: allData交易记录 isOld是否是残余数据(上次未匹配到的数据)  
349 - * @return:  
350 - * @Author: YM  
351 - * @Date: 2021/10/13  
352 - */  
353 - public void filterCardListByDB(List<TJRLDB> allData,Boolean isOld) {  
354 - long start = System.currentTimeMillis();  
355 - List<Map> driverCardList = getDriverCardList();//获取司机信息  
356 - Map<String,List<Scheduling>> schedulingS =getSchedulingS;  
357 - Map<String,List<Scheduling>> schedulingSSell =getSchedulingSSell;  
358 - DateTimeFormatter dtf=DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");  
359 - int b=0;//用于计算匹配成功的数量  
360 - for (int i = 0; i < allData.size(); i++) {  
361 - TJRLDB data = allData.get(i);  
362 - //通过司售卡找到司机  
363 - String carId = data.getTJRLDRVCRDID().length()>8?  
364 - data.getTJRLDRVCRDID().substring( data.getTJRLDRVCRDID().length()-8):  
365 - data.getTJRLDRVCRDID();  
366 - List<Map> collect = driverCardList.stream().filter(map -> map.get("cardno").equals(carId)).collect(Collectors.toList());  
367 - String mark="";  
368 - String jobCodes = "";  
369 - Boolean flag=false;//用来判断是否没有匹配成功  
370 - Long cardDate = null;  
371 - if (!collect.isEmpty()) {  
372 - if (null != collect.get(0).get("jobCode") && null != collect.get(0).get("personalName")) {  
373 - String personalName = collect.get(0).get("personalName").toString();//姓名  
374 - String[] arr=collect.get(0).get("jobCode").toString().split("-");  
375 - jobCodes =arr.length==0?collect.get(0).get("jobCode").toString():arr[arr.length-1];//工号  
376 - String time=String.valueOf(data.getTJRLRTIME());//时间  
377 - String date=String.valueOf(data.getTJRLRDATE());//日期  
378 - cardDate = LocalDateTime.parse(date+" "+time,dtf).toInstant(ZoneOffset.of("+8")).toEpochMilli(); //刷卡时间(时间戳)  
379 338
380 - //通过司机姓名获得进出记录  
381 - List<Scheduling> schedulingList1=schedulingS.get(personalName);  
382 - //没匹配到匹配售票员  
383 - if(schedulingList1==null){  
384 - schedulingList1=schedulingSSell.get(personalName);  
385 - }  
386 - Scheduling scheduling = null;  
387 -  
388 - if(schedulingList1!=null){  
389 - Collections.sort(schedulingList1, Comparator.comparing(Scheduling::getTs).reversed());  
390 - for (int i1 = 0; i1 < schedulingList1.size(); i1++) {  
391 - Scheduling s=schedulingList1.get(i1);  
392 - if((Long.parseLong(s.getTs())-TIME2*60*1000)<=cardDate){  
393 - scheduling=s;  
394 - b++;  
395 - flag=true;  
396 - break;  
397 - }  
398 - }  
399 - }else {  
400 - mark="未查到驾驶员排班:"+personalName+" 工号:"+jobCodes;  
401 - }  
402 -  
403 - if (scheduling!=null) {  
404 - data.setSTATION_FLAG("1");  
405 - data.setTJRLUNITID(scheduling.getGs_name());  
406 - data.setTJRLSTATID(scheduling.getXl_name());  
407 - data.setROAD_FORM_TYPE(scheduling.getBc_type());  
408 - data.setUPDOWN(scheduling.getXl_dir());  
409 - data.setBUS_CODE(scheduling.getCl_zbh());  
410 - data.setBUS_PLATE(scheduling.getCar_plate());  
411 - data.setROAD_CODE(scheduling.getLp_name());  
412 - data.setDEPART_NAME(scheduling.getQdz_name());  
413 - data.setDEPART_CODE(scheduling.getQdz_code());  
414 - data.setDEPART_ACTUAL_TIME(scheduling.getFcsj_actual());  
415 - data.setREACH_NAME(scheduling.getZdz_name());  
416 - data.setREACH_CODE(scheduling.getZdz_code());  
417 - data.setREACH_ACTUAL_TIME(scheduling.getZdsj_actual());  
418 - if(scheduling.getStation_name()!=null){  
419 - data.setLEVELS_FLAG("1");  
420 - data.setLEVELS1(scheduling.getStation_route_code());//站点路由序号  
421 - data.setLEVELS_NAME1(scheduling.getStation_name());//站点名  
422 - data.setLEVELS1_CODE(scheduling.getStop_no());//站点编号  
423 - }else {  
424 - data.setLEVELS_FLAG("0");  
425 - }  
426 - } else {  
427 - data.setSTATION_FLAG("0");  
428 - }  
429 - }  
430 - }else {  
431 - mark="未查到驾驶员 卡号:"+carId;  
432 - data.setSTATION_FLAG("0");  
433 - }  
434 - if(mark.length()==0 && !flag){  
435 - mark="未匹配到班次 "+jobCodes+"刷卡时间为:"+LocalDateTime.ofEpochSecond(cardDate/1000, 0, ZoneOffset.ofHours(8));  
436 - }  
437 - data.setMARK(mark);  
438 - //如果是新数据 没匹配到 去除这条记录 并放入缓存等到下次继续匹配  
439 - if(!isOld && !flag){  
440 - List<TJRLDB> m =oldData;  
441 - m.add(data);  
442 - allData.set(i,null);  
443 -  
444 - }  
445 - }  
446 - int size=allData.size();  
447 - allData.removeIf(Objects::isNull);  
448 - //防止数据过多 分批插入  
449 - List<List<TJRLDB>> subList = getSubList(2000, allData);  
450 - if(!subList.isEmpty()){  
451 - subList.forEach(data->{  
452 - if (!data.isEmpty()) {  
453 - siteMapper1.insertResult(data);  
454 - }  
455 - });  
456 - }  
457 -  
458 - List<List<TJRLDB>> subList2 = getSubList(500, allData);  
459 - if(!subList2.isEmpty()){  
460 - subList2.forEach(data->{  
461 - if (!data.isEmpty()) {  
462 - Map<String, Object> map = new HashMap<>();  
463 - map.put("datatype", "passengerflow");  
464 - TJRL tjrl= (TJRL) data;  
465 - map.put("datas", tjrl);  
466 -  
467 - ObjectMapper mapper = new ObjectMapper();  
468 - String json = null;  
469 - try {  
470 - json = mapper.writeValueAsString(map);  
471 - } catch (JsonProcessingException e) {  
472 - e.printStackTrace();  
473 - }  
474 -  
475 - ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("PDGJ", json);  
476 - future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {  
477 - @Override  
478 - public void onSuccess(SendResult<String, String> result) {  
479 - }  
480 - @Override  
481 - public void onFailure(Throwable ex) {  
482 - logger.error("kafka发送票务数据异常", ex);  
483 - }  
484 - });  
485 - }  
486 - });  
487 - }  
488 - //如果是上次遗留的数据不管这是是否匹配到都清空  
489 - if(isOld){  
490 - oldData.clear();  
491 - }  
492 - logger.warn("======匹配"+size+"条======匹配到"+b+"条=====耗时=="+(System.currentTimeMillis() - start)/1000 + "秒");  
493 - }  
494 339
495 340
496 341
@@ -534,7 +379,7 @@ public class Contrast { @@ -534,7 +379,7 @@ public class Contrast {
534 record.setTJRLRDATE(s.split(" ")[0]); 379 record.setTJRLRDATE(s.split(" ")[0]);
535 record.setTJRLRTIME(s.split(" ")[1]); 380 record.setTJRLRTIME(s.split(" ")[1]);
536 } 381 }
537 - filterCardListByDB(records,false); 382 + filterCardList(records,false);
538 } 383 }
539 384
540 public List<List<TJRLDB>> getSubList(int length, List<TJRLDB> list){ 385 public List<List<TJRLDB>> getSubList(int length, List<TJRLDB> list){
src/main/resources/mapping/db1/Site-mapper1.xml
@@ -7,13 +7,14 @@ @@ -7,13 +7,14 @@
7 7
8 8
9 <insert id="insertResult" parameterType="List"> 9 <insert id="insertResult" parameterType="List">
10 - INSERT INTO `passenger_flow`.`t_jc_result` (  
11 - `TJRLCARDNO`, `TJRLINSID`, `TJRLCDKIND`, `TJRLPOSID`,`TJRLRDATE`,  
12 - `TJRLRTIME`,`TJRLCDBAL`,`TJRLAMT`,`TJRLORGAMT`,  
13 - `TJRLTXFG`,`STATION_FLAG`,`TJRLUNITID`,`TJRLSTATID`,`ROAD_FORM_TYPE`, `UPDOWN`, `BUS_CODE`,  
14 - `BUS_PLATE`, `ROAD_CODE`, `DEPART_NAME`,`DEPART_CODE`, `DEPART_ACTUAL_TIME`,  
15 - `REACH_NAME`,`REACH_CODE`,`REACH_ACTUAL_TIME`,  
16 - `LEVELS_FLAG`,`LEVELS1`, `LEVELS_NAME1`, `LEVELS1_CODE`,`LEVELS2`, `LEVELS_NAME2`,`MARK`,TRAD_ID 10 + INSERT INTO passenger_flow.t_jc_result (
  11 + TJRLCARDNO, TJRLINSID, TJRLCDKIND, TJRLPOSID,TJRLRDATE,
  12 + TJRLRTIME,TJRLCDBAL,TJRLAMT,TJRLORGAMT,
  13 + TJRLTXFG,STATION_FLAG,TJRLUNITID,TJRLSTATID,ROAD_FORM_TYPE, UPDOWN, BUS_CODE,
  14 + BUS_PLATE, ROAD_CODE, DEPART_NAME,DEPART_CODE, DEPART_ACTUAL_TIME,
  15 + REACH_NAME,REACH_CODE,REACH_ACTUAL_TIME,
  16 + LEVELS_FLAG,LEVELS1, LEVELS_NAME1, LEVELS1_CODE,LEVELS2, LEVELS_NAME2,LEVELS2_CODE,MARK,TRAD_ID,
  17 + GPS_DATE_TIME1,GPS_DATE_TIME2,B_STATION_ID,PLAN_START_TIME,PLAN_ARRIVE_TIME,ACTUAL_START_TIME,ACTUAL_ARRIVE_TIME
17 ) 18 )
18 VALUES 19 VALUES
19 <foreach collection="list" item="em" index="index" separator=","> 20 <foreach collection="list" item="em" index="index" separator=",">
@@ -23,7 +24,8 @@ @@ -23,7 +24,8 @@
23 #{em.BUS_CODE}, 24 #{em.BUS_CODE},
24 #{em.BUS_PLATE}, #{em.ROAD_CODE}, #{em.DEPART_NAME},#{em.DEPART_CODE}, #{em.DEPART_ACTUAL_TIME}, 25 #{em.BUS_PLATE}, #{em.ROAD_CODE}, #{em.DEPART_NAME},#{em.DEPART_CODE}, #{em.DEPART_ACTUAL_TIME},
25 #{em.REACH_NAME},#{em.REACH_CODE},#{em.REACH_ACTUAL_TIME}, 26 #{em.REACH_NAME},#{em.REACH_CODE},#{em.REACH_ACTUAL_TIME},
26 - #{em.LEVELS_FLAG},#{em.LEVELS1}, #{em.LEVELS_NAME1}, #{em.LEVELS1_CODE},#{em.LEVELS2}, #{em.LEVELS_NAME2}, #{em.MARK},#{em.TRAD_ID}) 27 + #{em.LEVELS_FLAG},#{em.LEVELS1}, #{em.LEVELS_NAME1}, #{em.LEVELS1_CODE},#{em.LEVELS2}, #{em.LEVELS_NAME2}, #{em.LEVELS2_CODE}, #{em.MARK},#{em.TRAD_ID},
  28 + #{em.GPS_DATE_TIME1},#{em.GPS_DATE_TIME2},#{em.B_STATION_ID},#{em.PLAN_START_TIME},#{em.PLAN_ARRIVE_TIME},#{em.ACTUAL_START_TIME},#{em.ACTUAL_ARRIVE_TIME})
27 </foreach> 29 </foreach>
28 </insert> 30 </insert>
29 31