FFmpegPublisher.java
12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
package com.genersoft.iot.vmp.jtt1078.publisher;
import com.genersoft.iot.vmp.jtt1078.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
/**
* FFmpeg 推流器 (仅处理视频直播流)
*
* 实现了IStreamPublisher接口,支持通过工厂创建
*
* 修改记录:
* 1. 实现IStreamPublisher接口,支持工厂模式
* 2. 添加详细的启动和关闭日志,便于排查问题
* 3. 优化关闭逻辑,确保FFmpeg进程被正确终止
* 4. 添加FFmpeg输出日志监控
* 5. 兼容Java 8
*/
public class FFmpegPublisher extends Thread implements IStreamPublisher
{
static Logger logger = LoggerFactory.getLogger(FFmpegPublisher.class);
String tag = null;
Process process = null;
private volatile boolean running = true;
private volatile boolean connected = false;
/** FFmpeg路径 */
private String ffmpegPath = null;
/** 目标RTMP地址 */
private String rtmpUrl = null;
/** FFmpeg命令格式标志 */
private String formatFlag = null;
/** 进程启动时间 */
private long processStartTime = 0;
/** 推流器类型标识 */
private static final String PUBLISHER_TYPE = "ffmpeg";
public FFmpegPublisher(String tag)
{
this.tag = tag;
this.setName("FFmpegPublisher-" + tag);
this.setDaemon(true);
}
@Override
public void start(String tag) {
this.start();
}
@Override
public void run()
{
InputStream stderr = null;
InputStream stdout = null;
int len = -1;
byte[] buff = new byte[512];
boolean debugMode = "on".equalsIgnoreCase(Configs.get("debug.mode"));
try
{
// 获取FFmpeg配置
String sign = "41db35390ddad33f83944f44b8b75ded";
rtmpUrl = Configs.get("rtmp.url").replaceAll("\\{TAG\\}", tag).replaceAll("\\{sign\\}",sign);
ffmpegPath = Configs.get("ffmpeg.path");
// 自动判断协议格式
formatFlag = "";
if (rtmpUrl.startsWith("rtsp://")) {
formatFlag = "-f rtsp";
} else if (rtmpUrl.startsWith("rtmp://")) {
formatFlag = "-f flv";
}
// 构造FFmpeg命令
String cmd = String.format("%s -i http://127.0.0.1:%d/video/%s -vcodec copy -acodec aac %s %s",
ffmpegPath,
Configs.getInt("server.http.port", 3333),
tag,
formatFlag,
rtmpUrl
);
logger.info("===========================================");
logger.info("[{}] ====== FFmpeg推流任务启动 ======", tag);
logger.info("[{}] FFmpeg路径: {}", tag, ffmpegPath);
logger.info("[{}] 目标地址: {}", tag, rtmpUrl);
logger.info("[{}] 完整命令: {}", tag, cmd);
logger.info("[{}] HTTP端口: {}", tag, Configs.getInt("server.http.port", 3333));
logger.info("[{}] 源流地址: http://127.0.0.1:{}/video/{}", tag, Configs.getInt("server.http.port", 3333), tag);
logger.info("[{}] 启动时间: {}", tag, new java.util.Date());
logger.info("===========================================");
// 执行FFmpeg命令
process = Runtime.getRuntime().exec(cmd);
processStartTime = System.currentTimeMillis();
connected = true;
// 记录进程启动信息(Java 8兼容方式)
logger.info("[{}] FFmpeg进程已启动", tag);
stderr = process.getErrorStream();
stdout = process.getInputStream();
// 启动一个线程消费 stdout,防止缓冲区满导致进程阻塞
final InputStream finalStdout = stdout;
Thread stdoutConsumer = new Thread(() -> {
try {
byte[] buffer = new byte[512];
int count = 0;
while (running && finalStdout.read(buffer) > -1) {
count++;
// 每1000次读取才打印一次(避免刷屏)
if (debugMode && count % 1000 == 0) {
logger.debug("[{}] FFmpeg stdout消费中... count: {}", tag, count);
}
}
logger.info("[{}] FFmpeg stdout消费结束, 共消费{}次", tag, count);
} catch (Exception e) {
// 忽略异常
}
}, "FFmpeg-stdout-" + tag);
stdoutConsumer.setDaemon(true);
stdoutConsumer.start();
// 消费 stderr 日志流
StringBuilder errorLog = new StringBuilder();
int errorCount = 0;
while (running && (len = stderr.read(buff)) > -1)
{
if (debugMode) {
System.out.print(new String(buff, 0, len));
}
// 收集错误日志(便于排查问题)
errorLog.append(new String(buff, 0, len));
errorCount++;
// 每100条错误日志打印一次摘要
if (errorCount % 100 == 0) {
String lastError = errorLog.length() > 500
? errorLog.substring(errorLog.length() - 500)
: errorLog.toString();
logger.debug("[{}] FFmpeg错误日志摘要: {}", tag, lastError);
}
}
// 进程退出处理
int exitCode = process.waitFor();
long runDuration = System.currentTimeMillis() - processStartTime;
connected = false;
logger.warn("===========================================");
logger.warn("[{}] ====== FFmpeg推流任务结束 ======", tag);
logger.warn("[{}] 退出代码: {}", tag, exitCode);
logger.warn("[{}] 运行时间: {} ms", tag, runDuration);
logger.warn("[{}] 错误日志条数: {}", tag, errorCount);
// 分析退出原因
if (exitCode == 0) {
logger.info("[{}] FFmpeg正常退出", tag);
} else if (exitCode == -1 || exitCode == 255) {
logger.warn("[{}] FFmpeg被信号终止 (exitCode={})", tag, exitCode);
} else {
// 保存最后一段错误日志
String lastError = errorLog.length() > 1000
? errorLog.substring(errorLog.length() - 1000)
: errorLog.toString();
logger.error("[{}] FFmpeg异常退出, 最后错误日志:\n{}", tag, lastError);
}
logger.warn("===========================================");
}
catch(InterruptedException ex)
{
logger.info("[{}] FFmpegPublisher被中断: {}", tag, ex.getMessage());
Thread.currentThread().interrupt();
}
catch(Exception ex)
{
logger.error("[{}] FFmpegPublisher异常: {}", tag, ex);
}
finally
{
connected = false;
// 确保所有流都被关闭
closeQuietly(stderr);
closeQuietly(stdout);
if (process != null) {
closeQuietly(process.getInputStream());
closeQuietly(process.getOutputStream());
closeQuietly(process.getErrorStream());
}
logger.info("[{}] FFmpegPublisher资源已释放", tag);
}
}
/**
* 关闭FFmpeg推流
* 优化关闭逻辑,确保进程被正确终止(Java 8兼容)
*/
@Override
public void close()
{
logger.info("[{}] ====== 开始关闭FFmpeg推流 ======", tag);
logger.info("[{}] 关闭请求时间: {}", tag, new java.util.Date());
try {
// 设置停止标志
running = false;
connected = false;
if (process != null) {
long runDuration = processStartTime > 0 ? System.currentTimeMillis() - processStartTime : 0;
logger.info("[{}] 正在终止FFmpeg进程... (已运行{}ms)", tag, runDuration);
// 先尝试正常终止
process.destroy();
// 等待最多3秒
boolean exited = process.waitFor(3, TimeUnit.SECONDS);
if (!exited) {
logger.warn("[{}] FFmpeg进程未能在3秒内正常退出,开始强制终止...", tag);
// 强制终止(Java 8的方式)
process.destroyForcibly();
// 再等待2秒
try {
exited = process.waitFor(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (!exited) {
logger.error("[{}] FFmpeg进程强制终止失败,可能存在资源泄漏", tag);
} else {
logger.info("[{}] FFmpeg进程已强制终止", tag);
}
} else {
int exitCode = process.exitValue();
logger.info("[{}] FFmpeg进程已正常终止, 退出代码: {}", tag, exitCode);
}
// 检查是否需要杀掉残留进程
checkAndKillOrphanedProcesses();
} else {
logger.info("[{}] FFmpeg进程为空,无需关闭", tag);
}
logger.info("[{}] ====== FFmpeg推流已关闭 ======", tag);
// 中断线程(如果还在阻塞读取)
this.interrupt();
// 等待线程结束
this.join(2000);
if (this.isAlive()) {
logger.warn("[{}] FFmpegPublisher线程未能正常结束", tag);
}
} catch(Exception e) {
logger.error("[{}] 关闭FFmpegPublisher时出错: {}", tag, e);
}
}
/**
* 检查并杀掉可能残留的FFmpeg进程
* 某些情况下FFmpeg进程可能没有被正确回收
*/
private void checkAndKillOrphanedProcesses() {
try {
// 根据FFmpeg命令特征查找残留进程
// 注意:这里只是记录日志,实际杀进程需要谨慎
logger.debug("[{}] 检查是否有FFmpeg残留进程...", tag);
} catch (Exception e) {
logger.debug("[{}] 检查残留进程时出错: {}", tag, e.getMessage());
}
}
/**
* 安全关闭流,忽略异常
*/
private void closeQuietly(Closeable stream) {
if (stream != null) {
try {
stream.close();
} catch (Exception e) {
// 忽略
}
}
}
/**
* 发送FLV数据 - FFmpeg模式不需要此方法
* 数据通过HTTP接口由FFmpeg自动拉取
*/
@Override
public void sendVideoData(byte[] flvData, int timestamp) {
// FFmpeg模式不需要实现此方法
// 数据通过HTTP接口由FFmpeg自动拉取
}
/**
* 发送AVC序列头 - FFmpeg模式不需要此方法
* FFmpeg模式下,SPS变化通过重启FFmpeg进程来解决
*/
@Override
public void sendAVCSequenceHeader() {
// FFmpeg模式不需要实现此方法
// SPS变化时通过restartRtmpPublisher()重启FFmpeg解决
logger.debug("[{}] FFmpeg模式不需要sendAVCSequenceHeader", tag);
}
/**
* 获取推流状态
*/
@Override
public boolean isConnected() {
return connected && this.isAlive() && process != null;
}
/**
* 获取推流器类型
*/
@Override
public String getType() {
return PUBLISHER_TYPE;
}
/**
* 获取推流器信息(用于调试)
*/
@Override
public String getStatus() {
if (process == null) {
return "FFmpeg进程未启动";
}
try {
long runDuration = processStartTime > 0 ? System.currentTimeMillis() - processStartTime : 0;
return String.format("FFmpeg推流器, 已运行%dms, 进程存活=%s, 连接状态=%s",
runDuration, process.isAlive(), connected ? "已连接" : "未连接");
} catch (Exception e) {
return "获取FFmpeg状态失败: " + e.getMessage();
}
}
/**
* 获取FFmpeg进程信息(用于调试)
*/
public String getProcessInfo() {
if (process == null) {
return "process is null";
}
try {
long runDuration = processStartTime > 0 ? System.currentTimeMillis() - processStartTime : 0;
return String.format("FFmpeg进程, 已运行%dms, alive=%s",
runDuration, process.isAlive());
} catch (Exception e) {
return "获取进程信息失败: " + e.getMessage();
}
}
}