// Jtt1078Handler.java (9.62 KB)
package com.genersoft.iot.vmp.jtt1078.server;

import com.genersoft.iot.vmp.VManageBootstrap;
import com.genersoft.iot.vmp.jtt1078.publisher.Channel;
import com.genersoft.iot.vmp.jtt1078.publisher.PublishManager;
import com.genersoft.iot.vmp.jtt1078.util.Packet;
import com.genersoft.iot.vmp.jtt1078.websocket.Jtt1078AudioBroadcastManager;
import com.genersoft.iot.vmp.vmanager.jt1078.platform.Jt1078OfCarController;
import com.genersoft.iot.vmp.vmanager.jt1078.platform.config.DataBuffer;
import com.genersoft.iot.vmp.vmanager.jt1078.platform.domain.SimFlow;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * Netty inbound handler for decoded JT/T 1078 media {@link Packet}s.
 *
 * <p>For every packet it: parses the SIM and channel number from the header, optionally
 * filters/renames the stream tag by listening port, refreshes a Redis presence key,
 * accumulates per-second flow statistics, lazily opens a publisher for the connection,
 * and finally dispatches the payload as video or audio.
 *
 * <p>The instance keeps per-connection mutable state (current tag, flow counters), so one
 * instance is expected per channel. NOTE(review): confirm the pipeline initializer does not
 * share a single instance across connections.
 */
public class Jtt1078Handler extends SimpleChannelInboundHandler<Packet> {

    private static final Logger logger = LoggerFactory.getLogger(Jtt1078Handler.class);

    /** Timestamp format used in {@link SimFlow} records; built once instead of per flush. */
    private static final DateTimeFormatter FLOW_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Minimum interval between two Redis keep-alive refreshes for this connection. */
    private static final long REDIS_KEEP_ALIVE_INTERVAL_MS = 5000;

    /** Time-to-live applied to the Redis presence key on every refresh. */
    private static final long REDIS_KEY_TTL_SECONDS = 60;

    // Beans resolved lazily in channelActive() because this handler is not Spring-managed.
    private static DataBuffer simFlowDataBuffer;
    private static StringRedisTemplate redisTemplate;

    /** Listening port used for tag filtering; {@code null} disables the filter. */
    private final Integer port;
    private String currentTag;      // tag handed to the publisher (may carry a "_port" suffix)
    private String currentSim;      // SIM without leading zeros (used for audio broadcast)
    private int currentChannelId;   // channel number of the most recent packet

    private long lastStatTime = 0;
    private long currentSecondFlow = 0;
    private int currentSecondCount = 0;
    private long lastRedisKeepAliveTime = 0;

    /**
     * @param port listening port this handler serves; when non-null, packets whose tag is not
     *             registered for that port in {@code Jt1078OfCarController.map} are dropped
     */
    public Jtt1078Handler(Integer port) {
        this.port = port;
    }

    /** Creates a handler without port filtering. */
    public Jtt1078Handler() {
        this(null);
    }

    /** Resolves the shared Redis template and flow buffer on first use. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        if (redisTemplate == null) {
            redisTemplate = VManageBootstrap.getBean(StringRedisTemplate.class);
        }
        if (simFlowDataBuffer == null) {
            simFlowDataBuffer = VManageBootstrap.getBean(DataBuffer.class);
        }
    }

    /**
     * Handles one decoded packet: header parsing, port filter, Redis keep-alive, flow
     * statistics, lazy publisher start and payload dispatch.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception {
        io.netty.channel.Channel nettyChannel = ctx.channel();

        // 1. Header parsing: six BCD bytes of SIM starting at offset 8, then the channel byte.
        packet.seek(8);
        String rawSim = packet.nextBCD() + packet.nextBCD() + packet.nextBCD() + packet.nextBCD() + packet.nextBCD() + packet.nextBCD();
        int channel = packet.nextByte() & 0xff;

        // Standard key looks like "138000-1" (SIM without leading zeros + "-" + channel).
        String sim = stripLeadingZeros(rawSim);
        String standardTag = sim + "-" + channel;
        String tag = standardTag;

        this.currentSim = sim;
        this.currentChannelId = channel;

        // 2. Port filter and rename (supports multi-port mapping): unknown tags are dropped,
        //    known ones become e.g. "138000-1_1078".
        if (port != null) {
            Set<String> set = Jt1078OfCarController.map.get(port);
            String findSet = Jt1078OfCarController.getFindSet(set, tag);
            if (findSet == null) {
                return;
            }
            tag = findSet + "_" + port;
        }

        this.currentTag = tag;

        long now = System.currentTimeMillis();

        // 3. Redis keep-alive.
        refreshRedisKeepAlive(tag, now);

        // 4. Flow statistics.
        recordFlow(rawSim, channel, packet.size(), now);

        // 5. One-time initialisation for this connection.
        if (!SessionManager.contains(nettyChannel, "tag")) {
            // A. Open the publisher (per the original author: starts FFmpeg push) under the
            //    possibly port-suffixed tag used for live video.
            PublishManager.getInstance().open(tag);

            // B. Register both keys so live video (tag) and intercom lookups from the
            //    WebSocket side (intercom_tag) do not interfere with each other.
            SessionManager.set(nettyChannel, "tag", tag);
            SessionManager.set(nettyChannel, "intercom_tag", standardTag);

            logger.info("Publish Start. VideoTag=[{}], IntercomTag=[{}]", tag, standardTag);
        }

        // 6. Payload dispatch.
        processMediaPayload(nettyChannel, packet, tag, lengthOffset(packet));
    }

    /**
     * Removes leading '0' characters; equivalent to {@code s.replaceAll("^0+", "")} but
     * without compiling a regex for every packet.
     */
    private static String stripLeadingZeros(String s) {
        int start = 0;
        int len = s.length();
        while (start < len && s.charAt(start) == '0') {
            start++;
        }
        return start == 0 ? s : s.substring(start);
    }

    /**
     * Refreshes the Redis presence key {@code "tag:" + tag} at most once per
     * {@link #REDIS_KEEP_ALIVE_INTERVAL_MS}; creates it (and logs "Stream Online") if absent.
     */
    private void refreshRedisKeepAlive(String tag, long now) {
        if (now - lastRedisKeepAliveTime <= REDIS_KEEP_ALIVE_INTERVAL_MS) {
            return;
        }
        StringRedisTemplate redis = redisTemplate;
        if (redis == null) {
            // Mirrors the null guard on simFlowDataBuffer: without Redis the media path still works.
            return;
        }
        String redisKey = "tag:" + tag;
        if (Boolean.TRUE.equals(redis.hasKey(redisKey))) {
            redis.expire(redisKey, REDIS_KEY_TTL_SECONDS, TimeUnit.SECONDS);
        } else {
            redis.opsForSet().add(redisKey, tag);
            redis.expire(redisKey, REDIS_KEY_TTL_SECONDS, TimeUnit.SECONDS);
            logger.info("[{}] Stream Online", tag);
        }
        lastRedisKeepAliveTime = now;
    }

    /**
     * Accumulates bytes/packets per wall-clock second and pushes a {@link SimFlow} record to
     * the data buffer whenever the second changes.
     *
     * <p>The rollover is checked before the current packet is counted, so the packet that
     * opens a new second is attributed to that new second and the first packet of a
     * connection is not discarded.
     */
    private void recordFlow(String rawSim, int channel, long packetSize, long now) {
        if (now / 1000 != lastStatTime / 1000) {
            // lastStatTime == 0 means nothing has been accumulated yet, so there is nothing to flush.
            if (lastStatTime != 0 && simFlowDataBuffer != null) {
                String timeStr = FLOW_TIME_FORMAT.format(LocalDateTime.now());
                SimFlow simFlow = SimFlow.builder().sim(rawSim).channel(channel).flow(currentSecondFlow).count(currentSecondCount).time(timeStr).build();
                simFlowDataBuffer.setValue(simFlow);
            }
            currentSecondFlow = 0;
            currentSecondCount = 0;
            lastStatTime = now;
        }
        currentSecondFlow += packetSize;
        currentSecondCount++;
    }

    /**
     * Returns the offset of the 2-byte body-length field, which depends on the data type in
     * the high nibble of byte 15.
     *
     * <p>Types 0x03 (audio) and 0x02 (treated here as intercom) carry no frame-interval
     * fields (4 bytes), so their offset is 4 smaller; type 0x04 (transparent data)
     * additionally lacks the 8-byte timestamp.
     *
     * <p>NOTE(review): JT/T 1078-2016 appears to define 0x02 as a video B-frame; this handler
     * deliberately treats it as intercom audio - confirm against the target devices.
     */
    private int lengthOffset(Packet packet) {
        packet.seek(15);
        int dataType = (packet.nextByte() >> 4) & 0x0f;

        if (dataType == 0x04) {
            return 28 - 8 - 2 - 2; // transparent data
        }
        if (dataType == 0x03 || dataType == 0x02) {
            return 28 - 4; // audio and intercom
        }
        return 28; // video
    }

    /**
     * Dispatches the payload by data type: 0x00/0x01 go to the publisher as video; 0x02/0x03
     * are broadcast to WebSocket listeners, and only 0x03 is additionally published as audio.
     * Other data types are ignored.
     *
     * @param lengthOffset offset of the body-length field; the payload starts 2 bytes after it
     */
    private void processMediaPayload(io.netty.channel.Channel nettyChannel, Packet packet, String tag, int lengthOffset) {
        Integer sequence = SessionManager.get(nettyChannel, "video-sequence");
        if (sequence == null) {
            sequence = 0;
        }

        packet.seek(15);
        byte dataTypeByte = packet.nextByte();
        int dataType = (dataTypeByte >> 4) & 0x0f; // per original author: 0=AV, 1=V, 2=Intercom, 3=Audio
        int pkType = dataTypeByte & 0x0f;

        if (logger.isDebugEnabled()) {
            logger.debug("[{}] Jtt1078Handler媒体数据: dataType=0x{}, pkType={}, lengthOffset={}",
                    tag, Integer.toHexString(dataType), pkType, lengthOffset);
        }

        packet.seek(5);
        int pt = packet.nextByte() & 0x7f;

        if (dataType == 0x00 || dataType == 0x01) {
            // --- Branch A: video. Always forwarded to PublishManager. ---
            // The sequence advances on package types 0 and 2 only.
            if (pkType == 0 || pkType == 2) {
                sequence += 1;
                SessionManager.set(nettyChannel, "video-sequence", sequence);
            }
            long timestamp = packet.seek(16).nextLong();
            byte[] videoData = packet.seek(lengthOffset + 2).nextBytes();

            if (logger.isDebugEnabled()) {
                logger.debug("[{}] Jtt1078Handler收到视频数据: dataType=0x{}, pkType={}, videoDataLen={}",
                        tag, Integer.toHexString(dataType), pkType, videoData.length);
            }

            // Push to the publisher (live stream).
            PublishManager.getInstance().publishVideo(tag, sequence, timestamp, pt, videoData);
        } else if (dataType == 0x03 || dataType == 0x02) {
            // --- Branch B: audio / intercom. ---
            long timestamp = packet.seek(16).nextLong();
            byte[] audioData = packet.seek(lengthOffset + 2).nextBytes();

            // 1. Every uplink audio frame goes to the WebSocket side so intercom listeners hear it.
            Jtt1078AudioBroadcastManager.broadcastAudio(this.currentSim, this.currentChannelId, audioData);

            // 2. Two-way intercom (0x02) must never reach the publisher; only 0x03
            //    (ambient listening) is published for mixing/recording.
            if (dataType != 0x02) {
                PublishManager.getInstance().publishAudio(tag, sequence, timestamp, pt, audioData);
            }
        }
    }

    /** Releases the publisher and session state when the connection goes down. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        release(ctx.channel());
    }

    /**
     * Logs unexpected errors (with stack trace), then releases resources and closes the
     * connection. Port-scan noise ("invalid protocol header", "Connection reset") is not logged.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        String msg = cause.getMessage();
        boolean scanNoise = msg != null
                && (msg.contains("invalid protocol header") || msg.contains("Connection reset"));
        if (!scanNoise) {
            // Passing the throwable as the last argument makes SLF4J print the stack trace.
            logger.error("Jtt1078Handler Exception: {}", msg, cause);
        }
        release(ctx.channel());
        ctx.close();
    }

    /** Closes the connection on reader-idle timeout; other events are passed on. */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                logger.warn("Read Timeout: {}", currentTag);
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * Drops the session entry for the channel and closes its publisher. A second call for the
     * same channel is a no-op because the "tag" attribute is gone after the first one.
     */
    private void release(io.netty.channel.Channel channel) {
        String tag = SessionManager.get(channel, "tag");
        if (tag != null) {
            SessionManager.remove(channel);
            PublishManager.getInstance().close(tag);
        }
    }
}