Commit ebc904e4d5fe07ecc269927f0e6669ad4f8bda19

Authored by 648540858
Committed by GitHub
2 parents 813fd772 97b673d6

Merge pull request #836 from keDaYao/featur-jt1078

新增JT1078 Template支持
Showing 29 changed files with 1868 additions and 0 deletions
src/main/java/com/genersoft/iot/vmp/jt1078/annotation/MsgId.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.annotation;
  2 +
  3 +import java.lang.annotation.*;
  4 +
  5 +/**
  6 + * @author QingtaiJiang
  7 + * @date 2023/4/27 18:31
  8 + * @email qingtaij@163.com
  9 + */
  10 +@Target(ElementType.TYPE)
  11 +@Retention(RetentionPolicy.RUNTIME)
  12 +@Documented
  13 +public @interface MsgId {
  14 + String id();
  15 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/cmd/JT1078Template.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.cmd;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd;
  4 +import com.genersoft.iot.vmp.jt1078.proc.response.J9101;
  5 +import com.genersoft.iot.vmp.jt1078.proc.response.J9102;
  6 +import com.genersoft.iot.vmp.jt1078.session.SessionManager;
  7 +
  8 +import java.util.Random;
  9 +
  10 +/**
  11 + * @author QingtaiJiang
  12 + * @date 2023/4/27 18:58
  13 + * @email qingtaij@163.com
  14 + */
  15 +public class JT1078Template {
  16 +
  17 + private final Random random = new Random();
  18 +
  19 + /**
  20 + * 开启直播视频
  21 + *
  22 + * @param devId 设备号
  23 + * @param j9101 开启视频参数
  24 + */
  25 + public String startLive(String devId, J9101 j9101, Integer timeOut) {
  26 + Cmd cmd = new Cmd.Builder()
  27 + .setDevId(devId)
  28 + .setPackageNo(randomInt())
  29 + .setMsgId("9101")
  30 + .setRespId("0001")
  31 + .setRs(j9101)
  32 + .build();
  33 + return SessionManager.INSTANCE.request(cmd, timeOut);
  34 + }
  35 +
  36 + /**
  37 + * 关闭直播视频
  38 + *
  39 + * @param devId 设备号
  40 + * @param j9102 关闭视频参数
  41 + */
  42 + public String stopLive(String devId, J9102 j9102, Integer timeOut) {
  43 + Cmd cmd = new Cmd.Builder()
  44 + .setDevId(devId)
  45 + .setPackageNo(randomInt())
  46 + .setMsgId("9102")
  47 + .setRespId("0001")
  48 + .setRs(j9102)
  49 + .build();
  50 + return SessionManager.INSTANCE.request(cmd, timeOut);
  51 + }
  52 +
  53 + private Long randomInt() {
  54 + return (long) random.nextInt(1000) + 1;
  55 + }
  56 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/Jt808Decoder.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.codec.decode;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.proc.Header;
  4 +import com.genersoft.iot.vmp.jt1078.proc.factory.CodecFactory;
  5 +import com.genersoft.iot.vmp.jt1078.proc.request.Re;
  6 +import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
  7 +import com.genersoft.iot.vmp.jt1078.session.Session;
  8 +import io.netty.buffer.ByteBuf;
  9 +import io.netty.buffer.ByteBufUtil;
  10 +import io.netty.buffer.CompositeByteBuf;
  11 +import io.netty.buffer.UnpooledByteBufAllocator;
  12 +import io.netty.channel.ChannelHandlerContext;
  13 +import io.netty.handler.codec.ByteToMessageDecoder;
  14 +import org.slf4j.Logger;
  15 +import org.slf4j.LoggerFactory;
  16 +
  17 +import java.util.ArrayList;
  18 +import java.util.List;
  19 +
  20 +/**
  21 + * @author QingtaiJiang
  22 + * @date 2023/4/27 18:10
  23 + * @email qingtaij@163.com
  24 + */
  25 +public class Jt808Decoder extends ByteToMessageDecoder {
  26 + private final static Logger log = LoggerFactory.getLogger(Jt808Decoder.class);
  27 +
  28 + @Override
  29 + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  30 + Session session = ctx.channel().attr(Session.KEY).get();
  31 + log.info("> {} hex:{}", session, ByteBufUtil.hexDump(in));
  32 +
  33 + try {
  34 + ByteBuf buf = unEscapeAndCheck(in);
  35 +
  36 + Header header = new Header();
  37 + header.setMsgId(ByteBufUtil.hexDump(buf.readSlice(2)));
  38 + header.setMsgPro(buf.readUnsignedShort());
  39 + if (header.is2019Version()) {
  40 + header.setVersion(buf.readUnsignedByte());
  41 + String devId = ByteBufUtil.hexDump(buf.readSlice(10));
  42 + header.setDevId(devId.replaceFirst("^0*", ""));
  43 + } else {
  44 + header.setDevId(ByteBufUtil.hexDump(buf.readSlice(6)).replaceFirst("^0*", ""));
  45 + }
  46 + header.setSn(buf.readUnsignedShort());
  47 +
  48 + Re handler = CodecFactory.getHandler(header.getMsgId());
  49 + if (handler == null) {
  50 + log.error("get msgId is null {}", header.getMsgId());
  51 + return;
  52 + }
  53 + Rs decode = handler.decode(buf, header, session);
  54 + if (decode != null) {
  55 + out.add(decode);
  56 + }
  57 + } finally {
  58 + in.skipBytes(in.readableBytes());
  59 + }
  60 +
  61 +
  62 + }
  63 +
  64 +
  65 + /**
  66 + * 转义与验证校验码
  67 + *
  68 + * @param byteBuf 转义Buf
  69 + * @return 转义好的数据
  70 + */
  71 + public ByteBuf unEscapeAndCheck(ByteBuf byteBuf) throws Exception {
  72 + int low = byteBuf.readerIndex();
  73 + int high = byteBuf.writerIndex();
  74 + byte checkSum = 0;
  75 + int calculationCheckSum = 0;
  76 +
  77 + byte aByte = byteBuf.getByte(high - 2);
  78 + byte protocolEscapeFlag7d = 0x7d;
  79 + //0x7d转义
  80 + byte protocolEscapeFlag01 = 0x01;
  81 + //0x7e转义
  82 + byte protocolEscapeFlag02 = 0x02;
  83 + if (aByte == protocolEscapeFlag7d) {
  84 + byte b2 = byteBuf.getByte(high - 1);
  85 + if (b2 == protocolEscapeFlag01) {
  86 + checkSum = protocolEscapeFlag7d;
  87 + } else if (b2 == protocolEscapeFlag02) {
  88 + checkSum = 0x7e;
  89 + } else {
  90 + log.error("转义1异常:{}", ByteBufUtil.hexDump(byteBuf));
  91 + throw new Exception("转义错误");
  92 + }
  93 + high = high - 2;
  94 + } else {
  95 + high = high - 1;
  96 + checkSum = byteBuf.getByte(high);
  97 + }
  98 + List<ByteBuf> bufList = new ArrayList<>();
  99 + int index = low;
  100 + while (index < high) {
  101 + byte b = byteBuf.getByte(index);
  102 + if (b == protocolEscapeFlag7d) {
  103 + byte c = byteBuf.getByte(index + 1);
  104 + if (c == protocolEscapeFlag01) {
  105 + ByteBuf slice = slice0x01(byteBuf, low, index);
  106 + bufList.add(slice);
  107 + b = protocolEscapeFlag7d;
  108 + } else if (c == protocolEscapeFlag02) {
  109 + ByteBuf slice = slice0x02(byteBuf, low, index);
  110 + bufList.add(slice);
  111 + b = 0x7e;
  112 + } else {
  113 + log.error("转义2异常:{}", ByteBufUtil.hexDump(byteBuf));
  114 + throw new Exception("转义错误");
  115 + }
  116 + index += 2;
  117 + low = index;
  118 + } else {
  119 + index += 1;
  120 + }
  121 + calculationCheckSum = calculationCheckSum ^ b;
  122 + }
  123 +
  124 + if (calculationCheckSum == checkSum) {
  125 + if (bufList.size() == 0) {
  126 + return byteBuf.slice(low, high);
  127 + } else {
  128 + bufList.add(byteBuf.slice(low, high - low));
  129 + return new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, bufList.size(), bufList);
  130 + }
  131 + } else {
  132 + log.info("{} 解析校验码:{}--计算校验码:{}", ByteBufUtil.hexDump(byteBuf), checkSum, calculationCheckSum);
  133 + throw new Exception("校验码错误!");
  134 + }
  135 + }
  136 +
  137 +
  138 + private ByteBuf slice0x01(ByteBuf buf, int low, int sign) {
  139 + return buf.slice(low, sign - low + 1);
  140 + }
  141 +
  142 + private ByteBuf slice0x02(ByteBuf buf, int low, int sign) {
  143 + buf.setByte(sign, 0x7e);
  144 + return buf.slice(low, sign - low + 1);
  145 + }
  146 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808Encoder.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.codec.encode;
  2 +
  3 +
  4 +import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
  5 +import com.genersoft.iot.vmp.jt1078.session.Session;
  6 +import io.netty.buffer.ByteBuf;
  7 +import io.netty.buffer.ByteBufUtil;
  8 +import io.netty.channel.ChannelHandlerContext;
  9 +import io.netty.handler.codec.MessageToByteEncoder;
  10 +import org.slf4j.Logger;
  11 +import org.slf4j.LoggerFactory;
  12 +
  13 +/**
  14 + * @author QingtaiJiang
  15 + * @date 2023/4/27 18:10
  16 + * @email qingtaij@163.com
  17 + */
  18 +public class Jt808Encoder extends MessageToByteEncoder<Rs> {
  19 + private final static Logger log = LoggerFactory.getLogger(Jt808Encoder.class);
  20 +
  21 + @Override
  22 + protected void encode(ChannelHandlerContext ctx, Rs msg, ByteBuf out) throws Exception {
  23 + Session session = ctx.channel().attr(Session.KEY).get();
  24 +
  25 + ByteBuf encode = Jt808EncoderCmd.encode(msg, session, session.nextSerialNo());
  26 + if(encode!=null){
  27 + log.info("< {} hex:{}", session, ByteBufUtil.hexDump(encode));
  28 + out.writeBytes(encode);
  29 + }
  30 + }
  31 +
  32 +
  33 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808EncoderCmd.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.codec.encode;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
  4 +import com.genersoft.iot.vmp.jt1078.proc.Header;
  5 +import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd;
  6 +import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
  7 +import com.genersoft.iot.vmp.jt1078.session.Session;
  8 +import com.genersoft.iot.vmp.jt1078.util.Bin;
  9 +import io.netty.buffer.ByteBuf;
  10 +import io.netty.buffer.ByteBufUtil;
  11 +import io.netty.buffer.CompositeByteBuf;
  12 +import io.netty.buffer.Unpooled;
  13 +import io.netty.channel.ChannelHandlerContext;
  14 +import io.netty.handler.codec.MessageToByteEncoder;
  15 +import io.netty.util.ByteProcessor;
  16 +import org.slf4j.Logger;
  17 +import org.slf4j.LoggerFactory;
  18 +import org.springframework.util.StringUtils;
  19 +
  20 +import java.util.LinkedList;
  21 +
  22 +/**
  23 + * @author QingtaiJiang
  24 + * @date 2023/4/27 18:25
  25 + * @email qingtaij@163.com
  26 + */
  27 +public class Jt808EncoderCmd extends MessageToByteEncoder<Cmd> {
  28 + private final static Logger log = LoggerFactory.getLogger(Jt808EncoderCmd.class);
  29 +
  30 + @Override
  31 + protected void encode(ChannelHandlerContext ctx, Cmd cmd, ByteBuf out) throws Exception {
  32 + Session session = ctx.channel().attr(Session.KEY).get();
  33 + Rs msg = cmd.getRs();
  34 + ByteBuf encode = encode(msg, session, cmd.getPackageNo().intValue());
  35 + if (encode != null) {
  36 + log.info("< {} hex:{}", session, ByteBufUtil.hexDump(encode));
  37 + out.writeBytes(encode);
  38 + }
  39 + }
  40 +
  41 +
  42 + public static ByteBuf encode(Rs msg, Session session, Integer packageNo) {
  43 + String id = msg.getClass().getAnnotation(MsgId.class).id();
  44 + if (!StringUtils.hasLength(id)) {
  45 + log.error("Not find msgId");
  46 + return null;
  47 + }
  48 +
  49 + ByteBuf byteBuf = Unpooled.buffer();
  50 +
  51 + byteBuf.writeBytes(ByteBufUtil.decodeHexDump(id));
  52 +
  53 + ByteBuf encode = msg.encode();
  54 +
  55 + Header header = msg.getHeader();
  56 + if (header == null) {
  57 + header = session.getHeader();
  58 + }
  59 +
  60 + if (header.is2019Version()) {
  61 + // 消息体属性
  62 + byteBuf.writeShort(encode.readableBytes() | 1 << 14);
  63 +
  64 + // 版本号
  65 + byteBuf.writeByte(header.getVersion());
  66 +
  67 + // 终端手机号
  68 + byteBuf.writeBytes(ByteBufUtil.decodeHexDump(Bin.strHexPaddingLeft(header.getDevId(), 20)));
  69 + } else {
  70 + // 消息体属性
  71 + byteBuf.writeShort(encode.readableBytes());
  72 +
  73 + byteBuf.writeBytes(ByteBufUtil.decodeHexDump(Bin.strHexPaddingLeft(header.getDevId(), 12)));
  74 + }
  75 +
  76 + // 消息体流水号
  77 + byteBuf.writeShort(packageNo);
  78 +
  79 + // 写入消息体
  80 + byteBuf.writeBytes(encode);
  81 +
  82 + // 计算校验码,并反转义
  83 + byteBuf = escapeAndCheck0(byteBuf);
  84 + return byteBuf;
  85 + }
  86 +
  87 +
  88 + private static final ByteProcessor searcher = value -> !(value == 0x7d || value == 0x7e);
  89 +
  90 + //转义与校验
  91 + public static ByteBuf escapeAndCheck0(ByteBuf source) {
  92 +
  93 + sign(source);
  94 +
  95 + int low = source.readerIndex();
  96 + int high = source.writerIndex();
  97 +
  98 + LinkedList<ByteBuf> bufList = new LinkedList<>();
  99 + int mark, len;
  100 + while ((mark = source.forEachByte(low, high - low, searcher)) > 0) {
  101 +
  102 + len = mark + 1 - low;
  103 + ByteBuf[] slice = slice(source, low, len);
  104 + bufList.add(slice[0]);
  105 + bufList.add(slice[1]);
  106 + low += len;
  107 + }
  108 +
  109 + if (bufList.size() > 0) {
  110 + bufList.add(source.slice(low, high - low));
  111 + } else {
  112 + bufList.add(source);
  113 + }
  114 +
  115 + ByteBuf delimiter = Unpooled.buffer(1, 1).writeByte(0x7e).retain();
  116 + bufList.addFirst(delimiter);
  117 + bufList.addLast(delimiter);
  118 +
  119 + CompositeByteBuf byteBufLs = Unpooled.compositeBuffer(bufList.size());
  120 + byteBufLs.addComponents(true, bufList);
  121 + return byteBufLs;
  122 + }
  123 +
  124 + public static void sign(ByteBuf buf) {
  125 + byte checkCode = bcc(buf);
  126 + buf.writeByte(checkCode);
  127 + }
  128 +
  129 + public static byte bcc(ByteBuf byteBuf) {
  130 + byte cs = 0;
  131 + while (byteBuf.isReadable())
  132 + cs ^= byteBuf.readByte();
  133 + byteBuf.resetReaderIndex();
  134 + return cs;
  135 + }
  136 +
  137 + protected static ByteBuf[] slice(ByteBuf byteBuf, int index, int length) {
  138 + byte first = byteBuf.getByte(index + length - 1);
  139 +
  140 + ByteBuf[] byteBufList = new ByteBuf[2];
  141 + byteBufList[0] = byteBuf.retainedSlice(index, length);
  142 +
  143 + if (first == 0x7d) {
  144 + byteBufList[1] = Unpooled.buffer(1, 1).writeByte(0x01);
  145 + } else {
  146 + byteBuf.setByte(index + length - 1, 0x7d);
  147 + byteBufList[1] = Unpooled.buffer(1, 1).writeByte(0x02);
  148 + }
  149 + return byteBufList;
  150 + }
  151 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/Jt808Handler.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.codec.netty;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
  4 +import com.genersoft.iot.vmp.jt1078.session.Session;
  5 +import com.genersoft.iot.vmp.jt1078.session.SessionManager;
  6 +import io.netty.channel.Channel;
  7 +import io.netty.channel.ChannelHandlerContext;
  8 +import io.netty.channel.ChannelInboundHandlerAdapter;
  9 +import io.netty.handler.timeout.IdleState;
  10 +import io.netty.handler.timeout.IdleStateEvent;
  11 +import org.slf4j.Logger;
  12 +import org.slf4j.LoggerFactory;
  13 +
  14 +/**
  15 + * @author QingtaiJiang
  16 + * @date 2023/4/27 18:14
  17 + * @email qingtaij@163.com
  18 + */
  19 +public class Jt808Handler extends ChannelInboundHandlerAdapter {
  20 +
  21 + private final static Logger log = LoggerFactory.getLogger(Jt808Handler.class);
  22 +
  23 + @Override
  24 + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  25 + if (msg instanceof Rs) {
  26 + ctx.writeAndFlush(msg);
  27 + } else {
  28 + ctx.fireChannelRead(msg);
  29 + }
  30 + }
  31 +
  32 + @Override
  33 + public void channelActive(ChannelHandlerContext ctx) {
  34 + Channel channel = ctx.channel();
  35 + Session session = SessionManager.INSTANCE.newSession(channel);
  36 + channel.attr(Session.KEY).set(session);
  37 + log.info("> Tcp connect {}", session);
  38 + }
  39 +
  40 + @Override
  41 + public void channelInactive(ChannelHandlerContext ctx) {
  42 + Session session = ctx.channel().attr(Session.KEY).get();
  43 + log.info("< Tcp disconnect {}", session);
  44 + ctx.close();
  45 + }
  46 +
  47 + @Override
  48 + public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
  49 + Session session = ctx.channel().attr(Session.KEY).get();
  50 + String message = e.getMessage();
  51 + if (message.toLowerCase().contains("Connection reset by peer".toLowerCase())) {
  52 + log.info("< exception{} {}", session, e.getMessage());
  53 + } else {
  54 + log.info("< exception{} {}", session, e.getMessage(), e);
  55 + }
  56 +
  57 + }
  58 +
  59 + @Override
  60 + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
  61 + if (evt instanceof IdleStateEvent) {
  62 + IdleStateEvent event = (IdleStateEvent) evt;
  63 + IdleState state = event.state();
  64 + if (state == IdleState.READER_IDLE || state == IdleState.WRITER_IDLE) {
  65 + Session session = ctx.channel().attr(Session.KEY).get();
  66 + log.warn("< Proactively disconnect{}", session);
  67 + ctx.close();
  68 + }
  69 + }
  70 + }
  71 +
  72 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/TcpServer.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.codec.netty;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.codec.decode.Jt808Decoder;
  4 +import com.genersoft.iot.vmp.jt1078.codec.encode.Jt808Encoder;
  5 +import com.genersoft.iot.vmp.jt1078.codec.encode.Jt808EncoderCmd;
  6 +import com.genersoft.iot.vmp.jt1078.proc.factory.CodecFactory;
  7 +import io.netty.bootstrap.ServerBootstrap;
  8 +import io.netty.buffer.ByteBuf;
  9 +import io.netty.buffer.Unpooled;
  10 +import io.netty.channel.ChannelFuture;
  11 +import io.netty.channel.ChannelInitializer;
  12 +import io.netty.channel.EventLoopGroup;
  13 +import io.netty.channel.nio.NioEventLoopGroup;
  14 +import io.netty.channel.socket.nio.NioChannelOption;
  15 +import io.netty.channel.socket.nio.NioServerSocketChannel;
  16 +import io.netty.channel.socket.nio.NioSocketChannel;
  17 +import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  18 +import io.netty.handler.timeout.IdleStateHandler;
  19 +import io.netty.util.concurrent.Future;
  20 +import org.slf4j.Logger;
  21 +import org.slf4j.LoggerFactory;
  22 +
  23 +import java.util.concurrent.TimeUnit;
  24 +
  25 +/**
  26 + * @author QingtaiJiang
  27 + * @date 2023/4/27 18:01
  28 + * @email qingtaij@163.com
  29 + */
  30 +
  31 +public class TcpServer {
  32 + private final static Logger log = LoggerFactory.getLogger(TcpServer.class);
  33 +
  34 + private final Integer port;
  35 + private boolean isRunning = false;
  36 + private EventLoopGroup bossGroup = null;
  37 + private EventLoopGroup workerGroup = null;
  38 +
  39 + private final ByteBuf DECODER_JT808 = Unpooled.wrappedBuffer(new byte[]{0x7e});
  40 +
  41 + public TcpServer(Integer port) {
  42 + this.port = port;
  43 + }
  44 +
  45 + private void startTcpServer() {
  46 + try {
  47 + CodecFactory.init();
  48 + this.bossGroup = new NioEventLoopGroup();
  49 + this.workerGroup = new NioEventLoopGroup();
  50 + ServerBootstrap bootstrap = new ServerBootstrap();
  51 + bootstrap.channel(NioServerSocketChannel.class);
  52 + bootstrap.group(bossGroup, workerGroup);
  53 +
  54 + bootstrap.option(NioChannelOption.SO_BACKLOG, 1024)
  55 + .option(NioChannelOption.SO_REUSEADDR, true)
  56 + .childOption(NioChannelOption.TCP_NODELAY, true)
  57 + .childHandler(new ChannelInitializer<NioSocketChannel>() {
  58 + @Override
  59 + public void initChannel(NioSocketChannel channel) {
  60 + channel.pipeline()
  61 + .addLast(new IdleStateHandler(10, 0, 0, TimeUnit.MINUTES))
  62 + .addLast(new DelimiterBasedFrameDecoder(1024 * 2, DECODER_JT808))
  63 + .addLast(new Jt808Decoder())
  64 + .addLast(new Jt808Encoder())
  65 + .addLast(new Jt808EncoderCmd())
  66 + .addLast(new Jt808Handler());
  67 + }
  68 + });
  69 + ChannelFuture channelFuture = bootstrap.bind(port).sync();
  70 + // 监听设备TCP端口是否启动成功
  71 + channelFuture.addListener(future -> {
  72 + if (!future.isSuccess()) {
  73 + log.error("Binding port:{} fail! cause: {}", port, future.cause().getCause(), future.cause());
  74 + }
  75 + });
  76 + log.info("服务:JT808 Server 启动成功, port:{}", port);
  77 + channelFuture.channel().closeFuture().sync();
  78 + } catch (Exception e) {
  79 + log.warn("服务:JT808 Server 启动异常, port:{},{}", port, e.getMessage(), e);
  80 + } finally {
  81 + stop();
  82 + }
  83 + }
  84 +
  85 + /**
  86 + * 开启一个新的线程,拉起来Netty
  87 + */
  88 + public synchronized void start() {
  89 + if (this.isRunning) {
  90 + log.warn("服务:JT808 Server 已经启动, port:{}", port);
  91 + return;
  92 + }
  93 + this.isRunning = true;
  94 + new Thread(this::startTcpServer).start();
  95 + }
  96 +
  97 + public synchronized void stop() {
  98 + if (!this.isRunning) {
  99 + log.warn("服务:JT808 Server 已经停止, port:{}", port);
  100 + }
  101 + this.isRunning = false;
  102 + Future<?> future = this.bossGroup.shutdownGracefully();
  103 + if (!future.isSuccess()) {
  104 + log.warn("bossGroup 无法正常停止", future.cause());
  105 + }
  106 + future = this.workerGroup.shutdownGracefully();
  107 + if (!future.isSuccess()) {
  108 + log.warn("workerGroup 无法正常停止", future.cause());
  109 + }
  110 + log.warn("服务:JT808 Server 已经停止, port:{}", port);
  111 + }
  112 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.config;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
  4 +import com.genersoft.iot.vmp.jt1078.proc.response.J9101;
  5 +import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  6 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  7 +import org.springframework.web.bind.annotation.GetMapping;
  8 +import org.springframework.web.bind.annotation.PathVariable;
  9 +import org.springframework.web.bind.annotation.RequestMapping;
  10 +import org.springframework.web.bind.annotation.RestController;
  11 +
  12 +import javax.annotation.Resource;
  13 +
  14 +/**
  15 + * curl http://localhost:18080/api/jt1078/start/live/18864197066/1
  16 + *
  17 + * @author QingtaiJiang
  18 + * @date 2023/4/27 18:12
  19 + * @email qingtaij@163.com
  20 + */
  21 +@ConditionalOnProperty(value = "jt1078.enable", havingValue = "true")
  22 +@RestController
  23 +@RequestMapping("/api/jt1078")
  24 +public class JT1078Controller {
  25 +
  26 + @Resource
  27 + JT1078Template jt1078Template;
  28 +
  29 + @GetMapping("/start/live/{deviceId}/{channelId}")
  30 + public WVPResult<?> startLive(@PathVariable String deviceId, @PathVariable String channelId) {
  31 + J9101 j9101 = new J9101();
  32 + j9101.setChannel(Integer.valueOf(channelId));
  33 + j9101.setIp("192.168.1.1");
  34 + j9101.setRate(1);
  35 + j9101.setTcpPort(7618);
  36 + j9101.setUdpPort(7618);
  37 + j9101.setType(0);
  38 +
  39 + String s = jt1078Template.startLive(deviceId, j9101, 6);
  40 + WVPResult<String> wvpResult = new WVPResult<>();
  41 + wvpResult.setCode(200);
  42 + wvpResult.setData(String.format("http://192.168.1.1/rtp/%s_%s.live.mp4", deviceId, channelId));
  43 + return wvpResult;
  44 + }
  45 +}
  46 +
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/config/TcpAutoConfiguration.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.config;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
  4 +import com.genersoft.iot.vmp.jt1078.codec.netty.TcpServer;
  5 +import org.springframework.beans.factory.annotation.Value;
  6 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  7 +import org.springframework.context.annotation.Bean;
  8 +import org.springframework.context.annotation.Configuration;
  9 +import org.springframework.core.annotation.Order;
  10 +
  11 +/**
  12 + * @author QingtaiJiang
  13 + * @date 2023/4/27 19:35
  14 + * @email qingtaij@163.com
  15 + */
  16 +@Order(Integer.MIN_VALUE)
  17 +@Configuration
  18 +@ConditionalOnProperty(value = "jt1078.enable", havingValue = "true")
  19 +public class TcpAutoConfiguration {
  20 +
  21 + @Bean(initMethod = "start", destroyMethod = "stop")
  22 + public TcpServer jt1078Server(@Value("${jt1078.port}") Integer port) {
  23 + return new TcpServer(port);
  24 + }
  25 +
  26 + @Bean
  27 + public JT1078Template jt1078Template() {
  28 + return new JT1078Template();
  29 + }
  30 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/proc/Header.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.proc;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.util.Bin;
  4 +
  5 +/**
  6 + * @author QingtaiJiang
  7 + * @date 2023/4/27 18:22
  8 + * @email qingtaij@163.com
  9 + */
  10 +public class Header {
  11 + // 消息ID
  12 + String msgId;
  13 +
  14 + // 消息体属性
  15 + Integer msgPro;
  16 +
  17 + // 标识
  18 + String devId;
  19 +
  20 + // 消息体流水号
  21 + Integer sn;
  22 +
  23 + // 协议版本号
  24 + Short version = -1;
  25 +
  26 +
  27 + public String getMsgId() {
  28 + return msgId;
  29 + }
  30 +
  31 + public void setMsgId(String msgId) {
  32 + this.msgId = msgId;
  33 + }
  34 +
  35 + public Integer getMsgPro() {
  36 + return msgPro;
  37 + }
  38 +
  39 + public void setMsgPro(Integer msgPro) {
  40 + this.msgPro = msgPro;
  41 + }
  42 +
  43 + public String getDevId() {
  44 + return devId;
  45 + }
  46 +
  47 + public void setDevId(String devId) {
  48 + this.devId = devId;
  49 + }
  50 +
  51 + public Integer getSn() {
  52 + return sn;
  53 + }
  54 +
  55 + public void setSn(Integer sn) {
  56 + this.sn = sn;
  57 + }
  58 +
  59 + public Short getVersion() {
  60 + return version;
  61 + }
  62 +
  63 + public void setVersion(Short version) {
  64 + this.version = version;
  65 + }
  66 +
  67 + /**
  68 + * 判断是否是2019的版本
  69 + *
  70 + * @return true 2019后的版本。false 2013
  71 + */
  72 + public boolean is2019Version() {
  73 + return Bin.get(msgPro, 14);
  74 + }
  75 +
  76 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/proc/entity/Cmd.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.proc.entity;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
  4 +
  5 +/**
  6 + * @author QingtaiJiang
  7 + * @date 2023/4/27 18:23
  8 + * @email qingtaij@163.com
  9 + */
  10 +public class Cmd {
  11 + String devId;
  12 + Long packageNo;
  13 + String msgId;
  14 + String respId;
  15 + Rs rs;
  16 +
  17 + public Cmd() {
  18 + }
  19 +
  20 + public Cmd(Builder builder) {
  21 + this.devId = builder.devId;
  22 + this.packageNo = builder.packageNo;
  23 + this.msgId = builder.msgId;
  24 + this.respId = builder.respId;
  25 + this.rs = builder.rs;
  26 + }
  27 +
  28 + public String getDevId() {
  29 + return devId;
  30 + }
  31 +
  32 + public void setDevId(String devId) {
  33 + this.devId = devId;
  34 + }
  35 +
  36 + public Long getPackageNo() {
  37 + return packageNo;
  38 + }
  39 +
  40 + public void setPackageNo(Long packageNo) {
  41 + this.packageNo = packageNo;
  42 + }
  43 +
  44 + public String getMsgId() {
  45 + return msgId;
  46 + }
  47 +
  48 + public void setMsgId(String msgId) {
  49 + this.msgId = msgId;
  50 + }
  51 +
  52 + public String getRespId() {
  53 + return respId;
  54 + }
  55 +
  56 + public void setRespId(String respId) {
  57 + this.respId = respId;
  58 + }
  59 +
  60 + public Rs getRs() {
  61 + return rs;
  62 + }
  63 +
  64 + public void setRs(Rs rs) {
  65 + this.rs = rs;
  66 + }
  67 +
  68 + public static class Builder {
  69 + String devId;
  70 + Long packageNo;
  71 + String msgId;
  72 + String respId;
  73 + Rs rs;
  74 +
  75 + public Builder setDevId(String devId) {
  76 + this.devId = devId.replaceFirst("^0*", "");
  77 + return this;
  78 + }
  79 +
  80 + public Builder setPackageNo(Long packageNo) {
  81 + this.packageNo = packageNo;
  82 + return this;
  83 + }
  84 +
  85 + public Builder setMsgId(String msgId) {
  86 + this.msgId = msgId;
  87 + return this;
  88 + }
  89 +
  90 + public Builder setRespId(String respId) {
  91 + this.respId = respId;
  92 + return this;
  93 + }
  94 +
  95 + public Builder setRs(Rs re) {
  96 + this.rs = re;
  97 + return this;
  98 + }
  99 +
  100 + public Cmd build() {
  101 + return new Cmd(this);
  102 + }
  103 + }
  104 +
  105 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/proc/factory/CodecFactory.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.proc.factory;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
  4 +import com.genersoft.iot.vmp.jt1078.proc.request.Re;
  5 +import com.genersoft.iot.vmp.jt1078.util.ClassUtil;
  6 +import org.slf4j.Logger;
  7 +import org.slf4j.LoggerFactory;
  8 +
  9 +import java.util.HashMap;
  10 +import java.util.List;
  11 +import java.util.Map;
  12 +
  13 +/**
  14 + * @author QingtaiJiang
  15 + * @date 2023/4/27 18:29
  16 + * @email qingtaij@163.com
  17 + */
  18 +
  19 +public class CodecFactory {
  20 + private final static Logger log = LoggerFactory.getLogger(CodecFactory.class);
  21 +
  22 + private static Map<String, Class<?>> protocolHash;
  23 +
  24 + public static void init() {
  25 + protocolHash = new HashMap<>();
  26 + List<Class<?>> classList = ClassUtil.getClassList("com.genersoft.iot.vmp.jt1078.proc", MsgId.class);
  27 + for (Class<?> handlerClass : classList) {
  28 + String id = handlerClass.getAnnotation(MsgId.class).id();
  29 + protocolHash.put(id, handlerClass);
  30 + }
  31 + if (log.isDebugEnabled()) {
  32 + log.debug("消息ID缓存表 protocolHash:{}", protocolHash);
  33 + }
  34 + }
  35 +
  36 + public static Re getHandler(String msgId) {
  37 + Class<?> aClass = protocolHash.get(msgId);
  38 + Object bean = ClassUtil.getBean(aClass);
  39 + if (bean instanceof Re) {
  40 + return (Re) bean;
  41 + }
  42 + return null;
  43 + }
  44 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0001.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.proc.request;
  2 +
  3 +import com.alibaba.fastjson2.JSON;
  4 +import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
  5 +import com.genersoft.iot.vmp.jt1078.proc.Header;
  6 +import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
  7 +import com.genersoft.iot.vmp.jt1078.session.Session;
  8 +import com.genersoft.iot.vmp.jt1078.session.SessionManager;
  9 +import io.netty.buffer.ByteBuf;
  10 +import io.netty.buffer.ByteBufUtil;
  11 +
  12 +/**
  13 + * 终端通用应答
  14 + *
  15 + * @author QingtaiJiang
  16 + * @date 2023/4/27 18:04
  17 + * @email qingtaij@163.com
  18 + */
  19 +@MsgId(id = "0001")
  20 +public class J0001 extends Re {
  21 + int respNo;
  22 + String respId;
  23 + int result;
  24 +
  25 + @Override
  26 + protected Rs decode0(ByteBuf buf, Header header, Session session) {
  27 + respNo = buf.readUnsignedShort();
  28 + respId = ByteBufUtil.hexDump(buf.readSlice(2));
  29 + result = buf.readUnsignedByte();
  30 + return null;
  31 + }
  32 +
  33 + @Override
  34 + protected Rs handler(Header header, Session session) {
  35 + SessionManager.INSTANCE.response(header.getDevId(), "0001", (long) respNo, JSON.toJSONString(this));
  36 + return null;
  37 + }
  38 +
  39 + public int getRespNo() {
  40 + return respNo;
  41 + }
  42 +
  43 + public String getRespId() {
  44 + return respId;
  45 + }
  46 +
  47 + public int getResult() {
  48 + return result;
  49 + }
  50 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0002.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.proc.request;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
  4 +import com.genersoft.iot.vmp.jt1078.proc.Header;
  5 +import com.genersoft.iot.vmp.jt1078.proc.response.J8001;
  6 +import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
  7 +import com.genersoft.iot.vmp.jt1078.session.Session;
  8 +import io.netty.buffer.ByteBuf;
  9 +
  10 +/**
  11 + * 终端心跳
  12 + *
  13 + * @author QingtaiJiang
  14 + * @date 2023/4/27 18:04
  15 + * @email qingtaij@163.com
  16 + */
  17 +@MsgId(id = "0002")
  18 +public class J0002 extends Re {
  19 + @Override
  20 + protected Rs decode0(ByteBuf buf, Header header, Session session) {
  21 + return null;
  22 + }
  23 +
  24 + @Override
  25 + protected Rs handler(Header header, Session session) {
  26 + J8001 j8001 = new J8001();
  27 + j8001.setRespNo(header.getSn());
  28 + j8001.setRespId(header.getMsgId());
  29 + j8001.setResult(J8001.SUCCESS);
  30 + return j8001;
  31 + }
  32 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0004.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.proc.request;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
  4 +import com.genersoft.iot.vmp.jt1078.proc.Header;
  5 +import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
  6 +import com.genersoft.iot.vmp.jt1078.session.Session;
  7 +import io.netty.buffer.ByteBuf;
  8 +
  9 +/**
  10 + * 查询服务器时间
  11 + *
  12 + * @author QingtaiJiang
  13 + * @date 2023/4/27 18:06
  14 + * @email qingtaij@163.com
  15 + */
  16 +@MsgId(id = "0004")
  17 +public class J0004 extends Re {
  18 + @Override
  19 + protected Rs decode0(ByteBuf buf, Header header, Session session) {
  20 + return null;
  21 + }
  22 +
  23 + @Override
  24 + protected Rs handler(Header header, Session session) {
  25 + return null;
  26 + }
  27 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0100.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.proc.request;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
  4 +import com.genersoft.iot.vmp.jt1078.proc.Header;
  5 +import com.genersoft.iot.vmp.jt1078.proc.response.J8100;
  6 +import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
  7 +import com.genersoft.iot.vmp.jt1078.session.Session;
  8 +import io.netty.buffer.ByteBuf;
  9 +
  10 +/**
  11 + * 终端注册
  12 + *
  13 + * @author QingtaiJiang
  14 + * @date 2023/4/27 18:06
  15 + * @email qingtaij@163.com
  16 + */
  17 +@MsgId(id = "0100")
  18 +public class J0100 extends Re {
  19 +
  20 + private int provinceId;
  21 +
  22 + private int cityId;
  23 +
  24 + private String makerId;
  25 +
  26 + private String deviceModel;
  27 +
  28 + private String deviceId;
  29 +
  30 + private int plateColor;
  31 +
  32 + private String plateNo;
  33 +
  34 + @Override
  35 + protected Rs decode0(ByteBuf buf, Header header, Session session) {
  36 + Short version = header.getVersion();
  37 + provinceId = buf.readUnsignedShort();
  38 + if (version > 1) {
  39 + cityId = buf.readUnsignedShort();
  40 + // decode as 2019
  41 + } else {
  42 + int i = buf.readUnsignedShort();
  43 + // decode as 2013
  44 + }
  45 + return null;
  46 + }
  47 +
  48 + @Override
  49 + protected Rs handler(Header header, Session session) {
  50 + J8100 j8100 = new J8100();
  51 + j8100.setRespNo(header.getSn());
  52 + j8100.setResult(J8100.SUCCESS);
  53 + j8100.setCode("WVP_YYDS");
  54 + return j8100;
  55 + }
  56 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0102.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.proc.request;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
  4 +import com.genersoft.iot.vmp.jt1078.proc.Header;
  5 +import com.genersoft.iot.vmp.jt1078.proc.response.J8001;
  6 +import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
  7 +import com.genersoft.iot.vmp.jt1078.session.Session;
  8 +import io.netty.buffer.ByteBuf;
  9 +
  10 +/**
  11 + * 终端鉴权
  12 + *
  13 + * @author QingtaiJiang
  14 + * @date 2023/4/27 18:06
  15 + * @email qingtaij@163.com
  16 + */
  17 +@MsgId(id = "0102")
  18 +public class J0102 extends Re {
  19 + @Override
  20 + protected Rs decode0(ByteBuf buf, Header header, Session session) {
  21 + int lenCode = buf.readUnsignedByte();
  22 +// String code = buf.readCharSequence(lenCode, CharsetUtil.UTF_8).toString();
  23 + // if 2019 to decode next
  24 + return null;
  25 + }
  26 +
  27 + @Override
  28 + protected Rs handler(Header header, Session session) {
  29 + J8001 j8001 = new J8001();
  30 + j8001.setRespNo(header.getSn());
  31 + j8001.setRespId(header.getMsgId());
  32 + j8001.setResult(J8001.SUCCESS);
  33 + return j8001;
  34 + }
  35 +
  36 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.proc.request;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
  4 +import com.genersoft.iot.vmp.jt1078.proc.Header;
  5 +import com.genersoft.iot.vmp.jt1078.proc.response.J8001;
  6 +import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
  7 +import com.genersoft.iot.vmp.jt1078.session.Session;
  8 +import io.netty.buffer.ByteBuf;
  9 +
  10 +/**
  11 + * 实时消息上报
  12 + *
  13 + * @author QingtaiJiang
  14 + * @date 2023/4/27 18:06
  15 + * @email qingtaij@163.com
  16 + */
  17 +@MsgId(id = "0200")
  18 +public class J0200 extends Re {
  19 + @Override
  20 + protected Rs decode0(ByteBuf buf, Header header, Session session) {
  21 + return null;
  22 + }
  23 +
  24 + @Override
  25 + protected Rs handler(Header header, Session session) {
  26 + J8001 j8001 = new J8001();
  27 + j8001.setRespNo(header.getSn());
  28 + j8001.setRespId(header.getMsgId());
  29 + j8001.setResult(J8001.SUCCESS);
  30 + return j8001;
  31 + }
  32 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/Re.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.proc.request;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.proc.Header;
  4 +import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
  5 +import com.genersoft.iot.vmp.jt1078.session.Session;
  6 +import io.netty.buffer.ByteBuf;
  7 +import org.slf4j.Logger;
  8 +import org.slf4j.LoggerFactory;
  9 +import org.springframework.util.StringUtils;
  10 +
  11 +/**
  12 + * @author QingtaiJiang
  13 + * @date 2023/4/27 18:50
  14 + * @email qingtaij@163.com
  15 + */
  16 +public abstract class Re {
  17 + private final static Logger log = LoggerFactory.getLogger(Re.class);
  18 +
  19 + protected abstract Rs decode0(ByteBuf buf, Header header, Session session);
  20 +
  21 + protected abstract Rs handler(Header header, Session session);
  22 +
  23 + public Rs decode(ByteBuf buf, Header header, Session session) {
  24 + if (session != null && !StringUtils.hasLength(session.getDevId())) {
  25 + session.register(header.getDevId(), (int) header.getVersion(), header);
  26 + }
  27 + Rs rs = decode0(buf, header, session);
  28 + Rs rsHand = handler(header, session);
  29 + if (rs == null && rsHand != null) {
  30 + rs = rsHand;
  31 + } else if (rs != null && rsHand != null) {
  32 + log.warn("decode0:{} 与 handler:{} 返回值冲突,采用decode0返回值", rs, rsHand);
  33 + }
  34 + if (rs != null) {
  35 + rs.setHeader(header);
  36 + }
  37 +
  38 + return rs;
  39 + }
  40 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8001.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.proc.response;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
  4 +import io.netty.buffer.ByteBuf;
  5 +import io.netty.buffer.ByteBufUtil;
  6 +import io.netty.buffer.Unpooled;
  7 +
  8 +/**
  9 + * @author QingtaiJiang
  10 + * @date 2023/4/27 18:48
  11 + * @email qingtaij@163.com
  12 + */
  13 +@MsgId(id = "8001")
  14 +public class J8001 extends Rs {
  15 + public static final Integer SUCCESS = 0;
  16 +
  17 + Integer respNo;
  18 + String respId;
  19 + Integer result;
  20 +
  21 + @Override
  22 + public ByteBuf encode() {
  23 + ByteBuf buffer = Unpooled.buffer();
  24 + buffer.writeShort(respNo);
  25 + buffer.writeBytes(ByteBufUtil.decodeHexDump(respId));
  26 + buffer.writeByte(result);
  27 +
  28 + return buffer;
  29 + }
  30 +
  31 +
  32 + public void setRespNo(Integer respNo) {
  33 + this.respNo = respNo;
  34 + }
  35 +
  36 + public void setRespId(String respId) {
  37 + this.respId = respId;
  38 + }
  39 +
  40 + public void setResult(Integer result) {
  41 + this.result = result;
  42 + }
  43 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8100.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.proc.response;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
  4 +import io.netty.buffer.ByteBuf;
  5 +import io.netty.buffer.Unpooled;
  6 +import io.netty.util.CharsetUtil;
  7 +
  8 +/**
  9 + * @author QingtaiJiang
  10 + * @date 2023/4/27 18:40
  11 + * @email qingtaij@163.com
  12 + */
  13 +@MsgId(id = "8100")
  14 +public class J8100 extends Rs {
  15 + public static final Integer SUCCESS = 0;
  16 +
  17 + Integer respNo;
  18 + Integer result;
  19 + String code;
  20 +
  21 + @Override
  22 + public ByteBuf encode() {
  23 + ByteBuf buffer = Unpooled.buffer();
  24 + buffer.writeShort(respNo);
  25 + buffer.writeByte(result);
  26 + buffer.writeCharSequence(code, CharsetUtil.UTF_8);
  27 + return buffer;
  28 + }
  29 +
  30 + public void setRespNo(Integer respNo) {
  31 + this.respNo = respNo;
  32 + }
  33 +
  34 + public void setResult(Integer result) {
  35 + this.result = result;
  36 + }
  37 +
  38 + public void setCode(String code) {
  39 + this.code = code;
  40 + }
  41 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9101.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.proc.response;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
  4 +import io.netty.buffer.ByteBuf;
  5 +import io.netty.buffer.Unpooled;
  6 +import io.netty.util.CharsetUtil;
  7 +
  8 +/**
  9 + * @author QingtaiJiang
  10 + * @date 2023/4/27 18:25
  11 + * @email qingtaij@163.com
  12 + */
  13 +@MsgId(id = "9101")
  14 +public class J9101 extends Rs {
  15 + String ip;
  16 +
  17 + // TCP端口
  18 + Integer tcpPort;
  19 +
  20 + // UDP端口
  21 + Integer udpPort;
  22 +
  23 + // 逻辑通道号
  24 + Integer channel;
  25 +
  26 + // 数据类型
  27 + /**
  28 + * 0:音视频,1:视频,2:双向对讲,3:监听,4:中心广播,5:透传
  29 + */
  30 + Integer type;
  31 +
  32 + // 码流类型
  33 + /**
  34 + * 0:主码流,1:子码流
  35 + */
  36 + Integer rate;
  37 +
  38 + @Override
  39 + public ByteBuf encode() {
  40 + ByteBuf buffer = Unpooled.buffer();
  41 + buffer.writeByte(ip.getBytes().length);
  42 + buffer.writeCharSequence(ip, CharsetUtil.UTF_8);
  43 + buffer.writeShort(tcpPort);
  44 + buffer.writeShort(udpPort);
  45 + buffer.writeByte(channel);
  46 + buffer.writeByte(type);
  47 + buffer.writeByte(rate);
  48 + return buffer;
  49 + }
  50 +
  51 + public String getIp() {
  52 + return ip;
  53 + }
  54 +
  55 + public void setIp(String ip) {
  56 + this.ip = ip;
  57 + }
  58 +
  59 + public Integer getTcpPort() {
  60 + return tcpPort;
  61 + }
  62 +
  63 + public void setTcpPort(Integer tcpPort) {
  64 + this.tcpPort = tcpPort;
  65 + }
  66 +
  67 + public Integer getUdpPort() {
  68 + return udpPort;
  69 + }
  70 +
  71 + public void setUdpPort(Integer udpPort) {
  72 + this.udpPort = udpPort;
  73 + }
  74 +
  75 + public Integer getChannel() {
  76 + return channel;
  77 + }
  78 +
  79 + public void setChannel(Integer channel) {
  80 + this.channel = channel;
  81 + }
  82 +
  83 + public Integer getType() {
  84 + return type;
  85 + }
  86 +
  87 + public void setType(Integer type) {
  88 + this.type = type;
  89 + }
  90 +
  91 + public Integer getRate() {
  92 + return rate;
  93 + }
  94 +
  95 + public void setRate(Integer rate) {
  96 + this.rate = rate;
  97 + }
  98 +
  99 + @Override
  100 + public String toString() {
  101 + return "J9101{" +
  102 + "ip='" + ip + '\'' +
  103 + ", tcpPort=" + tcpPort +
  104 + ", udpPort=" + udpPort +
  105 + ", channel=" + channel +
  106 + ", type=" + type +
  107 + ", rate=" + rate +
  108 + '}';
  109 + }
  110 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9102.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.proc.response;
  2 +
  3 +import io.netty.buffer.ByteBuf;
  4 +import io.netty.buffer.Unpooled;
  5 +
  6 +/**
  7 + * @author QingtaiJiang
  8 + * @date 2023/4/27 18:49
  9 + * @email qingtaij@163.com
  10 + */
  11 +public class J9102 extends Rs {
  12 +
  13 + // 通道号
  14 + Integer channel;
  15 +
  16 + // 控制指令
  17 + /**
  18 + * 0:关闭音视频传输指令;
  19 + * 1:切换码流(增加暂停和继续);
  20 + * 2:暂停该通道所有流的发送;
  21 + * 3:恢复暂停前流的发送,与暂停前的流类型一致;
  22 + * 4:关闭双向对讲
  23 + */
  24 + Integer command;
  25 +
  26 + // 数据类型
  27 + /**
  28 + * 0:关闭该通道有关的音视频数据;
  29 + * 1:只关闭该通道有关的音频,保留该通道
  30 + * 有关的视频;
  31 + * 2:只关闭该通道有关的视频,保留该通道
  32 + * 有关的音频
  33 + */
  34 + Integer closeType;
  35 +
  36 + // 数据类型
  37 + /**
  38 + * 0:主码流;
  39 + * 1:子码流
  40 + */
  41 + Integer streamType;
  42 +
  43 + @Override
  44 + public ByteBuf encode() {
  45 + ByteBuf buffer = Unpooled.buffer();
  46 + buffer.writeByte(channel);
  47 + buffer.writeByte(command);
  48 + buffer.writeByte(closeType);
  49 + buffer.writeByte(streamType);
  50 + return null;
  51 + }
  52 +
  53 +
  54 + public Integer getChannel() {
  55 + return channel;
  56 + }
  57 +
  58 + public void setChannel(Integer channel) {
  59 + this.channel = channel;
  60 + }
  61 +
  62 + public Integer getCommand() {
  63 + return command;
  64 + }
  65 +
  66 + public void setCommand(Integer command) {
  67 + this.command = command;
  68 + }
  69 +
  70 + public Integer getCloseType() {
  71 + return closeType;
  72 + }
  73 +
  74 + public void setCloseType(Integer closeType) {
  75 + this.closeType = closeType;
  76 + }
  77 +
  78 + public Integer getStreamType() {
  79 + return streamType;
  80 + }
  81 +
  82 + public void setStreamType(Integer streamType) {
  83 + this.streamType = streamType;
  84 + }
  85 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/Rs.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.proc.response;
  2 +
  3 +
  4 +import com.genersoft.iot.vmp.jt1078.proc.Header;
  5 +import io.netty.buffer.ByteBuf;
  6 +
  7 +
  8 +/**
  9 + * @author QingtaiJiang
  10 + * @date 2021/8/30 18:54
  11 + * @email qingtaij@163.com
  12 + */
  13 +
  14 +public abstract class Rs {
  15 + private Header header;
  16 +
  17 + public abstract ByteBuf encode();
  18 +
  19 +
  20 + public Header getHeader() {
  21 + return header;
  22 + }
  23 +
  24 + public void setHeader(Header header) {
  25 + this.header = header;
  26 + }
  27 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.session;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.proc.Header;
  4 +import io.netty.channel.Channel;
  5 +import io.netty.util.AttributeKey;
  6 +import org.slf4j.Logger;
  7 +import org.slf4j.LoggerFactory;
  8 +
  9 +import java.util.concurrent.atomic.AtomicInteger;
  10 +
  11 +/**
  12 + * @author QingtaiJiang
  13 + * @date 2023/4/27 18:54
  14 + * @email qingtaij@163.com
  15 + */
  16 +public class Session {
  17 + private final static Logger log = LoggerFactory.getLogger(Session.class);
  18 +
  19 + public static final AttributeKey<Session> KEY = AttributeKey.newInstance(Session.class.getName());
  20 +
  21 + // Netty的channel
  22 + protected final Channel channel;
  23 +
  24 + // 原子类的自增ID
  25 + private final AtomicInteger serialNo = new AtomicInteger(0);
  26 +
  27 + // 是否注册成功
  28 + private boolean registered = false;
  29 +
  30 + // 设备ID
  31 + private String devId;
  32 +
  33 + // 创建时间
  34 + private final long creationTime;
  35 +
  36 + // 协议版本号
  37 + private Integer protocolVersion;
  38 +
  39 + private Header header;
  40 +
  41 + protected Session(Channel channel) {
  42 + this.channel = channel;
  43 + this.creationTime = System.currentTimeMillis();
  44 + }
  45 +
  46 + public void writeObject(Object message) {
  47 + log.info("<<<<<<<<<< cmd{},{}", this, message);
  48 + channel.writeAndFlush(message);
  49 + }
  50 +
  51 + /**
  52 + * 获得下一个流水号
  53 + *
  54 + * @return 流水号
  55 + */
  56 + public int nextSerialNo() {
  57 + int current;
  58 + int next;
  59 + do {
  60 + current = serialNo.get();
  61 + next = current > 0xffff ? 0 : current;
  62 + } while (!serialNo.compareAndSet(current, next + 1));
  63 + return next;
  64 + }
  65 +
  66 + /**
  67 + * 注册session
  68 + *
  69 + * @param devId 设备ID
  70 + */
  71 + public void register(String devId, Integer version, Header header) {
  72 + this.devId = devId;
  73 + this.registered = true;
  74 + this.protocolVersion = version;
  75 + this.header = header;
  76 + SessionManager.INSTANCE.put(devId, this);
  77 + }
  78 +
  79 + /**
  80 + * 获取设备号
  81 + *
  82 + * @return 设备号
  83 + */
  84 + public String getDevId() {
  85 + return devId;
  86 + }
  87 +
  88 +
  89 + public boolean isRegistered() {
  90 + return registered;
  91 + }
  92 +
  93 + public long getCreationTime() {
  94 + return creationTime;
  95 + }
  96 +
  97 + public Integer getProtocolVersion() {
  98 + return protocolVersion;
  99 + }
  100 +
  101 + public Header getHeader() {
  102 + return header;
  103 + }
  104 +
  105 + @Override
  106 + public String toString() {
  107 + return "[" +
  108 + "devId=" + devId +
  109 + ", reg=" + registered +
  110 + ", version=" + protocolVersion +
  111 + ",ip=" + channel.remoteAddress() +
  112 + ']';
  113 + }
  114 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.session;
  2 +
  3 +import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd;
  4 +import io.netty.channel.Channel;
  5 +import org.slf4j.Logger;
  6 +import org.slf4j.LoggerFactory;
  7 +
  8 +import java.util.Map;
  9 +import java.util.concurrent.ConcurrentHashMap;
  10 +import java.util.concurrent.SynchronousQueue;
  11 +import java.util.concurrent.TimeUnit;
  12 +
  13 +
  14 +/**
  15 + * @author QingtaiJiang
  16 + * @date 2023/4/27 19:54
  17 + * @email qingtaij@163.com
  18 + */
  19 +public enum SessionManager {
  20 + INSTANCE;
  21 + private final static Logger log = LoggerFactory.getLogger(SessionManager.class);
  22 +
  23 + // 用与消息的缓存
  24 + private final Map<String, SynchronousQueue<String>> topicSubscribers = new ConcurrentHashMap<>();
  25 +
  26 + // session的缓存
  27 + private final Map<Object, Session> sessionMap;
  28 +
  29 + SessionManager() {
  30 + this.sessionMap = new ConcurrentHashMap<>();
  31 + }
  32 +
  33 + /**
  34 + * 创建新的Session
  35 + *
  36 + * @param channel netty通道
  37 + * @return 创建的session对象
  38 + */
  39 + public Session newSession(Channel channel) {
  40 + return new Session(channel);
  41 + }
  42 +
  43 +
  44 + /**
  45 + * 获取指定设备的Session
  46 + *
  47 + * @param clientId 设备Id
  48 + * @return Session
  49 + */
  50 + public Session get(Object clientId) {
  51 + return sessionMap.get(clientId);
  52 + }
  53 +
  54 + /**
  55 + * 放入新设备连接的session
  56 + *
  57 + * @param clientId 设备ID
  58 + * @param newSession session
  59 + */
  60 + protected void put(Object clientId, Session newSession) {
  61 + sessionMap.put(clientId, newSession);
  62 + }
  63 +
  64 +
  65 + /**
  66 + * 发送同步消息,接收响应
  67 + * 默认超时时间6秒
  68 + */
  69 + public String request(Cmd cmd) {
  70 + // 默认6秒
  71 + int timeOut = 6000;
  72 + return request(cmd, timeOut);
  73 + }
  74 +
  75 + public String request(Cmd cmd, Integer timeOut) {
  76 + Session session = this.get(cmd.getDevId());
  77 + if (session == null) {
  78 + log.error("DevId: {} not online!", cmd.getDevId());
  79 + return "-1";
  80 + }
  81 + String requestKey = requestKey(cmd.getDevId(), cmd.getRespId(), cmd.getPackageNo());
  82 + SynchronousQueue<String> subscribe = subscribe(requestKey);
  83 + if (subscribe == null) {
  84 + log.error("DevId: {} key:{} send repaid", cmd.getDevId(), requestKey);
  85 + return "-1";
  86 + }
  87 + session.writeObject(cmd);
  88 + try {
  89 + return subscribe.poll(timeOut, TimeUnit.SECONDS);
  90 + } catch (InterruptedException e) {
  91 + log.warn("<<<<<<<<<< timeout" + session, e);
  92 + } finally {
  93 + this.unsubscribe(requestKey);
  94 + }
  95 + return null;
  96 + }
  97 +
  98 + public Boolean response(String devId, String respId, Long responseNo, String data) {
  99 + String requestKey = requestKey(devId, respId, responseNo);
  100 + SynchronousQueue<String> queue = topicSubscribers.get(requestKey);
  101 + if (queue != null) {
  102 + try {
  103 + return queue.offer(data, 2, TimeUnit.SECONDS);
  104 + } catch (InterruptedException e) {
  105 + log.error("{}", e.getMessage(), e);
  106 + }
  107 + }
  108 + log.warn("未找到对应回复指令,key:{} 消息:{} ", requestKey, data);
  109 + return false;
  110 + }
  111 +
  112 + private void unsubscribe(String key) {
  113 + topicSubscribers.remove(key);
  114 + }
  115 +
  116 + private SynchronousQueue<String> subscribe(String key) {
  117 + SynchronousQueue<String> queue = null;
  118 + if (!topicSubscribers.containsKey(key))
  119 + topicSubscribers.put(key, queue = new SynchronousQueue<String>());
  120 + return queue;
  121 + }
  122 +
  123 + private String requestKey(String devId, String respId, Long requestNo) {
  124 + return String.join("_", devId.replaceFirst("^0*", ""), respId, requestNo.toString());
  125 + }
  126 +
  127 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/util/Bin.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.util;
  2 +
  3 +/**
  4 + * 32位整型的二进制读写
  5 + */
  6 +public class Bin {
  7 +
  8 + private static final int[] bits = new int[32];
  9 +
  10 + static {
  11 + bits[0] = 1;
  12 + for (int i = 1; i < bits.length; i++) {
  13 + bits[i] = bits[i - 1] << 1;
  14 + }
  15 + }
  16 +
  17 + /**
  18 + * 读取n的第i位
  19 + *
  20 + * @param n int32
  21 + * @param i 取值范围0-31
  22 + */
  23 + public static boolean get(int n, int i) {
  24 + return (n & bits[i]) == bits[i];
  25 + }
  26 +
  27 + /**
  28 + * 不足位数从左边加0
  29 + */
  30 + public static String strHexPaddingLeft(String data, int length) {
  31 + int dataLength = data.length();
  32 + if (dataLength < length) {
  33 + StringBuilder dataBuilder = new StringBuilder(data);
  34 + for (int i = dataLength; i < length; i++) {
  35 + dataBuilder.insert(0, "0");
  36 + }
  37 + data = dataBuilder.toString();
  38 + }
  39 + return data;
  40 + }
  41 +}
... ...
src/main/java/com/genersoft/iot/vmp/jt1078/util/ClassUtil.java 0 → 100644
  1 +package com.genersoft.iot.vmp.jt1078.util;
  2 +
  3 +import org.slf4j.Logger;
  4 +import org.slf4j.LoggerFactory;
  5 +import org.springframework.core.io.Resource;
  6 +import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
  7 +import org.springframework.core.io.support.ResourcePatternResolver;
  8 +
  9 +import java.lang.annotation.Annotation;
  10 +import java.util.LinkedList;
  11 +import java.util.List;
  12 +
  13 +public class ClassUtil {
  14 +
  15 + private static final Logger logger = LoggerFactory.getLogger(ClassUtil.class);
  16 +
  17 +
  18 + public static Object getBean(Class<?> clazz) {
  19 + if (clazz != null) {
  20 + try {
  21 + return clazz.getDeclaredConstructor().newInstance();
  22 + } catch (Exception ex) {
  23 + logger.error("ClassUtil:找不到指定的类", ex);
  24 + }
  25 + }
  26 + return null;
  27 + }
  28 +
  29 +
  30 + public static Object getBean(String className) {
  31 + Class<?> clazz = null;
  32 + try {
  33 + clazz = Class.forName(className);
  34 + } catch (Exception ex) {
  35 + logger.error("ClassUtil:找不到指定的类");
  36 + }
  37 + if (clazz != null) {
  38 + try {
  39 + //获取声明的构造器--》创建实例
  40 + return clazz.getDeclaredConstructor().newInstance();
  41 + } catch (Exception ex) {
  42 + logger.error("ClassUtil:找不到指定的类", ex);
  43 + }
  44 + }
  45 + return null;
  46 + }
  47 +
  48 +
  49 + /**
  50 + * 获取包下所有带注解的class
  51 + *
  52 + * @param packageName 包名
  53 + * @param annotationClass 注解类型
  54 + * @return list
  55 + */
  56 + public static List<Class<?>> getClassList(String packageName, Class<? extends Annotation> annotationClass) {
  57 + List<Class<?>> classList = getClassList(packageName);
  58 + classList.removeIf(next -> !next.isAnnotationPresent(annotationClass));
  59 + return classList;
  60 + }
  61 +
  62 + public static List<Class<?>> getClassList(String... packageName) {
  63 + List<Class<?>> classList = new LinkedList<>();
  64 + for (String s : packageName) {
  65 + List<Class<?>> c = getClassList(s);
  66 + classList.addAll(c);
  67 + }
  68 + return classList;
  69 + }
  70 +
  71 + public static List<Class<?>> getClassList(String packageName) {
  72 + List<Class<?>> classList = new LinkedList<>();
  73 + try {
  74 + ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
  75 + Resource[] resources = resourcePatternResolver.getResources(packageName.replace(".", "/") + "/**/*.class");
  76 + for (Resource resource : resources) {
  77 + String url = resource.getURL().toString();
  78 +
  79 + String[] split = url.split(packageName.replace(".", "/"));
  80 + String s = split[split.length - 1];
  81 + String className = s.replace("/", ".");
  82 + className = className.substring(0, className.lastIndexOf("."));
  83 + doAddClass(classList, packageName + className);
  84 + }
  85 +
  86 + } catch (Exception e) {
  87 + throw new RuntimeException(e);
  88 + }
  89 + return classList;
  90 + }
  91 +
  92 + private static void doAddClass(List<Class<?>> classList, String className) {
  93 + Class<?> cls = loadClass(className, false);
  94 + classList.add(cls);
  95 + }
  96 +
  97 + public static Class<?> loadClass(String className, boolean isInitialized) {
  98 + Class<?> cls;
  99 + try {
  100 + cls = Class.forName(className, isInitialized, getClassLoader());
  101 + } catch (ClassNotFoundException e) {
  102 + throw new RuntimeException(e);
  103 + }
  104 + return cls;
  105 + }
  106 +
  107 +
  108 + public static ClassLoader getClassLoader() {
  109 + return Thread.currentThread().getContextClassLoader();
  110 + }
  111 +
  112 +}
... ...
src/main/resources/all-application.yml
... ... @@ -92,6 +92,15 @@ sip:
92 92 # 是否存储alarm信息
93 93 alarm: false
94 94  
  95 +# 做为JT1078服务器的配置
  96 +jt1078:
  97 + #[必须修改] 是否开启1078的服务
  98 + enable: true
  99 + #[必修修改] 1708设备接入的端口
  100 + port: 21078
  101 + #[可选] 设备鉴权的密码
  102 + password: admin123
  103 +
95 104 #zlm 默认服务器配置
96 105 media:
97 106 # [必须修改] zlm服务器唯一id,用于触发hook时区别是哪台服务器,general.mediaServerId
... ...