Jtt1078Handler.java 6.71 KB
package com.genersoft.iot.vmp.jtt1078.server;

import com.genersoft.iot.vmp.jtt1078.publisher.Channel;
import com.genersoft.iot.vmp.jtt1078.publisher.PublishManager;
import com.genersoft.iot.vmp.jtt1078.util.Packet;
import com.genersoft.iot.vmp.vmanager.jt1078.platform.Jt1078OfCarController;
import com.genersoft.iot.vmp.vmanager.jt1078.platform.config.DataBuffer;
import com.genersoft.iot.vmp.vmanager.jt1078.platform.domain.SimFlow;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import static com.genersoft.iot.vmp.VManageBootstrap.getBean;

/**
 * Created by matrixy on 2019/4/9.
 */
public class Jtt1078Handler extends SimpleChannelInboundHandler<Packet> {
    private static ApplicationContext applicationContext;
    static Logger logger = LoggerFactory.getLogger(Jtt1078Handler.class);
    private static final AttributeKey<Session> SESSION_KEY = AttributeKey.valueOf("session-key");
    private Integer port;

    private String time;

    private static final DataBuffer simFlowDataBuffer = getBean(DataBuffer.class);

    private static final ConcurrentHashMap<String, SimFlow> sizeMap = new ConcurrentHashMap<>();

    public Jtt1078Handler(Integer port) {
        this.port = port;
    }

    public Jtt1078Handler() {
    }

    /**
     * 流来
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception {
        io.netty.channel.Channel nettyChannel = ctx.channel();
        packet.seek(8);
        String sim = packet.nextBCD() + packet.nextBCD() + packet.nextBCD() + packet.nextBCD() + packet.nextBCD() + packet.nextBCD();
        int channel = packet.nextByte() & 0xff;
        String tag = sim + "-" + channel;
        tag = tag.replaceAll("^0+", "");
        if (port != null) {
            Set<String> set = Jt1078OfCarController.map.get(port);
            String findSet = Jt1078OfCarController.getFindSet(set, tag);
            if (findSet != null) {
                tag = findSet + "_" + port;
            } else {
                return;
            }
        }
        StringRedisTemplate redisTemplate = getBean(StringRedisTemplate.class);
        if (redisTemplate.opsForSet().add("tag:" + tag, tag) > 0) {
            redisTemplate.expire("tag:" + tag, 60, TimeUnit.SECONDS);
            logger.info("[ {} ] --> 流接收成功 ", tag);
        } else {
            redisTemplate.expire("tag:" + tag, 60, TimeUnit.SECONDS);
        }
        String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());

        String key = sim + "-" + channel;
        if (sizeMap.containsKey(key)){
            SimFlow simFlow = sizeMap.get(key);
            simFlow.setFlow(simFlow.getFlow() + packet.size());
            simFlow.setCount(simFlow.getCount() + 1);
        }else {
            sizeMap.put(key, SimFlow.builder().sim(sim).count(1).flow((long) packet.size()).channel(channel).time(format).build());
        }
        if (time == null || !time.equals(format)) {
            time = format;
            sizeMap.entrySet().forEach(entry -> {
                SimFlow value = entry.getValue();
                logger.debug("{} === {}次 === {} 流来的大小 {} ", value.getTime(), value.getCount(), value.getSim() + "-" + value.getChannel(), value.getFlow());
                if (simFlowDataBuffer != null) {
                    simFlowDataBuffer.setValue(value);
                }
            });
            sizeMap.clear();
        }
        if (SessionManager.contains(nettyChannel, "tag") == false) {
            Channel chl = PublishManager.getInstance().open(tag);
            SessionManager.set(nettyChannel, "tag", tag);
            logger.info("start publishing: {} -> {}-{}", Long.toHexString(chl.hashCode() & 0xffffffffL), sim, channel);
        }

        Integer sequence = SessionManager.get(nettyChannel, "video-sequence");
        if (sequence == null) sequence = 0;
        // 1. 做好序号
        // 2. 音频需要转码后提供订阅
        int lengthOffset = 28;
        int dataType = (packet.seek(15).nextByte() >> 4) & 0x0f;
        int pkType = packet.seek(15).nextByte() & 0x0f;
        // 透传数据类型:0100,没有后面的时间以及Last I Frame Interval和Last Frame Interval字段
        if (dataType == 0x04) lengthOffset = 28 - 8 - 2 - 2;
        else if (dataType == 0x03) lengthOffset = 28 - 4;

        int pt = packet.seek(5).nextByte() & 0x7f;

        if (dataType == 0x00 || dataType == 0x01 || dataType == 0x02) {
            // 碰到结束标记时,序号+1
            if (pkType == 0 || pkType == 2) {
                sequence += 1;
                SessionManager.set(nettyChannel, "video-sequence", sequence);
            }
            long timestamp = packet.seek(16).nextLong();
            PublishManager.getInstance().publishVideo(tag, sequence, timestamp, pt, packet.seek(lengthOffset + 2).nextBytes());
        } else if (dataType == 0x03) {
            long timestamp = packet.seek(16).nextLong();
            byte[] data = packet.seek(lengthOffset + 2).nextBytes();
            PublishManager.getInstance().publishAudio(tag, sequence, timestamp, pt, data);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        release(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // super.exceptionCaught(ctx, cause);
        cause.printStackTrace();
        release(ctx.channel());
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                String tag = SessionManager.get(ctx.channel(), "tag");
                logger.info("read timeout: {}", tag);
                release(ctx.channel());
            }
        }
    }

    private void release(io.netty.channel.Channel channel) {
        String tag = SessionManager.get(channel, "tag");
        if (tag != null) {
            logger.info("close netty channel: {}", tag);
            PublishManager.getInstance().close(tag);
        }
    }


}