Jtt1078AudioWebSocketServer.java 3.04 KB
package com.genersoft.iot.vmp.jtt1078.websocket;

import com.genersoft.iot.vmp.jtt1078.server.SessionManager;
import com.genersoft.iot.vmp.jtt1078.util.AudioCodecUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Component
@ServerEndpoint("/ws/audio/{sim}/{channel}")
public class Jtt1078AudioWebSocketServer {

    private String sim;
    private int channel;
    private final AtomicInteger sequence = new AtomicInteger(0);

    @OnOpen
    public void onOpen(Session session, @PathParam("sim") String sim, @PathParam("channel") int channel) {
        this.sim = sim.replaceAll("^0+", "");
        this.channel = channel;
        Jtt1078AudioBroadcastManager.addSession(this.sim, channel, session);
        log.info("Intercom Connect: {}", this.sim);
    }

    @OnClose
    public void onClose(Session session) {
        Jtt1078AudioBroadcastManager.removeSession(sim, channel, session);
        log.info("Intercom Disconnect: {}", sim);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        Jtt1078AudioBroadcastManager.removeSession(sim, channel, session);
        log.error("Intercom Error: {}", error.getMessage());
    }

    @OnMessage
    public void onMessage(byte[] pcmData, Session session) {
        ByteBuf jt1078Packet = null;
        try {
            // 1. 校验数据
            if (pcmData == null || pcmData.length == 0) return;

            // 2. 转码
            byte[] g711Data = AudioCodecUtil.pcmToG711a(pcmData);

            // 3. 封包 (获取 ByteBuf)
            int seq = sequence.getAndIncrement();
            jt1078Packet = AudioCodecUtil.encodeJt1078AudioPacket(sim, channel, seq, g711Data);

            // 4. 发送
            String tag = sim + "-" + channel;
            Channel nettyChannel = SessionManager.getChannelByTag(tag);

            if (nettyChannel != null && nettyChannel.isActive()) {
                // writeAndFlush 会自动处理 refCnt 的释放(无论成功失败)
                // 我们不需要手动 release,除非发送操作根本没执行
                nettyChannel.writeAndFlush(jt1078Packet);
            } else {
                // 如果没发送,必须释放,防止内存泄漏
                if (jt1078Packet.refCnt() > 0) {
                    jt1078Packet.release();
                }
                if (sequence.get() % 100 == 0) { // 减少日志频次
                    log.warn("Device offline or channel missing. Tag: {}", tag);
                }
            }
        } catch (Exception e) {
            log.error("Send audio failed", e);
            // 异常发生时,如果 packet 已经创建但未发送,需要释放
            if (jt1078Packet != null && jt1078Packet.refCnt() > 0) {
                try { jt1078Packet.release(); } catch (Exception ex) {}
            }
        }
    }
}