// RTMPPublisher.java
package com.genersoft.iot.vmp.jtt1078.subscriber;
import com.genersoft.iot.vmp.jtt1078.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * FFmpeg publisher thread (handles live video streams only).
 *
 * <p>When started, the thread spawns an FFmpeg child process that reads the local
 * HTTP stream {@code http://127.0.0.1:<server.http.port>/video/<tag>} and pushes it to
 * the URL configured under {@code rtmp.url}. The thread then drains the child's
 * stdout/stderr until the child exits or {@link #close()} is called.
 *
 * <p>{@link #close()} may be called from any thread; the {@code running} flag and the
 * {@code process} reference are {@code volatile} so that the closing thread and the
 * publisher thread see each other's writes.
 */
public class RTMPPublisher extends Thread
{
    static final Logger logger = LoggerFactory.getLogger(RTMPPublisher.class);

    /**
     * Value substituted for the {@code {sign}} placeholder of {@code rtmp.url}.
     * NOTE(review): this looks like a credential baked into the source and it is also
     * written to the log as part of the command line; consider moving it to configuration.
     */
    private static final String SIGN = "41db35390ddad33f83944f44b8b75ded";

    String tag = null;

    /** The FFmpeg child process; {@code null} until {@link #run()} has spawned it. */
    volatile Process process = null;

    /** Cleared by {@link #close()} to make the read loops stop. */
    private volatile boolean running = true;

    /**
     * Creates a daemon publisher thread named {@code RTMPPublisher-<tag>}.
     *
     * @param tag stream identifier; substituted into the local pull URL and into the
     *            {@code {TAG}} placeholder of the configured push URL
     */
    public RTMPPublisher(String tag)
    {
        this.tag = tag;
        this.setName("RTMPPublisher-" + tag);
        this.setDaemon(true);
    }

    /**
     * Spawns FFmpeg, drains its output streams, and waits for it to exit.
     *
     * <p>Whatever way this method leaves (normal exit, exception, interrupt), the child
     * process is killed if it is still alive and all of its streams are closed.
     */
    @Override
    public void run()
    {
        boolean debugMode = "on".equalsIgnoreCase(Configs.get("debug.mode"));
        Process proc = null;
        try
        {
            List<String> command = buildCommand();

            // close() may already have been called before the thread got to run;
            // do not spawn a process nobody will ever stop.
            if (!running)
            {
                return;
            }

            logger.info("FFmpeg Push Started. Tag: {}, CMD: {}", tag, String.join(" ", command));

            // An explicit argument list (instead of Runtime.exec(String)) keeps a tag,
            // URL or executable path containing whitespace as ONE argument.
            proc = new ProcessBuilder(command).start();
            process = proc;

            // Drain stdout on a separate thread so a full pipe buffer cannot block FFmpeg.
            startStdoutDrain(proc.getInputStream());

            // Drain stderr (FFmpeg's log stream) on this thread for the same reason.
            InputStream stderr = proc.getErrorStream();
            byte[] buff = new byte[512];
            int len;
            while (running && (len = stderr.read(buff)) > -1)
            {
                if (debugMode)
                {
                    // Decoded with the platform default charset, as the child's console output.
                    System.out.print(new String(buff, 0, len));
                }
            }

            // Child closed stderr or we were asked to stop: collect the exit code.
            int exitCode = proc.waitFor();
            logger.warn("FFmpeg process exited. Code: {}, Tag: {}", exitCode, tag);
        }
        catch (InterruptedException ex)
        {
            logger.info("RTMPPublisher interrupted: {}", tag);
            Thread.currentThread().interrupt();
        }
        catch (Exception ex)
        {
            logger.error("RTMPPublisher Error: " + tag, ex);
        }
        finally
        {
            if (proc != null)
            {
                // Reached with a live child when we were interrupted, when reading failed,
                // or when close() raced with the spawn: never leave FFmpeg orphaned.
                if (proc.isAlive())
                {
                    proc.destroyForcibly();
                }
                closeQuietly(proc.getInputStream());
                closeQuietly(proc.getOutputStream());
                closeQuietly(proc.getErrorStream());
            }
        }
    }

    /**
     * Stops the publisher: clears the running flag, terminates FFmpeg (gracefully first,
     * forcibly after 3 seconds), interrupts the publisher thread and waits up to
     * 2 seconds for it to finish.
     *
     * <p>If the calling thread is interrupted while waiting, the child is killed
     * forcibly, the publisher thread is still interrupted, and the caller's interrupt
     * flag is restored.
     */
    public void close()
    {
        // Set the stop flag first so the read loops exit as soon as their read returns.
        running = false;
        Process proc = process;
        try
        {
            if (proc != null)
            {
                // Try a normal termination first and give it up to 3 seconds.
                proc.destroy();
                if (!proc.waitFor(3, TimeUnit.SECONDS))
                {
                    logger.warn("FFmpeg process did not exit gracefully, forcing termination: {}", tag);
                    proc.destroyForcibly();
                    proc.waitFor(2, TimeUnit.SECONDS);
                }
                logger.info("FFmpeg process terminated: {}", tag);
            }

            // Wake the publisher thread in case it is still waiting on the child.
            this.interrupt();

            // Joining on ourselves would only burn the timeout (or throw).
            if (Thread.currentThread() != this)
            {
                this.join(2000);
            }
        }
        catch (InterruptedException e)
        {
            if (proc != null && proc.isAlive())
            {
                proc.destroyForcibly();
            }
            this.interrupt();
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while closing RTMPPublisher: {}", tag);
        }
        catch (Exception e)
        {
            logger.error("Error closing RTMPPublisher: " + tag, e);
        }
    }

    /**
     * Builds the FFmpeg argument list: executable, low-latency input options, the local
     * pull URL, codec options, low-latency output options, an optional {@code -f} flag
     * and the push URL.
     *
     * <p>{@code ffmpeg.path} is passed as a single executable path, so it may contain
     * spaces but must not carry extra command-line options.
     *
     * @return the command, one element per argument
     * @throws IllegalStateException if {@code ffmpeg.path} or {@code rtmp.url} is not configured
     */
    private List<String> buildCommand()
    {
        String ffmpegPath = Configs.get("ffmpeg.path");
        String urlTemplate = Configs.get("rtmp.url");
        if (ffmpegPath == null || urlTemplate == null)
        {
            throw new IllegalStateException("Missing required config: ffmpeg.path and rtmp.url must be set");
        }

        // Literal replacement: a '$' or '\' inside the tag must not be read as a regex
        // group reference, which is what replaceAll() would do.
        String rtmpUrl = urlTemplate.replace("{TAG}", tag).replace("{sign}", SIGN);

        List<String> command = new ArrayList<>();
        command.add(ffmpegPath);

        // Low latency: shorten HTTP-FLV probing and buffering so output reaches ZLM as
        // early as possible (still bounded by the device's GOP / key-frame interval).
        command.addAll(Arrays.asList(
            "-fflags", "+nobuffer+discardcorrupt",
            "-flags", "low_delay",
            "-probesize", "32",
            "-analyzeduration", "0"));

        // Only the video stream and the non-intercom audio stream are handled.
        command.add("-i");
        command.add(String.format("http://127.0.0.1:%d/video/%s",
            Configs.getInt("server.http.port", 3333), tag));
        command.addAll(Arrays.asList("-vcodec", "copy", "-acodec", "aac"));

        command.addAll(Arrays.asList("-flush_packets", "1"));

        // Pick the muxer from the URL scheme; a hard-coded "-f rtsp" breaks RTMP pushes.
        // Any other scheme gets no -f flag and FFmpeg chooses the format itself.
        if (rtmpUrl.startsWith("rtsp://"))
        {
            command.addAll(Arrays.asList("-f", "rtsp"));
        }
        else if (rtmpUrl.startsWith("rtmp://"))
        {
            command.addAll(Arrays.asList("-f", "flv"));
        }

        command.add(rtmpUrl);
        return command;
    }

    /**
     * Starts a daemon thread that reads and discards everything from {@code stdout}
     * until end of stream, a read error, or {@code running} being cleared.
     */
    private void startStdoutDrain(final InputStream stdout)
    {
        Thread stdoutConsumer = new Thread(() -> {
            try
            {
                byte[] buffer = new byte[512];
                while (running && stdout.read(buffer) > -1)
                {
                    // consume only, nothing is printed
                }
            }
            catch (IOException ignored)
            {
                // Expected when the stream is closed during shutdown.
            }
        }, "FFmpeg-stdout-" + tag);
        stdoutConsumer.setDaemon(true);
        stdoutConsumer.start();
    }

    /**
     * Closes a stream on a best-effort basis; {@code null} and close failures are ignored.
     */
    private void closeQuietly(Closeable stream)
    {
        if (stream != null)
        {
            try
            {
                stream.close();
            }
            catch (Exception ignored)
            {
                // Best effort: nothing useful can be done if close fails during cleanup.
            }
        }
    }
}