VehicleDataSyncServiceImpl.java
9.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package com.bsth.service.schedule.datasync;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.core.FileAppender;
import com.bsth.entity.schedule.datasync.VehicleDataSyncLog;
import com.bsth.entity.schedule.datasync.VehicleDataSyncStatusEnum;
import com.bsth.service.schedule.datasync.log.VehicleDataSyncLogService;
import com.bsth.service.schedule.datasync.task.VehicleDataSyncTaskFlag;
import com.bsth.service.schedule.datasync.task.VehicleDataSyncTaskThread;
import com.bsth.service.schedule.utils.DataToolsProperties;
import com.bsth.service.schedule.utils.DataToolsService;
import com.bsth.service.schedule.utils.DataToolsServiceImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.MessageFormat;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
/**
 * Vehicle data synchronization service.
 *
 * <p>Sync requests are placed on a bounded queue ({@link #QUEUE_CAPACITY} entries) and are
 * processed strictly one at a time by a single worker thread started in {@link #init()}.
 * For every request a {@link VehicleDataSyncLog} row is written and updated as the task
 * moves through its states (PREPARE, PREPARE_FAIL, SYNCING, END).
 *
 * <p>On bean destruction no new requests are accepted; the worker drains the requests that
 * are already queued and {@link #destroy()} waits for it to finish.
 */
@Service("VehicleDataSyncServiceImpl2")
public class VehicleDataSyncServiceImpl implements VehicleDataSyncService, Runnable, DisposableBean {
    /** Class logger. */
    private static final Logger LOG = LoggerFactory.getLogger(VehicleDataSyncServiceImpl.class);

    /** Maximum number of sync requests that may be waiting in the queue. */
    private static final int QUEUE_CAPACITY = 100;

    /** Pause between two polls of the queue by the worker thread, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    /** Maximum number of stack-trace characters stored in the log's process message. */
    private static final int MAX_PROCESS_MSG_LENGTH = 1800;

    /** File name pattern of a per-task log file: start date (yyyy-MM-dd), then log id. */
    private static final String TASK_LOG_PATTERN = "vehicle-data-sync.%s.%s.log";

    /** Bounded queue of pending sync requests. */
    private final ArrayBlockingQueue<VehicleDataSyncTaskFlag> dataSyncQueue =
            new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    /** Set once the bean is being destroyed; used for a graceful shutdown. */
    private volatile boolean beanDestroy = false;

    /** The worker thread started by {@link #init()}; joined by {@link #destroy()}. */
    private volatile Thread workerThread;

    @Autowired
    private VehicleDataSyncLogService vehicleDataSyncLogService;

    @Autowired
    @Qualifier(value = "dataToolsServiceImpl")
    private DataToolsService dataToolsService;

    @Autowired
    private DataToolsProperties dataToolsProperties;

    /**
     * Starts the worker thread that consumes the sync queue.
     */
    @PostConstruct
    public void init() {
        LOG.info("车辆数据同步服务启动!");
        LOG.info("队列容量={},待处理量={}", QUEUE_CAPACITY, dataSyncQueue.size());

        // Keep a reference to the worker so destroy() can wait for it to finish.
        Thread worker = new Thread(this, "vehicle-data-sync-worker");
        this.workerThread = worker;
        worker.start();
    }

    /**
     * Registers a sync request.
     *
     * <p>A log row in state PREPARE is saved first and its id is stored on the flag. If the
     * queue is full the log row is updated to PREPARE_FAIL with an explanatory message.
     * Requests arriving after the bean started shutting down are ignored.
     *
     * @param vehicleDataSyncFlag the request to enqueue
     */
    @Override
    public void addToDataSyncTaskQueue(VehicleDataSyncTaskFlag vehicleDataSyncFlag) {
        if (this.beanDestroy) {
            LOG.warn("车辆数据同步服务正在销毁,忽略新的同步任务!");
        } else {
            // 1. Create the initial sync log entry.
            VehicleDataSyncLog vehicleDataSyncLog = new VehicleDataSyncLog();
            vehicleDataSyncLog.setStartDate(new Date());
            vehicleDataSyncLog.setStatus(VehicleDataSyncStatusEnum.PREPARE);
            vehicleDataSyncLog = this.vehicleDataSyncLogService.save(vehicleDataSyncLog);

            // Hand the log id to the task so the worker can look the log up again.
            vehicleDataSyncFlag.setVehicleDataSyncLogId(vehicleDataSyncLog.getId());

            // 2. Enqueue without blocking; record the failure when the queue is full.
            if (!this.dataSyncQueue.offer(vehicleDataSyncFlag)) {
                vehicleDataSyncLog.setEndDate(new Date());
                vehicleDataSyncLog.setStatus(VehicleDataSyncStatusEnum.PREPARE_FAIL);
                vehicleDataSyncLog.setProcessMsg(MessageFormat.format(
                        "同步队列已满,队列容量={0},待处理量={1}, 无法添加同步任务!",
                        QUEUE_CAPACITY, dataSyncQueue.size()));
                this.vehicleDataSyncLogService.save(vehicleDataSyncLog);
            }
        }
        LOG.info("队列容量={},待处理量={}", QUEUE_CAPACITY, dataSyncQueue.size());
    }

    /**
     * Stops accepting new requests and waits until the worker thread has drained the queue
     * and finished the task it is currently running.
     *
     * <p>The wait is unbounded: it lasts as long as the remaining tasks take. If the calling
     * thread is interrupted while waiting, the interrupt flag is restored and the method
     * returns without waiting any further.
     */
    @Override
    public void destroy() throws Exception {
        beanDestroy = true;

        Thread worker = this.workerThread;
        // Joining the worker (instead of polling a stale queue size) also covers the task
        // that has already been taken from the queue but is still running.
        if (worker != null && worker != Thread.currentThread()) {
            try {
                worker.join();
            } catch (InterruptedException exp) {
                Thread.currentThread().interrupt();
                LOG.warn("等待车辆数据同步任务完成时被中断,待处理量={}", dataSyncQueue.size(), exp);
            }
        }
        LOG.info("车辆数据同步服务销毁!");
    }

    /**
     * Worker loop: polls the queue and processes one request at a time.
     *
     * <p>Runs until the bean is being destroyed AND the queue is empty, so requests that
     * were already queued at shutdown are still processed. The loop ends early if the worker
     * thread is interrupted.
     */
    @Override
    public void run() {
        // The queue state is re-read on every iteration; a snapshot taken before the loop
        // would never reflect requests added later.
        while (!this.beanDestroy || !this.dataSyncQueue.isEmpty()) {
            VehicleDataSyncTaskFlag flag = this.dataSyncQueue.poll();
            if (flag != null) {
                processTask(flag);
            }

            // Wait a little before looking at the queue again.
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException exp) {
                Thread.currentThread().interrupt();
                LOG.warn("车辆数据同步线程被中断,停止处理,待处理量={}", dataSyncQueue.size());
                return;
            }
        }
    }

    /**
     * Runs a single sync request and records the outcome on its log row.
     *
     * <p>Never throws: any failure is logged and, when the log row is available, written to
     * it, so one bad request cannot terminate the worker thread.
     *
     * @param flag the request taken from the queue
     */
    private void processTask(VehicleDataSyncTaskFlag flag) {
        VehicleDataSyncLog vehicleDataSyncLog = null;
        try {
            // 1. Load the log row created when the request was enqueued.
            vehicleDataSyncLog = this.vehicleDataSyncLogService.findById(flag.getVehicleDataSyncLogId());
            if (vehicleDataSyncLog == null) {
                LOG.error("同步日志Id={},不存在,跳过该同步任务!", flag.getVehicleDataSyncLogId());
                return;
            }

            // 2. Set the logback MDC key; it is copied to the task thread below so the task
            //    can write to its own log output.
            String taskLogKey = String.format(
                    "%s.%s",
                    DateFormatUtils.format(vehicleDataSyncLog.getStartDate(), "yyyy-MM-dd"),
                    String.valueOf(vehicleDataSyncLog.getId()));
            MDC.put("taskLogKey", taskLogKey);

            // 3. Mark the task as running.
            vehicleDataSyncLog.setStatus(VehicleDataSyncStatusEnum.SYNCING);
            vehicleDataSyncLog = this.vehicleDataSyncLogService.save(vehicleDataSyncLog);

            // 3-1. Start the task thread; the latch is handed to it so this thread can wait.
            CountDownLatch countDownLatch = new CountDownLatch(1);
            new Thread(VehicleDataSyncTaskThread.builder()
                    .countDownLatch(countDownLatch)
                    .dataToolsService((DataToolsServiceImpl) dataToolsService)
                    .dataToolsProperties(dataToolsProperties)
                    .vehicleDataSyncLogService(vehicleDataSyncLogService)
                    .vehicleDataSyncLog(vehicleDataSyncLog)
                    .mdcCopyOfContextMap(MDC.getCopyOfContextMap())
                    .build()).start();

            // 4. Wait for the task to finish, then close the log row.
            countDownLatch.await();
            completeLog(vehicleDataSyncLog, "成功!");
        } catch (Exception exp) {
            LOG.error("车辆数据同步任务执行失败!", exp);
            if (vehicleDataSyncLog != null) {
                recordFailure(vehicleDataSyncLog, exp);
            }
            if (exp instanceof InterruptedException) {
                // Restore the flag (after the log was saved) so run() stops the worker.
                Thread.currentThread().interrupt();
            }
        } finally {
            MDC.clear();
        }
    }

    /**
     * Stores the failure (truncated stack trace) on the log row; a failure to save is only
     * logged so that it cannot propagate to the worker loop.
     */
    private void recordFailure(VehicleDataSyncLog vehicleDataSyncLog, Exception cause) {
        // Render the stack trace as text.
        StringWriter stackTrace = new StringWriter();
        cause.printStackTrace(new PrintWriter(stackTrace));
        try {
            // The message is truncated to keep the stored text bounded.
            completeLog(vehicleDataSyncLog,
                    "有问题:" + StringUtils.substring(stackTrace.toString(), 0, MAX_PROCESS_MSG_LENGTH));
        } catch (RuntimeException saveExp) {
            LOG.error("更新同步日志失败,日志Id={}", vehicleDataSyncLog.getId(), saveExp);
        }
    }

    /**
     * Sets end date, elapsed seconds, status END and the given message, then saves the log.
     */
    private void completeLog(VehicleDataSyncLog vehicleDataSyncLog, String processMsg) {
        vehicleDataSyncLog.setEndDate(new Date());
        long runMills = vehicleDataSyncLog.getEndDate().getTime() - vehicleDataSyncLog.getStartDate().getTime();
        // Divide as long first, then narrow; casting before dividing truncates large values.
        vehicleDataSyncLog.setProcessSeconds((int) (runMills / 1000));
        vehicleDataSyncLog.setStatus(VehicleDataSyncStatusEnum.END);
        vehicleDataSyncLog.setProcessMsg(processMsg);
        this.vehicleDataSyncLogService.save(vehicleDataSyncLog);
    }

    /**
     * Resolves the per-task log file of a sync run.
     *
     * @param vehicleDataSyncLogId id of the sync log row
     * @return the log file location (the file itself is not checked for existence), or
     *         {@code null} when the logback base directory cannot be determined
     * @throws RuntimeException if no log row with the given id exists
     */
    @Override
    public File getTaskLogFile(Long vehicleDataSyncLogId) {
        VehicleDataSyncLog vehicleDataSyncLog = this.vehicleDataSyncLogService.findById(vehicleDataSyncLogId);
        if (vehicleDataSyncLog == null) {
            throw new RuntimeException("日志Id=" + vehicleDataSyncLogId + ",不存在!");
        }

        // Build the log file name from the start date and the log id.
        String logFileName = String.format(
                TASK_LOG_PATTERN,
                DateFormatUtils.format(vehicleDataSyncLog.getStartDate(), "yyyy-MM-dd"),
                String.valueOf(vehicleDataSyncLogId));

        try {
            Path rootPath = this.getLogbackFilePath();
            return Paths.get(rootPath.toString(), "datasync", "vehicle", logFileName).toFile();
        } catch (Exception exp) {
            LOG.error("获取同步日志文件路径失败,日志Id={}", vehicleDataSyncLogId, exp);
            return null;
        }
    }

    /**
     * Returns the directory two levels above the file written by the logback appender named
     * "FILE" on the ROOT logger. Per the original author's note that file is
     * ${LOG_BASE}/main/main.log, which makes the result ${LOG_BASE}.
     *
     * @throws Exception if the appender is missing, is not a file appender, or its path
     *                   cannot be resolved
     */
    private Path getLogbackFilePath() throws Exception {
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        ch.qos.logback.classic.Logger logger = loggerContext.getLogger("ROOT");

        // "FILE" is the name attribute of the file-writing appender in the logback config.
        Object appender = logger.getAppender("FILE");
        if (!(appender instanceof FileAppender)) {
            throw new IllegalStateException("logback ROOT logger has no FileAppender named FILE");
        }

        File file = new File(((FileAppender<?>) appender).getFile());
        Path filePath = Paths.get(file.getCanonicalPath());
        return filePath.getParent().getParent();
    }

    /**
     * Manual check of the parent-of-parent path resolution used by
     * {@link #getLogbackFilePath()}; prints the result for a hard-coded sample path.
     */
    public static void main(String[] args) throws Exception {
        String filepath = "/Users/xu/resource/project_code/bsth_project/bsth_control_parent/E:/bsth_control_logs";
        Path p = Paths.get(filepath);
        System.out.println(p.getParent().getParent());
    }
}