RTMPPublisher.java
15.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
package com.genersoft.iot.vmp.jtt1078.subscriber;
import com.genersoft.iot.vmp.jtt1078.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
/**
* FFmpeg 推流器 (仅处理视频直播流)
*
* 修改记录:
* 1. 添加详细的启动和关闭日志,便于排查问题
* 2. 优化关闭逻辑,确保FFmpeg进程被正确终止
* 3. 添加FFmpeg输出日志监控
* 4. 兼容Java 8
*/
public class RTMPPublisher extends Thread
{
static Logger logger = LoggerFactory.getLogger(RTMPPublisher.class);
String tag = null;
Process process = null;
private volatile boolean running = true;
/** FFmpeg路径 */
private String ffmpegPath = null;
/** 目标RTMP地址 */
private String rtmpUrl = null;
/** FFmpeg命令格式标志 */
private String formatFlag = null;
/** 进程启动时间 */
private long processStartTime = 0;
public RTMPPublisher(String tag)
{
this.tag = tag;
this.setName("RTMPPublisher-" + tag);
this.setDaemon(true);
}
@Override
public void run()
{
InputStream stderr = null;
InputStream stdout = null;
int len = -1;
byte[] buff = new byte[512];
boolean debugMode = "on".equalsIgnoreCase(Configs.get("debug.mode"));
try
{
// 获取FFmpeg配置
String sign = "41db35390ddad33f83944f44b8b75ded";
rtmpUrl = Configs.get("rtmp.url").replaceAll("\\{TAG\\}", tag).replaceAll("\\{sign\\}",sign);
ffmpegPath = Configs.get("ffmpeg.path");
// 自动判断协议格式
formatFlag = "";
if (rtmpUrl.startsWith("rtsp://")) {
formatFlag = "-f rtsp";
} else if (rtmpUrl.startsWith("rtmp://")) {
formatFlag = "-f flv";
}
// 构造FFmpeg命令
String cmd = String.format("%s -i http://127.0.0.1:%d/video/%s -vcodec copy -acodec aac %s %s",
ffmpegPath,
Configs.getInt("server.http.port", 3333),
tag,
formatFlag,
rtmpUrl
);
logger.info("===========================================");
logger.info("[{}] ====== FFmpeg推流任务启动 ======", tag);
logger.info("[{}] FFmpeg路径: {}", tag, ffmpegPath);
logger.info("[{}] 目标地址: {}", tag, rtmpUrl);
logger.info("[{}] 完整命令: {}", tag, cmd);
logger.info("[{}] HTTP端口: {}", tag, Configs.getInt("server.http.port", 3333));
logger.info("[{}] 源流地址: http://127.0.0.1:{}/video/{}", tag, Configs.getInt("server.http.port", 3333), tag);
logger.info("[{}] 启动时间: {}", tag, new java.util.Date());
logger.info("===========================================");
// 执行FFmpeg命令
process = Runtime.getRuntime().exec(cmd);
processStartTime = System.currentTimeMillis();
// 记录进程启动信息(Java 8兼容方式)
logger.info("[{}] FFmpeg进程已启动", tag);
stderr = process.getErrorStream();
stdout = process.getInputStream();
// 启动一个线程消费 stdout,防止缓冲区满导致进程阻塞
final InputStream finalStdout = stdout;
Thread stdoutConsumer = new Thread(() -> {
try {
byte[] buffer = new byte[512];
int count = 0;
while (running && finalStdout.read(buffer) > -1) {
count++;
// 每1000次读取才打印一次(避免刷屏)
if (debugMode && count % 1000 == 0) {
logger.debug("[{}] FFmpeg stdout消费中... count: {}", tag, count);
}
}
logger.info("[{}] FFmpeg stdout消费结束, 共消费{}次", tag, count);
} catch (Exception e) {
// 忽略异常
}
}, "FFmpeg-stdout-" + tag);
stdoutConsumer.setDaemon(true);
stdoutConsumer.start();
// 消费 stderr 日志流
StringBuilder errorLog = new StringBuilder();
int errorCount = 0;
while (running && (len = stderr.read(buff)) > -1)
{
if (debugMode) {
System.out.print(new String(buff, 0, len));
}
// 收集错误日志(便于排查问题)
errorLog.append(new String(buff, 0, len));
errorCount++;
// 每100条错误日志打印一次摘要
if (errorCount % 100 == 0) {
String lastError = errorLog.length() > 500
? errorLog.substring(errorLog.length() - 500)
: errorLog.toString();
logger.debug("[{}] FFmpeg错误日志摘要: {}", tag, lastError);
}
}
// 进程退出处理
int exitCode = process.waitFor();
long runDuration = System.currentTimeMillis() - processStartTime;
logger.warn("===========================================");
logger.warn("[{}] ====== FFmpeg推流任务结束 ======", tag);
logger.warn("[{}] 退出代码: {}", tag, exitCode);
logger.warn("[{}] 运行时间: {} ms", tag, runDuration);
logger.warn("[{}] 错误日志条数: {}", tag, errorCount);
// 分析退出原因
if (exitCode == 0) {
logger.info("[{}] FFmpeg正常退出", tag);
} else if (exitCode == -1 || exitCode == 255) {
logger.warn("[{}] FFmpeg被信号终止 (exitCode={})", tag, exitCode);
} else {
// 保存最后一段错误日志
String lastError = errorLog.length() > 1000
? errorLog.substring(errorLog.length() - 1000)
: errorLog.toString();
logger.error("[{}] FFmpeg异常退出, 最后错误日志:\n{}", tag, lastError);
}
logger.warn("===========================================");
}
catch(InterruptedException ex)
{
logger.info("[{}] RTMPPublisher被中断: {}", tag, ex.getMessage());
Thread.currentThread().interrupt();
}
catch(Exception ex)
{
logger.error("[{}] RTMPPublisher异常: {}", tag, ex);
}
finally
{
// 确保所有流都被关闭
closeQuietly(stderr);
closeQuietly(stdout);
if (process != null) {
closeQuietly(process.getInputStream());
closeQuietly(process.getOutputStream());
closeQuietly(process.getErrorStream());
}
logger.info("[{}] RTMPPublisher资源已释放", tag);
}
}
/**
* 关闭FFmpeg推流
* 优化关闭逻辑,确保进程被正确终止(Java 8兼容)
*/
public void close()
{
logger.info("[{}] ====== 开始关闭FFmpeg推流 ======", tag);
logger.info("[{}] 关闭请求时间: {}", tag, new java.util.Date());
try {
// 设置停止标志
running = false;
if (process != null) {
long runDuration = processStartTime > 0 ? System.currentTimeMillis() - processStartTime : 0;
logger.info("[{}] 正在终止FFmpeg进程... (已运行{}ms)", tag, runDuration);
// 先尝试正常终止
process.destroy();
// 等待最多3秒
boolean exited = process.waitFor(3, TimeUnit.SECONDS);
if (!exited) {
logger.warn("[{}] FFmpeg进程未能在3秒内正常退出,开始强制终止...", tag);
// 强制终止(Java 8的方式)
process.destroyForcibly();
// 再等待2秒
try {
exited = process.waitFor(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (!exited) {
logger.error("[{}] FFmpeg进程强制终止失败,可能存在资源泄漏", tag);
} else {
logger.info("[{}] FFmpeg进程已强制终止", tag);
}
} else {
int exitCode = process.exitValue();
logger.info("[{}] FFmpeg进程已正常终止, 退出代码: {}", tag, exitCode);
}
// 检查是否需要杀掉残留进程
checkAndKillOrphanedProcesses();
} else {
logger.info("[{}] FFmpeg进程为空,无需关闭", tag);
}
logger.info("[{}] ====== FFmpeg推流已关闭 ======", tag);
// 中断线程(如果还在阻塞读取)
this.interrupt();
// 等待线程结束
this.join(2000);
if (this.isAlive()) {
logger.warn("[{}] RTMPPublisher线程未能正常结束", tag);
}
} catch(Exception e) {
logger.error("[{}] 关闭RTMPPublisher时出错: {}", tag, e);
}
}
/**
* 检查并杀掉可能残留的FFmpeg进程
* Java 8兼容实现,使用系统命令查找和终止进程
*/
private void checkAndKillOrphanedProcesses() {
try {
logger.debug("[{}] 检查是否有FFmpeg残留进程...", tag);
// 判断操作系统类型
String os = System.getProperty("os.name").toLowerCase();
boolean isWindows = os.contains("win");
if (isWindows) {
// Windows: 使用tasklist和taskkill
killOrphanedProcessesWindows();
} else {
// Linux/Unix: 使用ps和kill
killOrphanedProcessesUnix();
}
} catch (Exception e) {
logger.error("[{}] 检查残留进程时出错: {}", tag, e.getMessage());
}
}
/**
* Windows环境下查找并终止残留的FFmpeg进程
*/
private void killOrphanedProcessesWindows() {
try {
// 查找包含ffmpeg的进程
ProcessBuilder pb = new ProcessBuilder("tasklist", "/FI", "IMAGENAME eq ffmpeg.exe", "/FO", "CSV", "/NH");
Process process = pb.start();
java.io.BufferedReader reader = new java.io.BufferedReader(
new java.io.InputStreamReader(process.getInputStream()));
String line;
while ((line = reader.readLine()) != null) {
// CSV格式: "Image Name","PID","Session Name","Session#","Mem Usage"
String[] parts = line.split(",");
if (parts.length > 1) {
String pidStr = parts[1].replaceAll("\"", "").trim();
try {
long pid = Long.parseLong(pidStr);
// 检查命令行是否包含当前tag或wvp-jt1078
ProcessBuilder cmdPb = new ProcessBuilder("wmic", "process", "where", "ProcessId=" + pid, "get", "CommandLine");
Process cmdProcess = cmdPb.start();
java.io.BufferedReader cmdReader = new java.io.BufferedReader(
new java.io.InputStreamReader(cmdProcess.getInputStream()));
String cmdLine;
boolean shouldKill = false;
while ((cmdLine = cmdReader.readLine()) != null) {
if (cmdLine.toLowerCase().contains("ffmpeg")
&& (cmdLine.contains(tag) || cmdLine.contains("wvp-jt1078"))) {
shouldKill = true;
break;
}
}
if (shouldKill) {
logger.warn("[{}] 发现残留FFmpeg进程[PID:{}],正在终止...", tag, pid);
Process killProcess = Runtime.getRuntime().exec("taskkill /PID " + pid + " /F");
killProcess.waitFor();
logger.info("[{}] 残留FFmpeg进程已终止[PID:{}]", tag, pid);
}
} catch (NumberFormatException e) {
// 忽略非数字PID
}
}
}
} catch (Exception e) {
logger.error("[{}] Windows检查残留进程时出错: {}", tag, e.getMessage());
}
}
/**
* Linux/Unix环境下查找并终止残留的FFmpeg进程
*/
private void killOrphanedProcessesUnix() {
try {
// 查找包含ffmpeg的进程
ProcessBuilder pb = new ProcessBuilder("ps", "-ef");
Process process = pb.start();
java.io.BufferedReader reader = new java.io.BufferedReader(
new java.io.InputStreamReader(process.getInputStream()));
String line;
while ((line = reader.readLine()) != null) {
// ps -ef 输出格式: UID PID PPID C STIME TTY TIME CMD
if (line.toLowerCase().contains("ffmpeg") && line.contains(tag)) {
String[] parts = line.split("\\s+");
if (parts.length > 1) {
String pidStr = parts[1];
try {
long pid = Long.parseLong(pidStr);
logger.warn("[{}] 发现残留FFmpeg进程[PID:{}],正在终止...", tag, pid);
Process killProcess = Runtime.getRuntime().exec("kill -9 " + pid);
killProcess.waitFor();
logger.info("[{}] 残留FFmpeg进程已终止[PID:{}]", tag, pid);
} catch (NumberFormatException e) {
// 忽略非数字PID
}
}
}
}
} catch (Exception e) {
logger.error("[{}] Unix检查残留进程时出错: {}", tag, e.getMessage());
}
}
/**
* 安全关闭流,忽略异常
*/
private void closeQuietly(Closeable stream) {
if (stream != null) {
try {
stream.close();
} catch (Exception e) {
// 忽略
}
}
}
/**
* 获取推流状态
*/
public boolean isRunning() {
return running && this.isAlive() && process != null;
}
/**
* 获取FFmpeg进程信息(用于调试)
*/
public String getProcessInfo() {
if (process == null) {
return "process is null";
}
try {
long runDuration = processStartTime > 0 ? System.currentTimeMillis() - processStartTime : 0;
return String.format("FFmpeg进程, 已运行%dms, alive=%s",
runDuration, process.isAlive());
} catch (Exception e) {
return "获取进程信息失败: " + e.getMessage();
}
}
}