Commit 29f7a6b6eba350f2c49b744f110d1aa033f77d02

Authored by 648540858
1 parent c25a99d6

修复多wvp模式推流时信息存储错误

src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsg.java
@@ -29,12 +29,12 @@ public class WvpRedisMsg { @@ -29,12 +29,12 @@ public class WvpRedisMsg {
29 * 消息的ID 29 * 消息的ID
30 */ 30 */
31 private String serial; 31 private String serial;
32 - private Object content; 32 + private String content;
33 33
34 private final static String requestTag = "req"; 34 private final static String requestTag = "req";
35 private final static String responseTag = "res"; 35 private final static String responseTag = "res";
36 36
37 - public static WvpRedisMsg getRequestInstance(String fromId, String toId, String cmd, String serial, Object content) { 37 + public static WvpRedisMsg getRequestInstance(String fromId, String toId, String cmd, String serial, String content) {
38 WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); 38 WvpRedisMsg wvpRedisMsg = new WvpRedisMsg();
39 wvpRedisMsg.setType(requestTag); 39 wvpRedisMsg.setType(requestTag);
40 wvpRedisMsg.setFromId(fromId); 40 wvpRedisMsg.setFromId(fromId);
@@ -51,7 +51,7 @@ public class WvpRedisMsg { @@ -51,7 +51,7 @@ public class WvpRedisMsg {
51 return wvpRedisMsg; 51 return wvpRedisMsg;
52 } 52 }
53 53
54 - public static WvpRedisMsg getResponseInstance(String fromId, String toId, String cmd, String serial, Object content) { 54 + public static WvpRedisMsg getResponseInstance(String fromId, String toId, String cmd, String serial, String content) {
55 WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); 55 WvpRedisMsg wvpRedisMsg = new WvpRedisMsg();
56 wvpRedisMsg.setType(responseTag); 56 wvpRedisMsg.setType(responseTag);
57 wvpRedisMsg.setFromId(fromId); 57 wvpRedisMsg.setFromId(fromId);
@@ -106,11 +106,11 @@ public class WvpRedisMsg { @@ -106,11 +106,11 @@ public class WvpRedisMsg {
106 this.cmd = cmd; 106 this.cmd = cmd;
107 } 107 }
108 108
109 - public Object getContent() { 109 + public String getContent() {
110 return content; 110 return content;
111 } 111 }
112 112
113 - public void setContent(Object content) { 113 + public void setContent(String content) {
114 this.content = content; 114 this.content = content;
115 } 115 }
116 } 116 }
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java
@@ -113,8 +113,8 @@ public class RedisGbPlayMsgListener implements MessageListener { @@ -113,8 +113,8 @@ public class RedisGbPlayMsgListener implements MessageListener {
113 while (!taskQueue.isEmpty()) { 113 while (!taskQueue.isEmpty()) {
114 Message msg = taskQueue.poll(); 114 Message msg = taskQueue.poll();
115 try { 115 try {
116 - JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class);  
117 - WvpRedisMsg wvpRedisMsg = JSON.to(WvpRedisMsg.class, msgJSON); 116 + WvpRedisMsg wvpRedisMsg = JSON.parseObject(msg.getBody(), WvpRedisMsg.class);
  117 + logger.info("[收到REDIS通知] 消息: {}", JSON.toJSONString(wvpRedisMsg));
118 if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { 118 if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
119 continue; 119 continue;
120 } 120 }
@@ -123,7 +123,7 @@ public class RedisGbPlayMsgListener implements MessageListener { @@ -123,7 +123,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
123 123
124 switch (wvpRedisMsg.getCmd()){ 124 switch (wvpRedisMsg.getCmd()){
125 case WvpRedisMsgCmd.GET_SEND_ITEM: 125 case WvpRedisMsgCmd.GET_SEND_ITEM:
126 - RequestSendItemMsg content = JSON.to(RequestSendItemMsg.class, wvpRedisMsg.getContent()); 126 + RequestSendItemMsg content = JSON.parseObject(wvpRedisMsg.getContent(), RequestSendItemMsg.class);
127 requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); 127 requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
128 break; 128 break;
129 case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: 129 case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
@@ -242,7 +242,7 @@ public class RedisGbPlayMsgListener implements MessageListener { @@ -242,7 +242,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
242 result.setData(content); 242 result.setData(content);
243 243
244 WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, 244 WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
245 - WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result); 245 + WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result));
246 JSONObject jsonObject = (JSONObject)JSON.toJSON(response); 246 JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
247 redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); 247 redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
248 } 248 }
@@ -260,7 +260,7 @@ public class RedisGbPlayMsgListener implements MessageListener { @@ -260,7 +260,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
260 result.setMsg("流媒体不存在"); 260 result.setMsg("流媒体不存在");
261 261
262 WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, 262 WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
263 - WvpRedisMsgCmd.GET_SEND_ITEM, serial, result); 263 + WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result));
264 264
265 JSONObject jsonObject = (JSONObject)JSON.toJSON(response); 265 JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
266 redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); 266 redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
@@ -283,7 +283,7 @@ public class RedisGbPlayMsgListener implements MessageListener { @@ -283,7 +283,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
283 WVPResult<SendRtpItem> result = new WVPResult<>(); 283 WVPResult<SendRtpItem> result = new WVPResult<>();
284 result.setCode(ERROR_CODE_TIMEOUT); 284 result.setCode(ERROR_CODE_TIMEOUT);
285 WvpRedisMsg response = WvpRedisMsg.getResponseInstance( 285 WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
286 - userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result 286 + userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
287 ); 287 );
288 JSONObject jsonObject = (JSONObject)JSON.toJSON(response); 288 JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
289 redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); 289 redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
@@ -324,7 +324,7 @@ public class RedisGbPlayMsgListener implements MessageListener { @@ -324,7 +324,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
324 result.setData(responseSendItemMsg); 324 result.setData(responseSendItemMsg);
325 325
326 WvpRedisMsg response = WvpRedisMsg.getResponseInstance( 326 WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
327 - userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result 327 + userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
328 ); 328 );
329 JSONObject jsonObject = (JSONObject)JSON.toJSON(response); 329 JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
330 redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); 330 redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
@@ -350,7 +350,7 @@ public class RedisGbPlayMsgListener implements MessageListener { @@ -350,7 +350,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
350 requestSendItemMsg.setServerId(serverId); 350 requestSendItemMsg.setServerId(serverId);
351 String key = UUID.randomUUID().toString(); 351 String key = UUID.randomUUID().toString();
352 WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM, 352 WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM,
353 - key, requestSendItemMsg); 353 + key, JSON.toJSONString(requestSendItemMsg));
354 354
355 JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); 355 JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
356 logger.info("[请求推流SendItem] {}: {}", serverId, jsonObject); 356 logger.info("[请求推流SendItem] {}: {}", serverId, jsonObject);
@@ -375,7 +375,7 @@ public class RedisGbPlayMsgListener implements MessageListener { @@ -375,7 +375,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
375 public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) { 375 public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) {
376 String key = UUID.randomUUID().toString(); 376 String key = UUID.randomUUID().toString();
377 WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, 377 WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
378 - WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, param); 378 + WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, JSON.toJSONString(param));
379 379
380 JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); 380 JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
381 logger.info("[REDIS 请求其他平台推流] {}: {}", serverId, jsonObject); 381 logger.info("[REDIS 请求其他平台推流] {}: {}", serverId, jsonObject);
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
@@ -13,9 +13,9 @@ import java.util.List; @@ -13,9 +13,9 @@ import java.util.List;
13 public interface StreamPushMapper { 13 public interface StreamPushMapper {
14 14
15 @Insert("INSERT INTO wvp_stream_push (app, stream, total_reader_count, origin_type, origin_type_str, " + 15 @Insert("INSERT INTO wvp_stream_push (app, stream, total_reader_count, origin_type, origin_type_str, " +
16 - "push_time, alive_second, media_server_id, update_time, create_time, push_ing, self) VALUES" + 16 + "push_time, alive_second, media_server_id, server_id, update_time, create_time, push_ing, self) VALUES" +
17 "(#{app}, #{stream}, #{totalReaderCount}, #{originType}, #{originTypeStr}, " + 17 "(#{app}, #{stream}, #{totalReaderCount}, #{originType}, #{originTypeStr}, " +
18 - "#{pushTime}, #{aliveSecond}, #{mediaServerId} , #{updateTime} , #{createTime}, " + 18 + "#{pushTime}, #{aliveSecond}, #{mediaServerId} , #{serverId} , #{updateTime} , #{createTime}, " +
19 "#{pushIng}, #{self} )") 19 "#{pushIng}, #{self} )")
20 int add(StreamPushItem streamPushItem); 20 int add(StreamPushItem streamPushItem);
21 21
@@ -24,6 +24,7 @@ public interface StreamPushMapper { @@ -24,6 +24,7 @@ public interface StreamPushMapper {
24 "UPDATE wvp_stream_push " + 24 "UPDATE wvp_stream_push " +
25 "SET update_time=#{updateTime}" + 25 "SET update_time=#{updateTime}" +
26 "<if test=\"mediaServerId != null\">, media_server_id=#{mediaServerId}</if>" + 26 "<if test=\"mediaServerId != null\">, media_server_id=#{mediaServerId}</if>" +
  27 + "<if test=\"serverId != null\">, server_id=#{serverId}</if>" +
27 "<if test=\"totalReaderCount != null\">, total_reader_count=#{totalReaderCount}</if>" + 28 "<if test=\"totalReaderCount != null\">, total_reader_count=#{totalReaderCount}</if>" +
28 "<if test=\"originType != null\">, origin_type=#{originType}</if>" + 29 "<if test=\"originType != null\">, origin_type=#{originType}</if>" +
29 "<if test=\"originTypeStr != null\">, origin_type_str=#{originTypeStr}</if>" + 30 "<if test=\"originTypeStr != null\">, origin_type_str=#{originTypeStr}</if>" +
@@ -89,10 +90,10 @@ public interface StreamPushMapper { @@ -89,10 +90,10 @@ public interface StreamPushMapper {
89 90
90 @Insert("<script>" + 91 @Insert("<script>" +
91 "Insert INTO wvp_stream_push (app, stream, total_reader_count, origin_type, origin_type_str, " + 92 "Insert INTO wvp_stream_push (app, stream, total_reader_count, origin_type, origin_type_str, " +
92 - "create_time, alive_second, media_server_id, status, push_ing) " + 93 + "create_time, alive_second, media_server_id, server_id, status, push_ing) " +
93 "VALUES <foreach collection='streamPushItems' item='item' index='index' separator=','>" + 94 "VALUES <foreach collection='streamPushItems' item='item' index='index' separator=','>" +
94 "( #{item.app}, #{item.stream}, #{item.totalReaderCount}, #{item.originType}, " + 95 "( #{item.app}, #{item.stream}, #{item.totalReaderCount}, #{item.originType}, " +
95 - "#{item.originTypeStr},#{item.createTime}, #{item.aliveSecond}, #{item.mediaServerId}, #{item.status} ," + 96 + "#{item.originTypeStr},#{item.createTime}, #{item.aliveSecond}, #{item.mediaServerId},#{item.serverId}, #{item.status} ," +
96 " #{item.pushIng} )" + 97 " #{item.pushIng} )" +
97 " </foreach>" + 98 " </foreach>" +
98 "</script>") 99 "</script>")
数据库/初始化-mysql-2.6.9.sql
@@ -261,6 +261,7 @@ create table wvp_stream_push ( @@ -261,6 +261,7 @@ create table wvp_stream_push (
261 create_time character varying(50), 261 create_time character varying(50),
262 alive_second integer, 262 alive_second integer,
263 media_server_id character varying(50), 263 media_server_id character varying(50),
  264 + server_id character varying(50),
264 push_time character varying(50), 265 push_time character varying(50),
265 status bool default false, 266 status bool default false,
266 update_time character varying(50), 267 update_time character varying(50),
数据库/初始化-postgresql-kingbase-2.6.9.sql
@@ -261,6 +261,7 @@ create table wvp_stream_push ( @@ -261,6 +261,7 @@ create table wvp_stream_push (
261 create_time character varying(50), 261 create_time character varying(50),
262 alive_second integer, 262 alive_second integer,
263 media_server_id character varying(50), 263 media_server_id character varying(50),
  264 + server_id character varying(50),
264 push_time character varying(50), 265 push_time character varying(50),
265 status bool default false, 266 status bool default false,
266 update_time character varying(50), 267 update_time character varying(50),
数据库/更新-mysql-2.6.9.sql
@@ -497,4 +497,7 @@ alter table wvp_media_server @@ -497,4 +497,7 @@ alter table wvp_media_server
497 alter table wvp_media_server 497 alter table wvp_media_server
498 add record_day integer default 7; 498 add record_day integer default 7;
499 499
  500 +alter table wvp_stream_push
  501 + add server_id character varying(50);
  502 +
500 503
数据库/更新-postgresql-kingbase-2.6.9.sql
@@ -498,5 +498,8 @@ alter table wvp_media_server @@ -498,5 +498,8 @@ alter table wvp_media_server
498 alter table wvp_media_server 498 alter table wvp_media_server
499 add record_day integer default 7; 499 add record_day integer default 7;
500 500
  501 +alter table wvp_stream_push
  502 + add server_id character varying(50);
  503 +
501 504
502 505