Commit 3d2aeb890b0d6bc13ea44ff6e0d5764bcf7aa529

Authored by 648540858
1 parent 1a827a4d

使用线程安全的map存储订阅信息

修改点播消息内容,提升兼容性
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -113,6 +113,7 @@ public class SipLayer implements SipListener { @@ -113,6 +113,7 @@ public class SipLayer implements SipListener {
113 */ 113 */
114 @Override 114 @Override
115 public void processRequest(RequestEvent evt) { 115 public void processRequest(RequestEvent evt) {
  116 + logger.debug(evt.getRequest().toString());
116 // 由于jainsip是单线程程序,为提高性能并发处理 117 // 由于jainsip是单线程程序,为提高性能并发处理
117 processThreadPool.execute(() -> { 118 processThreadPool.execute(() -> {
118 processorFactory.createRequestProcessor(evt).process(); 119 processorFactory.createRequestProcessor(evt).process();
@@ -122,6 +123,7 @@ public class SipLayer implements SipListener { @@ -122,6 +123,7 @@ public class SipLayer implements SipListener {
122 @Override 123 @Override
123 public void processResponse(ResponseEvent evt) { 124 public void processResponse(ResponseEvent evt) {
124 Response response = evt.getResponse(); 125 Response response = evt.getResponse();
  126 + logger.debug(evt.getResponse().toString());
125 int status = response.getStatusCode(); 127 int status = response.getStatusCode();
126 if ((status >= 200) && (status < 300)) { // Success! 128 if ((status >= 200) && (status < 300)) { // Success!
127 ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt); 129 ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
@@ -84,8 +84,7 @@ public class SIPRequestHeaderProvider { @@ -84,8 +84,7 @@ public class SIPRequestHeaderProvider {
84 SipURI requestLine = sipFactory.createAddressFactory().createSipURI(channelId, host.getAddress()); 84 SipURI requestLine = sipFactory.createAddressFactory().createSipURI(channelId, host.getAddress());
85 //via 85 //via
86 ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>(); 86 ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
87 - // ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(sipConfig.getSipIp(), sipConfig.getSipPort(), device.getTransport(), viaTag);  
88 - ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(device.getHost().getIp(), device.getHost().getPort(), device.getTransport(), viaTag); 87 + ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(sipConfig.getSipIp(), sipConfig.getSipPort(), device.getTransport(), viaTag);
89 viaHeader.setRPort(); 88 viaHeader.setRPort();
90 viaHeaders.add(viaHeader); 89 viaHeaders.add(viaHeader);
91 //from 90 //from
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java
@@ -21,14 +21,12 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor { @@ -21,14 +21,12 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor {
21 * 处理 ACK请求 21 * 处理 ACK请求
22 * 22 *
23 * @param evt 23 * @param evt
24 - * @param layer  
25 - * @param transaction  
26 - * @param config  
27 - */ 24 + */
28 @Override 25 @Override
29 public void process(RequestEvent evt) { 26 public void process(RequestEvent evt) {
30 Request request = evt.getRequest(); 27 Request request = evt.getRequest();
31 Dialog dialog = evt.getDialog(); 28 Dialog dialog = evt.getDialog();
  29 + if (dialog == null) return;
32 try { 30 try {
33 Request ackRequest = null; 31 Request ackRequest = null;
34 CSeq csReq = (CSeq) request.getHeader(CSeq.NAME); 32 CSeq csReq = (CSeq) request.getHeader(CSeq.NAME);
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/response/impl/InviteResponseProcessor.java
@@ -12,6 +12,7 @@ import javax.sip.header.ViaHeader; @@ -12,6 +12,7 @@ import javax.sip.header.ViaHeader;
12 import javax.sip.message.Request; 12 import javax.sip.message.Request;
13 import javax.sip.message.Response; 13 import javax.sip.message.Response;
14 14
  15 +import gov.nist.javax.sip.header.CSeq;
15 import org.slf4j.Logger; 16 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory; 17 import org.slf4j.LoggerFactory;
17 import org.springframework.stereotype.Component; 18 import org.springframework.stereotype.Component;
@@ -23,9 +24,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.response.ISIPResponseProcessor; @@ -23,9 +24,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.response.ISIPResponseProcessor;
23 24
24 25
25 /** 26 /**
26 - * @Description:处理INVITE响应 27 + * @Description:处理INVITE响应
27 * @author: swwheihei 28 * @author: swwheihei
28 - * @date: 2020年5月3日 下午4:43:52 29 + * @date: 2020年5月3日 下午4:43:52
29 */ 30 */
30 @Component 31 @Component
31 public class InviteResponseProcessor implements ISIPResponseProcessor { 32 public class InviteResponseProcessor implements ISIPResponseProcessor {
@@ -49,49 +50,11 @@ public class InviteResponseProcessor implements ISIPResponseProcessor { @@ -49,49 +50,11 @@ public class InviteResponseProcessor implements ISIPResponseProcessor {
49 // 成功响应 50 // 成功响应
50 // 下发ack 51 // 下发ack
51 if (statusCode == Response.OK) { 52 if (statusCode == Response.OK) {
52 - // ClientTransaction clientTransaction = evt.getClientTransaction();  
53 - // if(clientTransaction == null){  
54 - // logger.error("回复ACK时,clientTransaction为null >>> {}",response);  
55 - // return;  
56 - // }  
57 - // Dialog clientDialog = clientTransaction.getDialog();  
58 -  
59 - // CSeqHeader clientCSeqHeader = (CSeqHeader)  
60 - // response.getHeader(CSeqHeader.NAME);  
61 - // long cseqId = clientCSeqHeader.getSeqNumber();  
62 - // /*  
63 - // createAck函数,创建的ackRequest,会采用Invite响应的200OK,中的contact字段中的地址,作为目标地址。  
64 - // 有的终端传上来的可能还是内网地址,会造成ack发送不出去。接受不到音视频流  
65 - // 所以在此处统一替换地址。和响应消息的Via头中的地址保持一致。  
66 - // */  
67 - // Request ackRequest = clientDialog.createAck(cseqId);  
68 - // SipURI requestURI = (SipURI) ackRequest.getRequestURI();  
69 - // ViaHeader viaHeader = (ViaHeader) response.getHeader(ViaHeader.NAME);  
70 - // try {  
71 - // requestURI.setHost(viaHeader.getHost());  
72 - // } catch (Exception e) {  
73 - // e.printStackTrace();  
74 - // }  
75 - // requestURI.setPort(viaHeader.getPort());  
76 - // clientDialog.sendAck(ackRequest);  
77 -  
78 Dialog dialog = evt.getDialog(); 53 Dialog dialog = evt.getDialog();
79 CSeqHeader cseq = (CSeqHeader) response.getHeader(CSeqHeader.NAME); 54 CSeqHeader cseq = (CSeqHeader) response.getHeader(CSeqHeader.NAME);
80 Request reqAck = dialog.createAck(cseq.getSeqNumber()); 55 Request reqAck = dialog.createAck(cseq.getSeqNumber());
81 -  
82 - SipURI requestURI = (SipURI) reqAck.getRequestURI();  
83 - ViaHeader viaHeader = (ViaHeader) response.getHeader(ViaHeader.NAME);  
84 - // String viaHost =viaHeader.getHost();  
85 - //getHost()函数取回的IP地址是“[xxx.xxx.xxx.xxx:yyyy]”的格式,需用正则表达式截取为“xxx.xxx.xxx.xxx"格式  
86 - // Pattern p = Pattern.compile("(?<=//|)((\\w)+\\.)+\\w+");  
87 - // Matcher matcher = p.matcher(viaHeader.getHost());  
88 - // if (matcher.find()) {  
89 - // requestURI.setHost(matcher.group());  
90 - // }  
91 - requestURI.setHost(viaHeader.getHost());  
92 - requestURI.setPort(viaHeader.getPort());  
93 - reqAck.setRequestURI(requestURI);  
94 dialog.sendAck(reqAck); 56 dialog.sendAck(reqAck);
  57 +
95 } 58 }
96 } catch (InvalidArgumentException | SipException e) { 59 } catch (InvalidArgumentException | SipException e) {
97 e.printStackTrace(); 60 e.printStackTrace();
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java
@@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Value; @@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Value;
13 import org.springframework.http.HttpStatus; 13 import org.springframework.http.HttpStatus;
14 import org.springframework.http.ResponseEntity; 14 import org.springframework.http.ResponseEntity;
15 import org.springframework.stereotype.Component; 15 import org.springframework.stereotype.Component;
  16 +import org.springframework.util.ConcurrentReferenceHashMap;
16 import org.springframework.web.bind.annotation.*; 17 import org.springframework.web.bind.annotation.*;
17 18
18 import javax.servlet.http.HttpServletRequest; 19 import javax.servlet.http.HttpServletRequest;
@@ -20,6 +21,7 @@ import java.math.BigInteger; @@ -20,6 +21,7 @@ import java.math.BigInteger;
20 import java.text.DecimalFormat; 21 import java.text.DecimalFormat;
21 import java.util.HashMap; 22 import java.util.HashMap;
22 import java.util.Map; 23 import java.util.Map;
  24 +import java.util.concurrent.ConcurrentHashMap;
23 25
24 /** 26 /**
25 * @Description:针对 ZLMediaServer的hook事件订阅 27 * @Description:针对 ZLMediaServer的hook事件订阅
@@ -50,7 +52,7 @@ public class ZLMHttpHookSubscribe { @@ -50,7 +52,7 @@ public class ZLMHttpHookSubscribe {
50 void response(JSONObject response); 52 void response(JSONObject response);
51 } 53 }
52 54
53 - private Map<HookType, Map<JSONObject, ZLMHttpHookSubscribe.Event>> allSubscribes = new HashMap<>(); 55 + private Map<HookType, Map<JSONObject, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
54 56
55 public void addSubscribe(HookType type, JSONObject hookResponse, ZLMHttpHookSubscribe.Event event) { 57 public void addSubscribe(HookType type, JSONObject hookResponse, ZLMHttpHookSubscribe.Event event) {
56 Map<JSONObject, Event> eventMap = allSubscribes.get(type); 58 Map<JSONObject, Event> eventMap = allSubscribes.get(type);