// MessageService.java 6.45 KB
package com.bsth.service;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;

import com.bsth.socket.protocol.Message;

/**
 * Spring service that buffers upstream {@link Message}s in an in-memory queue,
 * hands them to a small worker pool, and offers two helpers for POSTing a map
 * as JSON to an HTTP endpoint.
 *
 * <p>Background threads are started in {@link #afterPropertiesSet()} and the
 * executors are shut down in {@link #destroy()}.
 *
 * @author Hill
 */
@Service
public class MessageService implements InitializingBean, DisposableBean {

	private static final Logger log = LoggerFactory.getLogger(MessageService.class);

	/** Shared JSON mapper; reused instead of building a new one for every request. */
	private static final ObjectMapper MAPPER = new ObjectMapper();

	/** Connect and read timeout, in milliseconds, for outgoing HTTP requests. */
	private static final int HTTP_TIMEOUT_MILLIS = 2500;

	/** How long the dispatcher sleeps, in milliseconds, when the upload queue is empty. */
	private static final long IDLE_SLEEP_MILLIS = 50L;

	/** Number of threads in the upload pool (one of them runs the dispatcher loop). */
	private static final int UP_POOL_SIZE = 5;

	/** Initial delay and period, in seconds, of the scheduled {@link MessageWorker}. */
	private static final long WORKER_PERIOD_SECONDS = 30L;

	/** Per-device message queues; only read by {@link #getMessageByDevice(String)} at present. */
	private final Map<String, Queue<Message>> deviceMessages = new ConcurrentHashMap<String, Queue<Message>>();

	/** Single-threaded scheduler that runs {@link MessageWorker} periodically. */
	private final ScheduledExecutorService sexec =
			Executors.newSingleThreadScheduledExecutor(namedThreadFactory("MessageServiceExecutor"));

	/** Messages submitted through {@link #up(Message)} and not yet dispatched. */
	private final ConcurrentLinkedQueue<Message> queue = new ConcurrentLinkedQueue<>();

	/** Pool that runs the dispatcher loop and the per-message upload tasks. */
	private final ExecutorService exec =
			Executors.newFixedThreadPool(UP_POOL_SIZE, namedThreadFactory("MessageUpPool"));

	/**
	 * Builds a thread factory whose threads all carry the given name.
	 *
	 * @param name name assigned to every thread the factory creates
	 * @return a factory producing plain (non-daemon by default) named threads
	 */
	private static ThreadFactory namedThreadFactory(final String name) {
		return new ThreadFactory() {

			@Override
			public Thread newThread(Runnable r) {
				Thread t = new Thread(r);
				t.setName(name);
				return t;
			}
		};
	}

	/**
	 * Sends a message down to a device. The implementation is currently disabled,
	 * so calling this method has no effect.
	 *
	 * @param msg the message to send
	 */
	public void down(Message msg) {
		// For protocol 0x64, cache the message when sending so it can be matched on acknowledgement.
		/*if (msg.getCommandType() == 0x64) {
			String deviceId = msg.getMessageBody().getDeviceId();
			Queue<Message> msgs = deviceMessages.get(deviceId);
			if (msgs == null) {
				msgs = new ConcurrentLinkedQueue<>();
				deviceMessages.put(deviceId, msgs);
			}
			msgs.add(msg);
		}
		MessageSender.getInstance().send(msg);*/
	}

	/**
	 * Queues an upstream message for asynchronous handling.
	 *
	 * @param msg the message to queue; {@code null} is ignored
	 */
	public void up(Message msg) {
		if (msg != null) {
			queue.add(msg);
		}
	}

	/**
	 * Handles one upstream message (including replies). Currently a no-op.
	 *
	 * @param msg the message taken from the upload queue
	 */
	private void upHandle(Message msg) {

	}

	/**
	 * POSTs {@code map} as JSON prefixed with {@code json=} and parses the JSON response.
	 *
	 * @param map payload to serialize as JSON
	 * @param url target HTTP URL
	 * @return the parsed response body, or {@code null} if the request failed or
	 *         the response status was not 200
	 */
	public Map<String, Object> request(Map<String, Object> map, String url) {
		return post(map, url, true);
	}

	/**
	 * POSTs {@code map} as an {@code application/json} body and parses the JSON response.
	 *
	 * @param map payload to serialize as JSON
	 * @param url target HTTP URL
	 * @return the parsed response body, or {@code null} if the request failed or
	 *         the response status was not 200
	 */
	public Map<String, Object> requestJson(Map<String, Object> map, String url) {
		return post(map, url, false);
	}

	/**
	 * Shared implementation of {@link #request} and {@link #requestJson}.
	 *
	 * @param map        payload to serialize as JSON
	 * @param url        target HTTP URL
	 * @param formPrefix {@code true} to send {@code json=<payload>} with {@code accept: *}{@code /*};
	 *                   {@code false} to send the bare payload with JSON accept/content-type headers
	 * @return the parsed response body, or {@code null} on I/O failure or non-200 status
	 */
	private Map<String, Object> post(Map<String, Object> map, String url, boolean formPrefix) {
		HttpURLConnection con = null;
		try {
			byte[] payload = MAPPER.writeValueAsBytes(map);

			con = (HttpURLConnection) new URL(url).openConnection();
			con.setRequestMethod("POST");
			// NOTE(review): "keep-alive: true" together with "Connection: Close" looks
			// contradictory; both headers are kept as-is in case the server relies on them.
			con.setRequestProperty("keep-alive", "true");
			con.setRequestProperty("Connection", "Close");
			if (formPrefix) {
				con.setRequestProperty("accept", "*/*");
			} else {
				con.setRequestProperty("accept", "application/json");
				con.setRequestProperty("content-type", "application/json");
			}
			con.setDoInput(true);
			con.setDoOutput(true);
			con.setReadTimeout(HTTP_TIMEOUT_MILLIS);
			con.setConnectTimeout(HTTP_TIMEOUT_MILLIS);

			try (OutputStream out = con.getOutputStream()) {
				if (formPrefix) {
					// NOTE(review): the JSON is not URL-encoded after "json="; confirm the
					// receiving side expects the raw form before changing this.
					out.write("json=".getBytes(StandardCharsets.UTF_8));
				}
				out.write(payload);
				out.flush();
			}

			int status = con.getResponseCode();
			if (status != HttpURLConnection.HTTP_OK) {
				log.warn("Message upload request to {} returned HTTP status {}", url, status);
				return null;
			}

			ByteArrayOutputStream bout = new ByteArrayOutputStream();
			try (InputStream in = con.getInputStream()) {
				IOUtils.copy(in, bout);
			}
			byte[] body = bout.toByteArray();
			log.info("Message:{}", new String(body, StandardCharsets.UTF_8));

			@SuppressWarnings("unchecked") // readValue with Map.class yields a raw Map
			Map<String, Object> parsed = MAPPER.readValue(body, Map.class);
			return parsed;
		} catch (IOException e) {
			log.error("调用消息上传接口时发生异常|{}", map, e);
		} finally {
			// con is still null when opening the connection itself failed.
			if (con != null) {
				con.disconnect();
			}
		}
		return null;
	}

	/**
	 * Returns the message queue registered for a device.
	 *
	 * @param deviceId the device identifier
	 * @return the device's queue, or {@code null} if none is registered
	 */
	public Queue<Message> getMessageByDevice(String deviceId) {
		return deviceMessages.get(deviceId);
	}

	/**
	 * Starts the periodic {@link MessageWorker} and the {@link MessageUpWorker}
	 * dispatcher loop.
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		//ShareExecutors.count.set(5);
		sexec.scheduleAtFixedRate(new MessageWorker(), WORKER_PERIOD_SECONDS, WORKER_PERIOD_SECONDS, TimeUnit.SECONDS);
		exec.submit(new MessageUpWorker());
	}

	/** Periodic task run by the scheduler; currently does nothing. */
	final class MessageWorker implements Runnable {

		@Override
		public void run() {

		}
	}

	/**
	 * Dispatcher loop: polls the upload queue and submits one pool task per message.
	 * The loop ends when the pool has been shut down or the thread is interrupted.
	 */
	final class MessageUpWorker implements Runnable {

		@Override
		public void run() {
			// shutdown() does not interrupt this task, so the pool state is checked each pass.
			while (!exec.isShutdown() && !Thread.currentThread().isInterrupted()) {
				try {
					final Message msg = queue.poll();
					if (msg == null) {
						Thread.sleep(IDLE_SLEEP_MILLIS);
						continue;
					}
					exec.submit(new Runnable() {

						@Override
						public void run() {
							try {
								upHandle(msg);
							} catch (Exception e) {
								log.error("MessageUpWorker handle exception|" + msg, e);
							}
						}
					});
				} catch (InterruptedException e) {
					// Restore the flag for the pool and stop dispatching.
					Thread.currentThread().interrupt();
					return;
				} catch (Exception e) {
					log.error("MessageUpPool handle exception", e);
				}
			}
		}
	}

	/**
	 * Shuts down both executors. The dispatcher loop observes the pool shutdown
	 * and exits on its next pass.
	 */
	@Override
	public void destroy() throws Exception {
		exec.shutdown();
		sexec.shutdown();
	}
}