NettyRtmpPublisher.java
7.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
package com.genersoft.iot.vmp.jtt1078.publisher;
import com.genersoft.iot.vmp.jtt1078.rtmp.NettyRtmpClient;
import com.genersoft.iot.vmp.jtt1078.util.Configs;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Netty RTMP publisher.
 *
 * <p>Non-blocking RTMP client implemented on top of Netty. Connection settings
 * are read from {@link Configs} when the publisher is constructed (and again in
 * {@link #start(String)} if it is started with a different tag); the actual
 * network work is delegated to {@link NettyRtmpClient}.
 *
 * <p>{@code connected} and {@code client} are volatile so that status reads and
 * sends observe the latest values; {@code start}/{@code close} are not otherwise
 * synchronized against each other.
 */
public class NettyRtmpPublisher implements IStreamPublisher {
    private static final Logger logger = LoggerFactory.getLogger(NettyRtmpPublisher.class);

    private static final String PUBLISHER_TYPE = "netty";

    /** Fallbacks used when the split {@code rtmp.native.*} keys are absent or empty. */
    private static final String DEFAULT_HOST = "127.0.0.1";
    private static final int DEFAULT_PORT = 1935;
    private static final String DEFAULT_APP = "schedule";

    // NOTE(review): hard-coded fallback sign value that ends up in the publish URL
    // (and in the INFO log line that prints tcUrl). Kept for backward compatibility;
    // consider moving it to configuration and masking it in logs.
    private static final String DEFAULT_SIGN = "41db35390ddad33f83944f44b8b75ded";

    /** Placeholder in {@code rtmp.native.tcUrl} that is replaced by the stream tag. */
    private static final String TAG_PLACEHOLDER = "{TAG}";
    private static final String SIGN_KEY = "sign=";
    private static final String SIGN_PLACEHOLDER = "{sign}";

    private String tag;
    private volatile boolean connected = false;

    // RTMP connection settings
    private String host;
    private int port;
    private String app;
    private String tcUrl;
    private String streamName;

    // Netty RTMP client; null while not started or after close()
    private volatile NettyRtmpClient client;

    /**
     * Creates a publisher for the given stream tag and loads its connection settings.
     *
     * @param tag stream identifier; used as the RTMP stream name and as the log prefix
     */
    public NettyRtmpPublisher(String tag) {
        this.tag = tag;
        loadConfig();
    }

    /**
     * Loads the RTMP connection parameters from configuration.
     *
     * <p>A non-empty {@code rtmp.native.tcUrl} takes precedence: {@code {TAG}} is
     * replaced by the tag and host/port/app are parsed out of the result. Otherwise
     * the URL is assembled from {@code rtmp.native.host/port/app} (with defaults)
     * plus the sign value taken from {@code rtmp.url}.
     */
    private void loadConfig() {
        String configHost = Configs.get("rtmp.native.host");
        String configPort = Configs.get("rtmp.native.port");
        String configApp = Configs.get("rtmp.native.app");
        String configTcUrl = Configs.get("rtmp.native.tcUrl");

        // The stream name is the tag for both configuration styles; set it up front so it
        // is never left null when tcUrl parsing bails out early.
        this.streamName = tag;

        // A complete tcUrl wins over the split settings.
        if (configTcUrl != null && !configTcUrl.isEmpty()) {
            this.tcUrl = configTcUrl.replace(TAG_PLACEHOLDER, tag);
            // Parse the tag-substituted URL: parseTcUrl rebuilds this.tcUrl and app from
            // its argument, so passing the raw config value would put the literal
            // "{TAG}" placeholder back into the publish URL.
            parseTcUrl(this.tcUrl);
            logger.info("[{}] 使用配置的tcUrl: {}", tag, this.tcUrl);
            return;
        }

        // Split settings, each with its own default.
        this.host = (configHost != null && !configHost.isEmpty()) ? configHost : DEFAULT_HOST;
        this.port = parsePort(configPort);
        this.app = (configApp != null && !configApp.isEmpty()) ? configApp : DEFAULT_APP;

        String rtmpUrl = Configs.get("rtmp.url");
        String sign = resolveSign(rtmpUrl);

        // Full URL: rtmp://host:port/app/streamName?sign=xxx
        this.tcUrl = String.format("rtmp://%s:%d/%s/%s?sign=%s", host, port, app, tag, sign);
        logger.info("[{}] Netty RTMP推流器配置: host={}, port={}, app={}, stream={}, tcUrl={}",
                tag, host, port, app, streamName, tcUrl);
    }

    /**
     * Parses a port number, falling back to {@link #DEFAULT_PORT} instead of throwing.
     *
     * @param raw port text from configuration or from a URL; may be null or empty
     * @return the parsed port if it is a number in 1..65535, otherwise the default port
     */
    private int parsePort(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT_PORT;
        }
        try {
            int parsed = Integer.parseInt(raw.trim());
            if (parsed > 0 && parsed <= 65535) {
                return parsed;
            }
        } catch (NumberFormatException ignored) {
            // Not a number: reported by the warning below, then the default is used.
        }
        logger.warn("[{}] 端口配置无效: '{}', 使用默认端口 {}", tag, raw, DEFAULT_PORT);
        return DEFAULT_PORT;
    }

    /**
     * Extracts the value of the {@code sign=} parameter from the given URL.
     *
     * @param rtmpUrl value of {@code rtmp.url}; may be null
     * @return the extracted sign, or {@link #DEFAULT_SIGN} when the URL is null, has no
     *         {@code sign=}, or the value is empty or still the {@code {sign}} placeholder
     */
    private static String resolveSign(String rtmpUrl) {
        if (rtmpUrl == null) {
            return DEFAULT_SIGN;
        }
        int keyIndex = rtmpUrl.indexOf(SIGN_KEY);
        if (keyIndex < 0) {
            return DEFAULT_SIGN;
        }
        int valueStart = keyIndex + SIGN_KEY.length();
        int valueEnd = rtmpUrl.indexOf('&', valueStart);
        if (valueEnd == -1) {
            valueEnd = rtmpUrl.length();
        }
        String extracted = rtmpUrl.substring(valueStart, valueEnd);
        if (extracted.isEmpty() || SIGN_PLACEHOLDER.equals(extracted)) {
            return DEFAULT_SIGN;
        }
        return extracted;
    }

    /**
     * Derives host, port and app from a URL of the form
     * {@code scheme://host[:port]/path[?query]}.
     *
     * <p>When the URL has a path, the whole path becomes {@code app} and {@code tcUrl}
     * is rebuilt as {@code rtmp://host:port/path}; the query string is dropped. A URL
     * without {@code ://} is left unparsed.
     *
     * @param url the tcUrl to parse, with the tag placeholder already substituted
     */
    private void parseTcUrl(String url) {
        try {
            // Strip the query string (e.g. the sign parameter).
            int queryIndex = url.indexOf('?');
            String withoutQuery = queryIndex >= 0 ? url.substring(0, queryIndex) : url;

            // Any scheme is accepted (rtmp://, rtsp://, http://, ...).
            String prefix = "://";
            int prefixIndex = withoutQuery.indexOf(prefix);
            if (prefixIndex == -1) {
                logger.warn("[{}] tcUrl没有有效的协议前缀: {}", tag, withoutQuery);
                return;
            }
            String afterPrefix = withoutQuery.substring(prefixIndex + prefix.length());

            // Split "host:port" from the path.
            int slashIndex = afterPrefix.indexOf('/');
            String hostPort = slashIndex == -1 ? afterPrefix : afterPrefix.substring(0, slashIndex);
            String path = slashIndex == -1 ? "" : afterPrefix.substring(slashIndex + 1);

            // Split host from port; a missing or invalid port falls back to the default.
            int colonIndex = hostPort.indexOf(':');
            if (colonIndex == -1) {
                this.host = hostPort;
                this.port = DEFAULT_PORT;
            } else {
                this.host = hostPort.substring(0, colonIndex);
                this.port = parsePort(hostPort.substring(colonIndex + 1));
            }

            // The whole path is used as the app name.
            if (!path.isEmpty()) {
                this.app = path;
                this.tcUrl = String.format("rtmp://%s:%d/%s", host, port, path);
            } else {
                logger.warn("[{}] tcUrl未包含app路径: {}", tag, withoutQuery);
            }
        } catch (RuntimeException e) {
            // Best effort: a malformed URL must not fail construction of the publisher.
            logger.error("[{}] 解析tcUrl失败: {}", tag, e.getMessage(), e);
        }
    }

    /**
     * Creates and starts the underlying RTMP client.
     *
     * <p>If {@code tag} differs from the tag the settings were loaded for, the settings
     * are reloaded so the stream name and URL match the new tag. A client left over
     * from an earlier {@code start} is closed first. Failures are logged and leave the
     * publisher disconnected; no exception is propagated.
     *
     * @param tag stream identifier; a null value keeps the current tag
     */
    @Override
    public void start(String tag) {
        if (tag != null && !tag.equals(this.tag)) {
            this.tag = tag;
            loadConfig();
        }
        logger.info("[{}] ====== 启动Netty RTMP推流器 ======", this.tag);
        logger.info("[{}] 目标服务器: {}:{}", this.tag, host, port);
        logger.info("[{}] 推流地址: {}/{}", this.tag, tcUrl, streamName);

        // Do not leak a client from a previous start().
        connected = false;
        releaseClient();

        try {
            NettyRtmpClient created = new NettyRtmpClient(tcUrl, app, streamName);
            client = created;
            created.start();
            connected = true;
            logger.info("[{}] ====== Netty RTMP推流器启动成功 ======", this.tag);
        } catch (Exception e) {
            logger.error("[{}] Netty RTMP推流器启动异常: {}", this.tag, e.getMessage(), e);
            connected = false;
            // Drop the half-started client so it is not kept around.
            releaseClient();
        }
    }

    /**
     * Wraps the FLV bytes in a Netty buffer and hands them to the client.
     *
     * <p>Nothing is sent while disconnected or when {@code flvData} is null or empty.
     * A failure while sending is logged and marks the publisher as disconnected.
     *
     * @param flvData   FLV payload to send
     * @param timestamp only written to the debug log; it is not passed to the client
     */
    @Override
    public void sendVideoData(byte[] flvData, int timestamp) {
        // Snapshot the field so a concurrent close() cannot null it between check and use.
        NettyRtmpClient current = client;
        if (!connected || current == null) {
            logger.debug("[{}] sendVideoData: not connected", tag);
            return;
        }
        if (flvData == null || flvData.length == 0) {
            // An empty frame is not a connection failure; skip it and stay connected.
            logger.debug("[{}] sendVideoData: empty payload skipped", tag);
            return;
        }
        try {
            // Wrap the byte[] as a Netty ByteBuf (no copy).
            ByteBuf buf = Unpooled.wrappedBuffer(flvData);
            current.send(buf);
            logger.debug("[{}] sendVideoData: flvDataLen={}, timestamp={}", tag, flvData.length, timestamp);
        } catch (Exception e) {
            logger.error("[{}] 发送视频数据失败: {}", tag, e.getMessage(), e);
            connected = false;
        }
    }

    /**
     * No-op for the Netty publisher: no separate sequence header is sent here, on the
     * premise that the FLV tags already carry the sequence header information.
     */
    @Override
    public void sendAVCSequenceHeader() {
        logger.debug("[{}] sendAVCSequenceHeader: 无需操作(由FLV Tag携带)", tag);
    }

    /**
     * Marks the publisher as disconnected and closes the client, if any.
     * Errors raised while closing are logged and suppressed.
     */
    @Override
    public void close() {
        logger.info("[{}] ====== 关闭Netty RTMP推流器 ======", tag);
        connected = false;
        releaseClient();
        logger.info("[{}] Netty RTMP推流器已关闭", tag);
    }

    /** Clears the client field and closes the client it held, logging any close error. */
    private void releaseClient() {
        NettyRtmpClient current = client;
        client = null;
        if (current == null) {
            return;
        }
        try {
            current.close();
        } catch (Exception e) {
            logger.debug("[{}] 关闭客户端时出错: {}", tag, e.getMessage());
        }
    }

    /**
     * @return true when the publisher has been started successfully and still holds a client
     */
    @Override
    public boolean isConnected() {
        return connected && client != null;
    }

    /**
     * @return the publisher type identifier, {@code "netty"}
     */
    @Override
    public String getType() {
        return PUBLISHER_TYPE;
    }

    /**
     * @return a human-readable status line; uses the same condition as
     *         {@link #isConnected()} so the two never disagree
     */
    @Override
    public String getStatus() {
        if (isConnected()) {
            return String.format("Netty RTMP推流器, 已连接, streamName=%s", streamName);
        }
        return "Netty RTMP推流器, 未连接";
    }
}