// Subscriber.java
package cn.org.hentai.jtt1078.subscriber;
import cn.org.hentai.jtt1078.flv.FlvEncoder;
import cn.org.hentai.jtt1078.util.Packet;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Base class for a subscriber that forwards queued byte messages to a single
 * Netty channel from its own thread.
 *
 * <p>Producers hand messages over with {@link #enqueue(byte[])}; the thread body
 * ({@link #run()}) removes them in FIFO order and writes each one through
 * {@link #send(byte[])}, waiting for the write to complete before taking the next.
 * The thread stops once it has been interrupted, which is what {@link #close()} does.
 *
 * <p>Created by matrixy on 2020/1/11.
 */
public abstract class Subscriber extends Thread
{
    static Logger logger = LoggerFactory.getLogger(Subscriber.class);

    /** Process-wide counter used to hand out subscriber ids. */
    static final AtomicLong SEQUENCE = new AtomicLong(0L);

    private final long id;
    private final String tag;

    /** Monitor guarding {@link #messages} inside {@link #enqueue(byte[])} and {@link #take()}. */
    private final Object lock;

    private final ChannelHandlerContext context;

    /**
     * Pending outbound messages, oldest first. This class only touches the list while
     * holding its private lock.
     * NOTE(review): the lock is not visible to subclasses, so direct subclass access
     * to this field would be unsynchronized - confirm no subclass does that.
     */
    protected LinkedList<byte[]> messages;

    /**
     * Creates a subscriber bound to the given channel context and assigns it the next
     * id from {@link #SEQUENCE}. The thread is not started here.
     *
     * @param tag label returned by {@link #getTag()}
     * @param ctx channel context that {@link #send(byte[])} writes to
     */
    public Subscriber(String tag, ChannelHandlerContext ctx)
    {
        this.tag = tag;
        this.context = ctx;
        this.lock = new Object();
        this.messages = new LinkedList<byte[]>();
        this.id = SEQUENCE.getAndAdd(1L);
    }

    /** @return the id taken from {@link #SEQUENCE} at construction time */
    public long getId()
    {
        return this.id;
    }

    /** @return the tag passed to the constructor */
    public String getTag()
    {
        return this.tag;
    }

    /**
     * Callback for video data; the handling is defined by the concrete subclass.
     * NOTE(review): presumably {@code timeoffset} is a timestamp offset and
     * {@code data} an encoded frame - confirm against the implementations.
     */
    public abstract void onVideoData(long timeoffset, byte[] data, FlvEncoder flvEncoder);

    /**
     * Callback for audio data; the handling is defined by the concrete subclass.
     * NOTE(review): presumably {@code timeoffset} is a timestamp offset and
     * {@code data} an encoded audio block - confirm against the implementations.
     */
    public abstract void onAudioData(long timeoffset, byte[] data, FlvEncoder flvEncoder);

    /**
     * Appends a message to the tail of the queue and wakes one thread waiting in
     * {@link #take()}.
     *
     * @param data message to queue; {@code null} is ignored
     */
    public void enqueue(byte[] data)
    {
        if (data == null) return;
        synchronized (lock)
        {
            messages.addLast(data);
            lock.notify();
        }
    }

    /**
     * Thread body: repeatedly takes the next queued message, writes it with
     * {@link #send(byte[])} and waits for that write to finish. The loop ends when
     * this thread is interrupted; any other exception is logged and the loop continues.
     */
    @Override
    public void run()
    {
        while (!this.isInterrupted())
        {
            try
            {
                byte[] data = take();
                if (data == null) continue;

                // await() only waits for completion; it does not throw when the write
                // failed, so the outcome has to be inspected explicitly or failures
                // would go completely unnoticed.
                ChannelFuture future = send(data).await();
                if (!future.isSuccess())
                {
                    logger.warn("send failed, tag: " + tag + ", id: " + id, future.cause());
                }
            }
            catch (InterruptedException ex)
            {
                // A thread blocked in a wait is not stopped by the interrupt flag alone;
                // it receives InterruptedException instead, which is the signal to exit.
                break;
            }
            catch (Exception ex)
            {
                logger.error("send failed", ex);
            }
        }
        logger.info("subscriber closed");
    }

    /**
     * Removes and returns the oldest queued message, blocking while the queue is empty.
     * The wait is done in 100 ms slices so that an interrupt of this subscriber thread
     * is noticed even without a notification.
     *
     * @return the next message, or {@code null} if this thread was interrupted while waiting
     */
    protected byte[] take()
    {
        try
        {
            synchronized (lock)
            {
                while (messages.isEmpty())
                {
                    lock.wait(100);
                    if (this.isInterrupted()) return null;
                }
                return messages.removeFirst();
            }
        }
        catch (InterruptedException ex)
        {
            // wait() clears the interrupt status when it throws; set it again so the
            // loop condition in run() sees the interruption and terminates.
            this.interrupt();
            return null;
        }
    }

    /**
     * Asks the subscriber thread to stop by interrupting it. Messages still in the
     * queue are not sent and the channel itself is not closed here.
     */
    public void close()
    {
        this.interrupt();
    }

    /**
     * Writes and flushes one message to the channel context given at construction.
     *
     * @param message bytes to write
     * @return the future of the write operation
     */
    public ChannelFuture send(byte[] message)
    {
        return context.writeAndFlush(message);
    }
}