VehicleDataSyncTaskServiceImpl.java
9.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
package com.bsth.service.schedule.datasync;
import com.bsth.entity.schedule.datasync.VehicleDataSyncTask;
import com.bsth.entity.schedule.datasync.VehicleDataSyncTaskStatusEnum;
import com.bsth.service.schedule.datasync.task.VehicleDataSyncTaskThread;
import com.bsth.service.schedule.exception.ScheduleException;
import com.bsth.service.schedule.impl.BServiceImpl;
import com.bsth.service.schedule.utils.DataToolsProperties;
import com.bsth.service.schedule.utils.DataToolsService;
import com.bsth.service.schedule.utils.DataToolsServiceImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.MessageFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
/**
 * Vehicle data synchronization task service.
 *
 * <p>Tasks submitted through {@link #addToDataSyncTaskQueue(VehicleDataSyncTask)} are persisted
 * and placed on a bounded in-memory queue. A single worker thread (this bean itself, started in
 * {@link #init()}) polls the queue, starts a {@link VehicleDataSyncTaskThread} for each task and
 * waits on the latch handed to that thread before picking up the next task.
 *
 * <p>The generic {@code save}, {@code bulkSave} and {@code delete} operations are deliberately
 * disabled; task records are only written internally through the superclass implementation.
 */
@Service("VehicleDataSyncServiceImpl2")
public class VehicleDataSyncTaskServiceImpl extends BServiceImpl<VehicleDataSyncTask, Long> implements VehicleDataSyncTaskService, Runnable, DisposableBean {
    /** Class logger. */
    private static final Logger LOG = LoggerFactory.getLogger(VehicleDataSyncTaskServiceImpl.class);

    /** Capacity of the synchronization queue. */
    private static final int QUEUE_CAPACITY = 100;

    /** File name prefix pattern: start date (yyyy-MM-dd) followed by the task id. */
    private static final String FILE_PREFIX_PATTERN = "vehicle-data-sync.%s.%s";

    /** Bounded queue of tasks waiting to be synchronized. */
    private ArrayBlockingQueue<VehicleDataSyncTask> dataSyncQueue;

    /** Set once the bean is being destroyed; used for graceful shutdown. */
    private volatile boolean beanDestroy = false;

    /** Worker thread draining the queue; kept so that destroy() can tell whether it is still alive. */
    private volatile Thread workerThread;

    @Autowired
    @Qualifier(value = "dataToolsServiceImpl")
    private DataToolsService dataToolsService;

    @Autowired
    private DataToolsProperties dataToolsProperties;

    /**
     * Creates the task queue and starts the worker thread that drains it.
     */
    @PostConstruct
    public void init() {
        LOG.info("车辆数据同步服务启动!");
        // Size the queue from the constant so the logged capacity always matches the real one.
        dataSyncQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        LOG.info("队列容量={},待处理量={}", QUEUE_CAPACITY, dataSyncQueue.size());
        // Start the worker; this bean is its own Runnable.
        workerThread = new Thread(this);
        workerThread.start();
    }

    //--------------------- operations intentionally disabled (below) -----------------//

    /**
     * Not supported by this service.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public VehicleDataSyncTask save(VehicleDataSyncTask vehicleDataSyncTask) {
        throw new UnsupportedOperationException("VehicleDataSyncTaskServiceImpl 不支持save操作!");
    }

    /**
     * Not supported by this service.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <S extends VehicleDataSyncTask> List<S> bulkSave(List<S> entities) {
        throw new UnsupportedOperationException("VehicleDataSyncTaskServiceImpl 不支持bulkSave操作!");
    }

    /**
     * Not supported by this service.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void delete(Long aLong) throws ScheduleException {
        throw new UnsupportedOperationException("VehicleDataSyncTaskServiceImpl 不支持delete操作!");
    }

    //--------------------- operations intentionally disabled (above) -----------------//

    /**
     * Persists a new task record copied from {@code task} and enqueues it for synchronization.
     *
     * <p>If the queue is full, the record is updated to {@code PREPARE_FAIL} with an explanatory
     * message. If the bean is already being destroyed, the request is ignored and only the queue
     * statistics are logged.
     *
     * @param task carrier of the synchronization parameters (vehicle code, from, to, type)
     */
    @Override
    public void addToDataSyncTaskQueue(VehicleDataSyncTask task) {
        if (!this.beanDestroy) {
            // 1. Create and persist the initial task record.
            VehicleDataSyncTask internalTask = VehicleDataSyncTask.builder()
                    .paramClzbh(task.getParamClzbh())
                    .paramFrom(task.getParamFrom())
                    .paramTo(task.getParamTo())
                    .type(task.getType())
                    .startDate(new Date())
                    .status(VehicleDataSyncTaskStatusEnum.PREPARE)
                    .build();
            internalTask = super.save(internalTask);

            // 2. Enqueue without blocking; record the failure if the queue is full.
            if (!this.dataSyncQueue.offer(internalTask)) {
                internalTask.setEndDate(new Date());
                internalTask.setStatus(VehicleDataSyncTaskStatusEnum.PREPARE_FAIL);
                internalTask.setProcessMsg(MessageFormat.format(
                        "同步队列已满,队列容量={0},待处理量={1}, 无法添加同步任务!",
                        QUEUE_CAPACITY, dataSyncQueue.size()));
                super.save(internalTask);
            }
        }
        LOG.info("队列容量={},待处理量={}", QUEUE_CAPACITY, dataSyncQueue.size());
    }

    /**
     * Signals shutdown and waits until the worker has drained the queue.
     *
     * <p>The queue size is re-read on every iteration; waiting also stops if the worker thread is
     * no longer alive or if the calling thread is interrupted, so shutdown cannot hang forever.
     * NOTE: a task is removed from the queue before it is processed, so an empty queue does not
     * by itself mean the last polled task has finished.
     */
    @Override
    public void destroy() throws Exception {
        beanDestroy = true;
        while (!this.dataSyncQueue.isEmpty() && isWorkerAlive()) {
            try {
                // Check again in one second.
                Thread.sleep(1000);
            } catch (InterruptedException exp) {
                LOG.warn("Interrupted while waiting for the vehicle data sync queue to drain", exp);
                // Restore the flag for the caller and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        LOG.info("车辆数据同步服务销毁!");
    }

    /**
     * Worker loop: keeps polling while the bean is alive, and after destruction has been
     * requested continues until the queue is empty.
     */
    @Override
    public void run() {
        // Evaluate the live queue state each time; a snapshot taken once would never change.
        while (!this.beanDestroy || !this.dataSyncQueue.isEmpty()) {
            VehicleDataSyncTask task = this.dataSyncQueue.poll();
            if (task != null) {
                processTask(task);
            }
            // Pause 100 ms before the next poll.
            try {
                Thread.sleep(100);
            } catch (InterruptedException exp) {
                LOG.warn("Vehicle data sync worker interrupted, stopping", exp);
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Returns whether the worker thread has been started and is still running. */
    private boolean isWorkerAlive() {
        Thread worker = this.workerThread;
        return worker != null && worker.isAlive();
    }

    /** Runs one task: marks it SYNCING, starts the sync thread, waits on its latch, records the outcome. */
    private void processTask(VehicleDataSyncTask task) {
        try {
            // 1. Set the logback MDC key so the sync thread can log to its own output.
            String taskLogKey = String.format(
                    "%s.%s",
                    DateFormatUtils.format(task.getStartDate(), "yyyy-MM-dd"),
                    String.valueOf(task.getId()));
            MDC.put("taskLogKey", taskLogKey);

            // 2. Mark the task as running.
            task.setStatus(VehicleDataSyncTaskStatusEnum.SYNCING);
            task = super.save(task);

            // 2-1. Start the synchronization thread.
            CountDownLatch countDownLatch = new CountDownLatch(1);
            new Thread(VehicleDataSyncTaskThread.builder()
                    .countDownLatch(countDownLatch)
                    .dataToolsService((DataToolsServiceImpl) dataToolsService)
                    .dataToolsProperties(dataToolsProperties)
                    .vehicleDataSyncTask(task)
                    .mdcCopyOfContextMap(MDC.getCopyOfContextMap())
                    .build()).start();

            // 3. Wait for the latch, then record the result.
            countDownLatch.await();
            finishTask(task, "成功!");
        } catch (Exception exp) {
            // Capture the stack trace; the stored message is capped at 1800 characters.
            StringWriter stackTrace = new StringWriter();
            exp.printStackTrace(new PrintWriter(stackTrace));
            try {
                finishTask(task, "有问题:" + StringUtils.substring(stackTrace.toString(), 0, 1800));
            } catch (Exception saveExp) {
                // A failing save must not kill the only worker thread.
                LOG.error("Failed to record failure of vehicle data sync task id={}", task.getId(), saveExp);
            }
            if (exp instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        } finally {
            MDC.clear();
        }
    }

    /** Stamps end time, elapsed seconds, END status and the given message on the task and saves it. */
    private void finishTask(VehicleDataSyncTask task, String processMsg) {
        task.setEndDate(new Date());
        long runMillis = task.getEndDate().getTime() - task.getStartDate().getTime();
        // Divide as long first, then narrow; casting before dividing truncates the millisecond value.
        task.setProcessSeconds((int) (runMillis / 1000));
        task.setStatus(VehicleDataSyncTaskStatusEnum.END);
        task.setProcessMsg(processMsg);
        super.save(task);
    }

    //----------------------- task log / report files ------------------//

    /** Returns the {@code .log} file of the given task. */
    @Override
    public File getTaskLogFile(Long vehicleDataSyncTaskId) {
        return this.getInternalFile(vehicleDataSyncTaskId, ".log");
    }

    /** Returns the {@code -validate_error.xls} file of the given task. */
    @Override
    public File getTaskValidateErrorFile(Long vehicleDataSyncTaskId) {
        return this.getInternalFile(vehicleDataSyncTaskId, "-validate_error.xls");
    }

    /** Returns the {@code -prepare.xls} file of the given task. */
    @Override
    public File getTaskPrepareFile(Long vehicleDataSyncTaskId) {
        return this.getInternalFile(vehicleDataSyncTaskId, "-prepare.xls");
    }

    /** Returns the {@code -add_error.xls} file of the given task. */
    @Override
    public File getTaskAddErrorFile(Long vehicleDataSyncTaskId) {
        return this.getInternalFile(vehicleDataSyncTaskId, "-add_error.xls");
    }

    /** Returns the {@code -update_error.xls} file of the given task. */
    @Override
    public File getTaskUpdateErrorFile(Long vehicleDataSyncTaskId) {
        return this.getInternalFile(vehicleDataSyncTaskId, "-update_error.xls");
    }

    /** Returns the {@code -device_add_error.xls} file of the given task. */
    @Override
    public File getTaskDeviceAddErrorFile(Long vehicleDataSyncTaskId) {
        return this.getInternalFile(vehicleDataSyncTaskId, "-device_add_error.xls");
    }

    /**
     * Builds the path {@code <logback root>/datasync/vehicle/vehicle-data-sync.<date>.<id><suffix>}
     * for the given task.
     *
     * @param vehicleDataSyncLogId id of the task record
     * @param fileSuffix suffix appended to the generated file name
     * @return the file handle (existence is not checked), or {@code null} if the path could not
     *         be resolved
     * @throws RuntimeException if no task record exists for the id
     */
    private File getInternalFile(Long vehicleDataSyncLogId, String fileSuffix) {
        VehicleDataSyncTask vehicleDataSyncLog = this.findById(vehicleDataSyncLogId);
        if (vehicleDataSyncLog == null) {
            throw new RuntimeException("日志Id=" + vehicleDataSyncLogId + ",不存在!");
        }
        // Compose the file name from the task start date and id.
        String fileName = String.format(
                FILE_PREFIX_PATTERN,
                DateFormatUtils.format(vehicleDataSyncLog.getStartDate(), "yyyy-MM-dd"),
                String.valueOf(vehicleDataSyncLogId)) + fileSuffix;
        try {
            Path rootPath = VehicleDataSyncTaskThread.getLogbackFilePath();
            Path logFilePath = Paths.get(rootPath.toString(), "datasync", "vehicle", fileName);
            return logFilePath.toFile();
        } catch (Exception exp) {
            LOG.error("Failed to resolve file {} for vehicle data sync task id={}", fileName, vehicleDataSyncLogId, exp);
            // Kept for backward compatibility: callers receive null when resolution fails.
            return null;
        }
    }
}