ZlmRtpPublisher.java
5.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package com.genersoft.iot.vmp.jtt1078.subscriber;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.genersoft.iot.vmp.jtt1078.util.Configs;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
public class ZlmRtpPublisher {
private static final Logger logger = LoggerFactory.getLogger(ZlmRtpPublisher.class);
private static final EventLoopGroup group = new NioEventLoopGroup();
private Channel nettyChannel;
private InetSocketAddress zlmRtpAddress;
private String streamId;
// [核心修复1] 音视频分离的状态
private int audioSsrc;
private int videoSsrc;
private int audioSeq = 0;
private int videoSeq = 0;
public ZlmRtpPublisher(String streamId) {
this.streamId = streamId;
}
public void start() {
try {
String zlmHost = Configs.get("zlm.host");
int zlmHttpPort = Configs.getInt("zlm.http.port", 80);
String zlmSecret = Configs.get("zlm.secret");
// 生成 SSRC: 视频用 Hash,音频用 Hash+1,确保不冲突
int baseSsrc = (streamId.hashCode() & 0x7FFFFFFF);
this.videoSsrc = baseSsrc;
this.audioSsrc = baseSsrc + 1;
// 申请端口
String url = String.format("http://%s:%d/index/api/openRtpServer?secret=%s&stream_id=%s&ssrc=%s&port=0&enable_tcp=0&app=schedule",
zlmHost, zlmHttpPort, zlmSecret, streamId, videoSsrc);
logger.info("[{}] 申请 RTP 端口: {}", streamId, url);
String response = HttpUtil.get(url);
JSONObject json = JSONUtil.parseObj(response);
if (json.getInt("code") != 0) {
throw new RuntimeException("ZLM openRtpServer error: " + response);
}
int rtpPort = json.getInt("port");
this.zlmRtpAddress = new InetSocketAddress(zlmHost, rtpPort);
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_SNDBUF, 1024 * 1024)
.handler(new io.netty.channel.ChannelHandlerAdapter() {});
this.nettyChannel = b.bind(0).sync().channel();
logger.info("[{}] RTP 推流就绪 (UDP) -> {}:{}", streamId, zlmHost, rtpPort);
} catch (Exception e) {
logger.error("[{}] 启动 RTP 推流失败", streamId, e);
}
}
public void sendVideo(long timestampMs, byte[] nalu) {
if (nettyChannel == null || zlmRtpAddress == null) return;
// [核心修复2] 智能剔除 H.264 Start Code (00 00 00 01 或 00 00 01)
int offset = 0;
if (nalu.length > 4 && nalu[0] == 0 && nalu[1] == 0 && nalu[2] == 0 && nalu[3] == 1) {
offset = 4;
} else if (nalu.length > 3 && nalu[0] == 0 && nalu[1] == 0 && nalu[2] == 1) {
offset = 3;
}
int length = nalu.length - offset;
if (length <= 0) return; // 空包不发
long rtpTimestamp = timestampMs * 90; // 90kHz
int maxPacketSize = 1400;
if (length <= maxPacketSize) {
// 单包
ByteBuf packet = createRtpPacket(rtpTimestamp, true, 96, videoSsrc, videoSeq++);
packet.writeBytes(nalu, offset, length);
sendRtp(packet);
} else {
// FU-A 分片
byte naluHeader = nalu[offset];
int nri = naluHeader & 0x60;
int type = naluHeader & 0x1f;
offset += 1;
length -= 1;
boolean first = true;
while (length > 0) {
int currLen = Math.min(length, maxPacketSize);
boolean last = (length == currLen);
// 只有最后一包打 Marker
ByteBuf packet = createRtpPacket(rtpTimestamp, last, 96, videoSsrc, videoSeq++);
packet.writeByte(nri | 28); // FU Indicator
int fuHeader = type;
if (first) fuHeader |= 0x80;
if (last) fuHeader |= 0x40;
packet.writeByte(fuHeader); // FU Header
packet.writeBytes(nalu, offset, currLen);
sendRtp(packet);
offset += currLen;
length -= currLen;
first = false;
}
}
}
public void sendAudio(long timestampMs, byte[] data) {
if (nettyChannel == null || zlmRtpAddress == null) return;
// 音频必须使用独立的序列号 audioSeq
// RTP 音频时钟 8kHz
long rtpTimestamp = timestampMs * 8;
ByteBuf packet = createRtpPacket(rtpTimestamp, true, 8, audioSsrc, audioSeq++);
packet.writeBytes(data);
sendRtp(packet);
}
// 统一封装 RTP Header
private ByteBuf createRtpPacket(long timestamp, boolean marker, int pt, int ssrc, int seqNum) {
ByteBuf buf = Unpooled.buffer(1500);
buf.writeByte(0x80); // V=2
buf.writeByte((marker ? 0x80 : 0x00) | pt); // M | PT
buf.writeShort(seqNum); // Sequence
buf.writeInt((int) timestamp); // Timestamp
buf.writeInt(ssrc); // SSRC
return buf;
}
private void sendRtp(ByteBuf packet) {
nettyChannel.writeAndFlush(new DatagramPacket(packet, zlmRtpAddress));
}
public void close() {
if (nettyChannel != null) nettyChannel.close();
}
}