// Channel.java
package com.genersoft.iot.vmp.jtt1078.publisher;
import com.genersoft.iot.vmp.jtt1078.codec.AudioCodec;
import com.genersoft.iot.vmp.jtt1078.entity.Media;
import com.genersoft.iot.vmp.jtt1078.entity.MediaEncoding;
import com.genersoft.iot.vmp.jtt1078.flv.FlvEncoder;
// [恢复] 引用 FFmpeg 进程管理类
import com.genersoft.iot.vmp.jtt1078.subscriber.RTMPPublisher;
import com.genersoft.iot.vmp.jtt1078.subscriber.Subscriber;
import com.genersoft.iot.vmp.jtt1078.subscriber.VideoSubscriber;
import com.genersoft.iot.vmp.jtt1078.util.ByteHolder;
import com.genersoft.iot.vmp.jtt1078.util.Configs;
import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * A media channel identified by a tag.
 *
 * <p>Incoming H.264 bytes are accumulated in an internal buffer, split into
 * NAL units on the 4-byte start code {@code 00 00 00 01}, wrapped into FLV
 * tags by {@link FlvEncoder} and handed to every registered
 * {@link Subscriber}. Incoming audio is converted to PCM by an
 * {@link AudioCodec} and handed to the same subscribers.
 *
 * <p>When the {@code rtmp.url} configuration entry is not blank, an
 * {@link RTMPPublisher} is started for the channel (per the original
 * comments, this is an FFmpeg process that pulls the channel's HTTP stream
 * and pushes it onwards).
 *
 * <p>NOTE(review): {@code publishing}, {@code audioCodec} and
 * {@code firstTimestamp} are plain fields; if they are read and written from
 * different threads, visibility is not guaranteed — confirm against callers.
 */
public class Channel
{
    static final Logger logger = LoggerFactory.getLogger(Channel.class);

    /** Subscribers that receive every broadcast audio/video packet. */
    ConcurrentLinkedQueue<Subscriber> subscribers;

    /** FFmpeg push-process manager; {@code null} when {@code rtmp.url} is not configured. */
    RTMPPublisher rtmpPublisher;

    String tag;
    boolean publishing;

    /** Accumulates raw H.264 bytes until a complete NAL unit can be cut off. */
    ByteHolder buffer;

    /** Created lazily from the payload type of the first audio packet. */
    AudioCodec audioCodec;
    FlvEncoder flvEncoder;

    /** Time offset of the first video packet; {@code -1} until one arrives. */
    private long firstTimestamp = -1;

    /**
     * Creates the channel and, if {@code rtmp.url} is configured, starts the
     * FFmpeg publisher for it.
     *
     * @param tag identifier of this channel
     */
    public Channel(String tag)
    {
        this.tag = tag;
        this.subscribers = new ConcurrentLinkedQueue<>();
        this.flvEncoder = new FlvEncoder(true, true);
        this.buffer = new ByteHolder(2048 * 100);

        // Whenever rtmp.url is configured, start FFmpeg so that it pulls the
        // current HTTP stream, transcodes it and pushes it out.
        String rtmpUrl = Configs.get("rtmp.url");
        if (StringUtils.isNotBlank(rtmpUrl))
        {
            logger.info("[{}] 启动 FFmpeg 进程推流至: {}", tag, rtmpUrl);
            rtmpPublisher = new RTMPPublisher(tag);
            rtmpPublisher.start();
        }
    }

    /**
     * @return {@code true} once at least one video packet has been written
     */
    public boolean isPublishing()
    {
        return publishing;
    }

    /**
     * Registers a new video subscriber for the given connection.
     *
     * @param ctx Netty context of the subscribing connection
     * @return the newly created and registered subscriber
     */
    public Subscriber subscribe(ChannelHandlerContext ctx)
    {
        Subscriber subscriber = new VideoSubscriber(this.tag, ctx);
        this.subscribers.add(subscriber);
        return subscriber;
    }

    /**
     * Converts an audio packet to PCM and broadcasts it to all subscribers.
     *
     * <p>The codec is chosen from {@code pt} on the first call and reused
     * afterwards; later calls with a different {@code pt} do not change it.
     *
     * @param timestamp timestamp passed through to the subscribers
     * @param pt        audio payload type used to select the codec
     * @param data      encoded audio payload
     */
    public void writeAudio(long timestamp, int pt, byte[] data)
    {
        // Transcode to PCM (used for FLV muxing; per the original notes FFmpeg
        // reads the PCM from the FLV stream and transcodes it to AAC).
        if (audioCodec == null)
        {
            audioCodec = AudioCodec.getCodec(pt);
            logger.info("audio codec: {}", MediaEncoding.getEncoding(Media.Type.Audio, pt));
        }

        // Hand over to the internal broadcast; FFmpeg pulls this data via HTTP.
        broadcastAudio(timestamp, audioCodec.toPCM(data));
    }

    /**
     * Appends H.264 bytes to the buffer, then encodes and broadcasts every
     * complete NAL unit that can be extracted from it.
     *
     * @param sequence    packet sequence number (not used by this method)
     * @param timeoffset  time offset of this packet; the first value seen
     *                    becomes the zero point of the FLV timestamps
     * @param payloadType video payload type (not used by this method)
     * @param h264        raw H.264 bytes to append
     */
    public void writeVideo(long sequence, long timeoffset, int payloadType, byte[] h264)
    {
        if (firstTimestamp == -1)
        {
            firstTimestamp = timeoffset;
        }
        this.publishing = true;
        this.buffer.write(h264);

        // Identical for every NAL unit cut from this packet, so compute it once.
        int relativeTime = (int) (timeoffset - firstTimestamp);

        byte[] nalu;
        while ((nalu = readNalu()) != null)
        {
            // Too short to hold more than a start code; skip it.
            if (nalu.length < 4)
            {
                continue;
            }

            // Wrap into an FLV tag; the encoder may return null, in which
            // case there is nothing to broadcast for this unit.
            byte[] flvTag = this.flvEncoder.write(nalu, relativeTime);
            if (flvTag == null)
            {
                continue;
            }

            broadcastVideo(timeoffset, flvTag);
        }
    }

    /**
     * Passes a video FLV tag to every subscriber.
     *
     * @param timeoffset time offset of the packet the tag was produced from
     * @param flvTag     encoded FLV tag
     */
    public void broadcastVideo(long timeoffset, byte[] flvTag)
    {
        for (Subscriber subscriber : subscribers)
        {
            subscriber.onVideoData(timeoffset, flvTag, flvEncoder);
        }
    }

    /**
     * Passes audio data to every subscriber.
     *
     * @param timeoffset timestamp of the audio packet
     * @param flvTag     audio payload; {@link #writeAudio} passes PCM data
     *                   here despite the parameter name
     */
    public void broadcastAudio(long timeoffset, byte[] flvTag)
    {
        for (Subscriber subscriber : subscribers)
        {
            subscriber.onAudioData(timeoffset, flvTag, flvEncoder);
        }
    }

    /**
     * Removes and closes the first subscriber whose id equals
     * {@code watcherId}; does nothing if there is none.
     *
     * @param watcherId id of the subscriber to remove
     */
    public void unsubscribe(long watcherId)
    {
        for (Iterator<Subscriber> itr = subscribers.iterator(); itr.hasNext(); )
        {
            Subscriber subscriber = itr.next();
            if (subscriber.getId() == watcherId)
            {
                itr.remove();
                subscriber.close();
                return;
            }
        }
    }

    /**
     * Closes every subscriber and the FFmpeg publisher, if one was started.
     *
     * <p>Cleanup always runs to completion: a failure while closing one
     * subscriber neither prevents the remaining subscribers from being
     * closed nor leaves the FFmpeg process running. The first failure is
     * rethrown after cleanup, with any later ones attached as suppressed.
     *
     * @throws RuntimeException the first exception raised while closing a
     *                          subscriber or the publisher
     */
    public void close()
    {
        RuntimeException failure = null;

        // Drain the queue so each subscriber is detached before it is closed.
        Subscriber subscriber;
        while ((subscriber = subscribers.poll()) != null)
        {
            try
            {
                subscriber.close();
            }
            catch (RuntimeException e)
            {
                failure = remember(failure, e);
            }
        }

        // Shut down the FFmpeg process regardless of subscriber failures.
        RTMPPublisher publisher = rtmpPublisher;
        rtmpPublisher = null;
        if (publisher != null)
        {
            try
            {
                publisher.close();
            }
            catch (RuntimeException e)
            {
                failure = remember(failure, e);
            }
        }

        if (failure != null)
        {
            throw failure;
        }
    }

    /** Keeps the first failure and attaches every later one to it as suppressed. */
    private static RuntimeException remember(RuntimeException first, RuntimeException next)
    {
        if (first == null)
        {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Cuts the next complete NAL unit off the front of the buffer.
     *
     * <p>Scans for the start code {@code 00 00 00 01} at any position other
     * than 0; the bytes in front of it are sliced out and returned, so a
     * returned unit keeps whatever start code it began with.
     *
     * @return the bytes preceding the next start code, or {@code null} when
     *         no start code is found after position 0
     */
    private byte[] readNalu()
    {
        // A start code at index 0 only marks the beginning of the unit being
        // collected, so the search starts at index 1.
        int limit = buffer.size() - 3;
        for (int i = 1; i < limit; i++)
        {
            int a = buffer.get(i) & 0xff;
            int b = buffer.get(i + 1) & 0xff;
            int c = buffer.get(i + 2) & 0xff;
            int d = buffer.get(i + 3) & 0xff;
            if (a == 0x00 && b == 0x00 && c == 0x00 && d == 0x01)
            {
                byte[] nalu = new byte[i];
                // presumably copies the first i bytes out and drops them from
                // the buffer — verify against ByteHolder.sliceInto
                buffer.sliceInto(nalu, i);
                return nalu;
            }
        }
        return null;
    }
}