Jtt1078AudioWebSocketServer.java
3.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package com.genersoft.iot.vmp.jtt1078.websocket;
import com.genersoft.iot.vmp.jtt1078.server.SessionManager;
import com.genersoft.iot.vmp.jtt1078.util.AudioCodecUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Component
@ServerEndpoint("/ws/audio/{sim}/{channel}")
public class Jtt1078AudioWebSocketServer {
private String sim;
private int channel;
private final AtomicInteger sequence = new AtomicInteger(0);
@OnOpen
public void onOpen(Session session, @PathParam("sim") String sim, @PathParam("channel") int channel) {
this.sim = sim.replaceAll("^0+", "");
this.channel = channel;
Jtt1078AudioBroadcastManager.addSession(this.sim, channel, session);
log.info("Intercom Connect: {}", this.sim);
}
@OnClose
public void onClose(Session session) {
Jtt1078AudioBroadcastManager.removeSession(sim, channel, session);
log.info("Intercom Disconnect: {}", sim);
}
@OnError
public void onError(Session session, Throwable error) {
Jtt1078AudioBroadcastManager.removeSession(sim, channel, session);
log.error("Intercom Error: {}", error.getMessage());
}
@OnMessage
public void onMessage(byte[] pcmData, Session session) {
ByteBuf jt1078Packet = null;
try {
// 1. 校验数据
if (pcmData == null || pcmData.length == 0) return;
// 2. 转码
byte[] g711Data = AudioCodecUtil.pcmToG711a(pcmData);
// 3. 封包 (获取 ByteBuf)
int seq = sequence.getAndIncrement();
jt1078Packet = AudioCodecUtil.encodeJt1078AudioPacket(sim, channel, seq, g711Data);
// 4. 发送
String tag = sim + "-" + channel;
Channel nettyChannel = SessionManager.getChannelByTag(tag);
if (nettyChannel != null && nettyChannel.isActive()) {
// writeAndFlush 会自动处理 refCnt 的释放(无论成功失败)
// 我们不需要手动 release,除非发送操作根本没执行
nettyChannel.writeAndFlush(jt1078Packet);
} else {
// 如果没发送,必须释放,防止内存泄漏
if (jt1078Packet.refCnt() > 0) {
jt1078Packet.release();
}
if (sequence.get() % 100 == 0) { // 减少日志频次
log.warn("Device offline or channel missing. Tag: {}", tag);
}
}
} catch (Exception e) {
log.error("Send audio failed", e);
// 异常发生时,如果 packet 已经创建但未发送,需要释放
if (jt1078Packet != null && jt1078Packet.refCnt() > 0) {
try { jt1078Packet.release(); } catch (Exception ex) {}
}
}
}
}