StreamProxyTask.java 2.6 KB
package com.genersoft.iot.vmp.conf;

import com.genersoft.iot.vmp.service.StremProxyService1078;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
public class StreamProxyTask {
    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private StremProxyService1078 stremProxyService1078;
    private Logger logger = LoggerFactory.getLogger(StreamProxyTask.class);

    public static final Long TIME_OUT = 180L;

    @Scheduled(cron = "0 15 * * * ? ")
    public void work() {
        Set<String> keys = redisTemplate.keys("tag:history:port:*");
        if (CollectionUtils.isEmpty(keys)) {
            return;
        }
        Long nowDate = new Date().getTime();
        for (String key : keys) {
            String stream = StringUtils.substringAfter(key, "tag:history:port:");
            if (StringUtils.isEmpty(stream)) {
                continue;
            }

            Object value = redisTemplate.opsForValue().get("tag:history:httpPort:time:"+stream);
            if(Objects.isNull(value)){
                sendIORequestStop(stream);
                continue;
            }
            try {
                Long val = (Long) value;
                if(val == 0L){
                    sendIORequestStop(stream);
                    continue;
                }

                Long val1 = (nowDate-val)/1000;
                if(val1 > TIME_OUT){
                    sendIORequestStop(stream);
                }

            }catch (Exception e){
                logger.error("[{}]停流失败",stream,e);
            }
        }

        // task execution logic
    }

    private void sendIORequestStop(String stream){
        try {
            String sim = StringUtils.substringBeforeLast(stream,"-");
            String channel =  StringUtils.substringAfterLast(stream,"-");
            Object port = redisTemplate.opsForValue().get("tag:history:port:" + stream);
            Object portHtt = redisTemplate.opsForValue().get("tag:history:httpPort:" + stream);
            stremProxyService1078.sendIORequestStop(sim,channel,stream, (Integer) port, (Integer) portHtt);

        }catch (Exception e){
            logger.error("[{}]停流失败",stream,e);
        }

    }
}