NettyRtmpClient.java
3.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package com.genersoft.iot.vmp.jtt1078.rtmp;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.URISyntaxException;
/**
* Netty RTMP 客户端
*/
public class NettyRtmpClient {
private static final Logger logger = LoggerFactory.getLogger(NettyRtmpClient.class);
// 共享线程池
private static final EventLoopGroup sharedGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
private Channel channel;
private final String rtmpUrl;
private final String app;
private final String stream;
// 增加一个状态标记,只有握手完成才能发送数据
private volatile boolean isReady = false;
public NettyRtmpClient(String rtmpUrl, String app, String stream) {
this.rtmpUrl = rtmpUrl;
this.app = app;
this.stream = stream;
}
public void start() {
// 1. 解析 RTMP URL 获取 IP 和 端口
URI uri;
try {
// Java URI 解析 rtmp:// 有时会报错,简单处理去掉 scheme
String tempUrl = rtmpUrl.replace("rtmp://", "http://");
uri = new URI(tempUrl);
} catch (URISyntaxException e) {
logger.error("[{}] RTMP URL 格式错误: {}", stream, rtmpUrl);
return;
}
String host = uri.getHost();
int port = uri.getPort();
if (port == -1) port = 1935; // 默认 RTMP 端口
Bootstrap b = new Bootstrap();
b.group(sharedGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true) // 禁用 Nagle 算法,降低延迟
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 传入回调,当 Handler 状态变为 STREAMING 时通知 Client
RtmpHandshakeHandler handler = new RtmpHandshakeHandler(app, rtmpUrl, stream);
handler.setOnReadyListener(() -> isReady = true);
ch.pipeline().addLast(handler);
}
});
logger.info("[{}] 正在连接 RTMP 服务器: {}:{}", stream, host, port);
b.connect(host, port).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
this.channel = future.channel();
logger.info("[{}] RTMP TCP 连接成功: {}", stream, rtmpUrl);
} else {
logger.error("[{}] RTMP 连接失败: {}", stream, rtmpUrl, future.cause());
}
});
}
public void send(ByteBuf flvTag) {
// 【关键修复】
// 1. 必须 channel active
// 2. 必须 isReady (RTMP 握手已完成)
// 否则直接丢弃包并释放内存,绝对不能发给服务器!
if (channel != null && channel.isActive() && isReady) {
channel.writeAndFlush(flvTag);
} else {
// 如果还没握手完成就发包,服务器会报错非法 BodySize
// 必须释放 flvTag,否则内存泄漏
if (flvTag.refCnt() > 0) {
flvTag.release();
}
}
}
public void close() {
if (channel != null) {
channel.close();
}
}
}