RtmpChunkWriter.java
14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
package com.genersoft.iot.vmp.jtt1078.rtmp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* FLV数据到RTMP消息的转换器
*
* 功能:
* 1. 解析FLV Video Tag,提取H.264 NAL单元
* 2. 将FLV时间戳转换为RTMP时间戳
* 3. 处理SPS/PPS序列头
* 4. 处理I帧和P帧
*
* FLV Video Tag格式:
* [PreviousTagSize(4)][Tag Type(1)][DataSize(3)][Timestamp(3)][TimestampExt(1)][StreamID(3)][VideoData]
*
* VideoData格式(H.264):
* [FrameType(1)][AVCPacketType(1)][CompositionTime(3)][NALUs]
*
* - FrameType: 0x17=I帧, 0x27=P/B帧
* - AVCPacketType: 0x00=AVC Sequence Header, 0x01=NALU
* - CompositionTime: pts - dts偏移
*/
public class RtmpChunkWriter {
static Logger logger = LoggerFactory.getLogger(RtmpChunkWriter.class);
private RtmpConnection connection;
private String tag;
// 时间戳
private int lastVideoTimestamp = 0;
private int lastAudioTimestamp = 0;
// 关键标记
private boolean hasSentAVCSequenceHeader = false;
private byte[] lastSPS = null;
private byte[] lastPPS = null;
// FLV Tag类型常量
private static final byte FLV_TAG_TYPE_VIDEO = 9;
private static final byte FLV_TAG_TYPE_AUDIO = 8;
// AVC NAL单元类型
private static final int NAL_UNIT_TYPE_SPS = 7;
private static final int NAL_UNIT_TYPE_PPS = 8;
private static final int NAL_UNIT_TYPE_IDR = 5;
private static final int NAL_UNIT_TYPE_NON_IDR = 1;
public RtmpChunkWriter(RtmpConnection connection, String tag) {
this.connection = connection;
this.tag = tag;
}
/**
* 发送FLV视频数据
*
* @param flvData 完整的FLV Video Tag数据
* @param timestamp FLV中的时间戳
*/
public void sendVideoData(byte[] flvData, int timestamp) throws IOException {
if (flvData == null || flvData.length < 16) {
return;
}
// 跳过PreviousTagSize (4字节)
int offset = 4;
// 检查Tag Type
if (flvData[offset] != FLV_TAG_TYPE_VIDEO) {
return;
}
offset++;
// 读取DataSize (3字节, big-endian)
int dataSize = ((flvData[offset] & 0xFF) << 16) |
((flvData[offset + 1] & 0xFF) << 8) |
(flvData[offset + 2] & 0xFF);
offset += 3;
// 跳过Timestamp (3字节) + TimestampExt (1字节)
offset += 4;
// 跳过StreamID (3字节)
offset += 3;
// 现在offset指向VideoData
if (offset >= flvData.length) {
return;
}
byte videoInfo = flvData[offset]; // FrameType + CodecID
offset++;
byte avcPacketType = flvData[offset]; // AVCPacketType
offset++;
// 读取CompositionTime (3字节)
int compositionTime = ((flvData[offset] & 0xFF) << 16) |
((flvData[offset + 1] & 0xFF) << 8) |
(flvData[offset + 2] & 0xFF);
offset += 3;
// 计算RTMP时间戳
// FLV timestamp单位是毫秒,RTMP也是毫秒
int rtmpTimestamp = timestamp & 0xFFFFFF; // 24位时间戳
lastVideoTimestamp = rtmpTimestamp;
if (avcPacketType == 0x00) {
// AVC Sequence Header (SPS/PPS)
logger.info("[{}] 收到AVC Sequence Header (SPS/PPS), offset={}, remaining={}", tag, offset, flvData.length - offset);
handleAVCSequenceHeader(flvData, offset);
} else if (avcPacketType == 0x01) {
// NALU
logger.info("[{}] 收到NALU数据, timestamp={}, offset={}, remaining={}", tag, rtmpTimestamp, offset, flvData.length - offset);
handleNALU(flvData, offset, rtmpTimestamp);
} else {
logger.warn("[{}] 未知AVCPacketType: 0x{}, 数据长度={}", tag, String.format("%02X", avcPacketType), flvData.length);
}
}
/**
* 处理AVC Sequence Header
*
* FLV中封装的AVCDecoderConfigurationRecord:
* [configurationVersion(1)][AVCProfileIndication(1)][profile_compatibility(1)][AVCLevelIndication(1)]
* [lengthSizeMinusOne(1)][numOfSequenceParameterSets(1)][SPS length(2)][SPS data][numOfPictureParameterSets(1)][PPS length(2)][PPS data]
*/
private void handleAVCSequenceHeader(byte[] data, int offset) {
try {
// 解析AVCDecoderConfigurationRecord
int pos = offset;
byte configurationVersion = data[pos++];
byte AVCProfileIndication = data[pos++];
byte profileCompatibility = data[pos++];
byte AVCLevelIndication = data[pos++];
byte lengthSizeMinusOne = data[pos++]; // 通常是3,表示NALU长度占4字节
// SPS
byte numOfSequenceParameterSets = data[pos++];
int spsLength = ((data[pos] & 0xFF) << 8) | (data[pos + 1] & 0xFF);
pos += 2;
lastSPS = new byte[spsLength];
System.arraycopy(data, pos, lastSPS, 0, spsLength);
pos += spsLength;
// PPS
byte numOfPictureParameterSets = data[pos++];
int ppsLength = ((data[pos] & 0xFF) << 8) | (data[pos + 1] & 0xFF);
pos += 2;
lastPPS = new byte[ppsLength];
System.arraycopy(data, pos, lastPPS, 0, ppsLength);
hasSentAVCSequenceHeader = false; // 重置,强制发送新的
// 调试日志:打印SPS/PPS的十六进制
StringBuilder spsHex = new StringBuilder();
for (int i = 0; i < Math.min(lastSPS.length, 20); i++) {
spsHex.append(String.format("%02X ", lastSPS[i] & 0xFF));
}
StringBuilder ppsHex = new StringBuilder();
for (int i = 0; i < Math.min(lastPPS.length, 10); i++) {
ppsHex.append(String.format("%02X ", lastPPS[i] & 0xFF));
}
logger.info("[{}] 解析到AVC Sequence Header: SPS长度={}, PPS长度={}, SPS前20字节=[{}], PPS前10字节=[{}], profile={}, level={}",
tag, spsLength, ppsLength, spsHex.toString(), ppsHex.toString(),
String.format("0x%02X", AVCProfileIndication), String.format("0x%02X", AVCLevelIndication));
} catch (Exception e) {
logger.error("[{}] 解析AVC Sequence Header失败: {}", tag, e.getMessage());
}
}
/**
* 处理NALU数据
*/
private void handleNALU(byte[] data, int offset, int timestamp) throws IOException {
if (lastSPS == null || lastPPS == null) {
logger.warn("[{}] 还未收到SPS/PPS,跳过NALU. lastSPS={}, lastPPS={}", tag,
lastSPS != null ? "set(" + lastSPS.length + ")" : "null",
lastPPS != null ? "set(" + lastPPS.length + ")" : "null");
return;
}
// 发送AVC Sequence Header(如果还没发送)
if (!hasSentAVCSequenceHeader) {
sendAVCSequenceHeaderPacket();
hasSentAVCSequenceHeader = true;
}
// 解析NALU
// FLV中NALU格式: [NALU长度(4字节)][NALU数据...]
int pos = offset;
int nalType = -1;
while (pos + 4 < data.length) {
// 读取NALU长度
int nalLength = ((data[pos] & 0xFF) << 24) |
((data[pos + 1] & 0xFF) << 16) |
((data[pos + 2] & 0xFF) << 8) |
(data[pos + 3] & 0xFF);
pos += 4;
if (pos + nalLength > data.length) {
break;
}
nalType = data[pos] & 0x1F;
// 构建RTMP视频数据
// 格式: [FrameType(1)][AVCPacketType(1)][CompositionTime(3)][NALU]
byte[] rtmpData = new byte[5 + nalLength];
rtmpData[0] = (byte) (nalType == NAL_UNIT_TYPE_IDR ? 0x17 : 0x27); // I帧或P帧
rtmpData[1] = 0x01; // AVCPacketType = NALU
rtmpData[2] = 0x00; // CompositionTime
rtmpData[3] = 0x00;
rtmpData[4] = 0x00;
// 复制NALU数据
System.arraycopy(data, pos, rtmpData, 5, nalLength);
// 打印NALU的十六进制便于调试
StringBuilder hex = new StringBuilder();
for (int i = 0; i < Math.min(nalLength, 20); i++) {
hex.append(String.format("%02X ", rtmpData[5 + i] & 0xFF));
}
if (nalLength > 20) {
hex.append("...(").append(nalLength).append(" bytes total)");
}
logger.info("[{}] 发送NALU到RTMP: nalType=0x{}, timestamp={}, nalLength={}, firstBytes={}", tag,
Integer.toHexString(nalType), timestamp, nalLength, hex.toString());
// 发送到RTMP
connection.sendVideoData(rtmpData, timestamp);
pos += nalLength;
}
}
/**
* 发送AVC Sequence Header到RTMP
* 这是实现无缝码流切换的关键!
*
* 注意:lastSPS和lastPPS是从FLV数据中提取的,不包含start code
* FLV中的SPS格式: [profile_idc(1)][constraint(1)][level_idc(1)][...]
* 实际H.264 SPS (Annex B): [00 00 00 01][67][profile_idc][constraint][level_idc][...]
*/
private void sendAVCSequenceHeaderPacket() throws IOException {
if (lastSPS == null || lastPPS == null) {
logger.warn("[{}] SPS/PPS为空,无法发送Sequence Header", tag);
return;
}
// 构建AVCDecoderConfigurationRecord
// 这个格式和FLV中的略有不同,需要转换
int recordLength = 11 + lastSPS.length + lastPPS.length;
byte[] configRecord = new byte[recordLength];
int pos = 0;
configRecord[pos++] = 0x01; // configurationVersion
// SPS是从FLV提取的,没有start code
// FLV SPS: [profile_idc(1)][constraint(1)][level_idc(1)][...]
// 所以 profile_idc 在 lastSPS[0], constraint 在 lastSPS[1], level 在 lastSPS[2]
configRecord[pos++] = lastSPS.length > 0 ? lastSPS[0] : 0x64; // AVCProfileIndication
configRecord[pos++] = lastSPS.length > 1 ? lastSPS[1] : 0x00; // profile_compatibility
configRecord[pos++] = lastSPS.length > 2 ? lastSPS[2] : 0x1F; // AVCLevelIndication
configRecord[pos++] = (byte) 0xFF; // lengthSizeMinusOne = 3 (NAL长度占4字节)
// SPS
configRecord[pos++] = (byte) 0xE1; // numOfSequenceParameterSets = 1
configRecord[pos++] = (byte) ((lastSPS.length >> 8) & 0xFF); // SPS length high
configRecord[pos++] = (byte) (lastSPS.length & 0xFF); // SPS length low
System.arraycopy(lastSPS, 0, configRecord, pos, lastSPS.length);
pos += lastSPS.length;
// PPS (同样没有start code)
configRecord[pos++] = 0x01; // numOfPictureParameterSets = 1
configRecord[pos++] = (byte) ((lastPPS.length >> 8) & 0xFF); // PPS length high
configRecord[pos++] = (byte) (lastPPS.length & 0xFF); // PPS length low
System.arraycopy(lastPPS, 0, configRecord, pos, lastPPS.length);
// 构建RTMP视频数据
// [FrameType(1)][AVCPacketType(1)][CompositionTime(3)][ConfigRecord]
byte[] rtmpData = new byte[5 + recordLength];
rtmpData[0] = 0x17; // AVC sequence header (I-frame)
rtmpData[1] = 0x00; // AVCPacketType = AVC sequence header
rtmpData[2] = 0x00;
rtmpData[3] = 0x00;
rtmpData[4] = 0x00;
System.arraycopy(configRecord, 0, rtmpData, 5, recordLength);
// 打印Sequence Header的十六进制便于调试
StringBuilder hex = new StringBuilder();
for (int i = 0; i < Math.min(rtmpData.length, 64); i++) {
hex.append(String.format("%02X ", rtmpData[i] & 0xFF));
}
if (rtmpData.length > 64) {
hex.append("...(").append(rtmpData.length).append(" bytes total)");
}
logger.info("[{}] 发送AVC Sequence Header到RTMP, 数据长度={}, hex={}", tag, rtmpData.length, hex.toString());
logger.info("[{}] SPS profile/level: configRecord[1]={}, configRecord[3]={}",
tag, String.format("0x%02X", configRecord[1]), String.format("0x%02X", configRecord[3]));
// 发送到RTMP
connection.sendVideoData(rtmpData, 0);
}
/**
* 强制发送新的AVC Sequence Header
* 用于码流切换时立即发送新的SPS/PPS
*/
public void forceSendAVCSequenceHeader() {
hasSentAVCSequenceHeader = false;
}
/**
* 更新SPS/PPS(当检测到码流变化时调用)
*/
public void updateSPSPPS(byte[] sps, byte[] pps) {
this.lastSPS = sps;
this.lastPPS = pps;
this.hasSentAVCSequenceHeader = false; // 重置,强制发送新的
logger.info("[{}] 更新SPS/PPS: SPS长度={}, PPS长度={}", tag,
sps != null ? sps.length : 0, pps != null ? pps.length : 0);
}
/**
* 发送FLV音频数据
*/
public void sendAudioData(byte[] flvData, int timestamp) throws IOException {
if (flvData == null || flvData.length < 16) {
return;
}
// 跳过PreviousTagSize (4字节)
int offset = 4;
// 检查Tag Type
if (flvData[offset] != FLV_TAG_TYPE_AUDIO) {
return;
}
offset++;
// 读取DataSize (3字节)
int dataSize = ((flvData[offset] & 0xFF) << 16) |
((flvData[offset + 1] & 0xFF) << 8) |
(flvData[offset + 2] & 0xFF);
offset += 3;
// 跳过Timestamp (3字节) + TimestampExt (1字节)
offset += 4;
// 跳过StreamID (3字节)
offset += 3;
// 提取音频数据
int audioDataSize = dataSize - 1; // 减去audioInfo的1字节
if (offset + audioDataSize > flvData.length) {
return;
}
byte[] audioData = new byte[audioDataSize];
System.arraycopy(flvData, offset + 1, audioData, 0, audioDataSize); // +1 跳过audioInfo
lastAudioTimestamp = timestamp & 0xFFFFFF;
connection.sendAudioData(audioData, lastAudioTimestamp);
}
public boolean hasSentAVCSequenceHeader() {
return hasSentAVCSequenceHeader;
}
}