PublishManager.java
4.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.genersoft.iot.vmp.jt1078.publisher;
import com.genersoft.iot.vmp.jt1078.entity.Media;
import com.genersoft.iot.vmp.jt1078.entity.Media.Type;
import com.genersoft.iot.vmp.jt1078.server.Jtt1078Handler;
import com.genersoft.iot.vmp.jt1078.subscriber.Subscriber;
import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class PublishManager {
static Logger logger = LoggerFactory.getLogger(PublishManager.class);
ConcurrentHashMap<String, Channel> channels = new ConcurrentHashMap();
static final PublishManager instance = new PublishManager();
private Integer httpPort;
private PublishManager() {
}
public Subscriber subscribe(String tag, Media.Type type, ChannelHandlerContext ctx, Integer httpPort) {
Channel chl = (Channel)this.channels.get(tag);
if (chl == null) {
chl = new Channel(tag, httpPort);
this.channels.put(tag, chl);
}
this.httpPort = httpPort;
Subscriber subscriber = null;
if (type.equals(Type.Video)) {
subscriber = chl.subscribe(ctx);
subscriber.setName("subscriber-" + tag + "-" + subscriber.getId());
subscriber.start();
return subscriber;
} else {
throw new RuntimeException("unknown media type: " + type);
}
}
public void publishAudio(String tag, int sequence, long timestamp, int payloadType, byte[] data) {
Channel chl = (Channel)this.channels.get(tag);
if (chl != null) {
chl.writeAudio(timestamp, payloadType, data);
}
}
public void publishVideo(String tag, int sequence, long timestamp, int payloadType, byte[] data) {
int length = data.length;
StringBuilder builder = new StringBuilder();
for(int i = 0; i < length; ++i) {
builder.append(this.valu(data, i));
}
Channel chl = (Channel)this.channels.get(tag);
if (chl != null) {
chl.writeVideo((long)sequence, timestamp, payloadType, data);
}
}
public String valu(byte[] data, int index) {
byte val = data[index++];
int ch1 = val >> 4 & 15;
int ch2 = val & 15;
return ch1 + "" + ch2;
}
public Channel open(String tag, Integer httpPort) {
Channel chl = (Channel)this.channels.get(tag);
if (chl == null) {
chl = new Channel(tag, httpPort);
this.channels.put(tag, chl);
}
logger.info("Thread id:[{}]", Thread.currentThread().getId());
if (chl.isPublishing()) {
throw new RuntimeException("channel already publishing");
} else {
return chl;
}
}
public void close(String tag) {
Channel chl = (Channel)this.channels.remove(tag);
if (chl != null) {
chl.close();
}
}
public void unsubscribe(String tag, long watcherId) {
Channel chl = (Channel)this.channels.get(tag);
if (chl != null) {
chl.unsubscribe(watcherId);
}
logger.info("unsubscribe: {} - {}", tag, watcherId);
}
public void unsubscribeAndClose(String tag) {
try {
Channel chl = (Channel)this.channels.get(tag);
if (chl != null) {
long watcherId = chl.getWatcherId(tag);
this.unsubscribe(tag, watcherId);
}
} catch (Exception var6) {
logger.error("unsubscribeAndClose unsubscribe error;[{}]", tag);
}
try {
this.close(tag);
} catch (Exception var5) {
logger.error("unsubscribeAndClose close error;[{}]", tag);
}
}
public static void init() {
}
public static PublishManager getInstance() {
return instance;
}
private void createChannel(String tag, String tagMapping) {
Channel chl = (Channel)this.channels.get(tag);
}
}