MessageSender.java 2.35 KB
package com.bsth.socket.manager;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.bsth.socket.protocol.Message;
import com.bsth.util.AckUtil;

/**
 * @author Hill
 */
public class MessageSender {
	
	private final static Logger log = LoggerFactory.getLogger(MessageSender.class);
	
	private static MessageSender sender = new MessageSender();
	private ConcurrentLinkedQueue<Message> queue = new ConcurrentLinkedQueue<Message>();
	private ExecutorService exec = Executors.newFixedThreadPool(5, new ThreadFactory() {
		
		@Override
		public Thread newThread(Runnable r) {
			// TODO Auto-generated method stub
			Thread t = new Thread(r);
			t.setName("MessageSendPool");
			return t;
		}
	});
	
	private MessageSender() {
		exec.submit(new MessageSendWorker());
	}

	public static MessageSender getInstance() {
		return sender;
	}
	
	public void ack(Message msg) {
		Message ack = AckUtil.generateAckMessage(msg);
		if (ack.getMessageBody() != null) {
			queue.add(ack);
		}
	}
	
	public void send(Message msg) {
		queue.add(msg);
	}
	
	final class MessageSendWorker implements Runnable {

		@Override
		public void run() {
			// TODO Auto-generated method stub
			while (true) {
				try {
					final Message msg = queue.poll();
					if (msg == null) {
						try {
							Thread.sleep(50);
						} catch (InterruptedException e) {
							log.error("sender sleep", e);
						}
						continue;
					}
					exec.submit(new Runnable() {
						
						@Override
						public void run() {
							// TODO Auto-generated method stub
							try {
								if (MessageSessionManager.getInstance().getSession(msg.getVin()) != null)
									MessageSessionManager.getInstance().getSession(msg.getVin()).write(msg.write());
								else {
									StringBuilder sb = new StringBuilder();
									sb.append(" vin:")
									.append(msg.getVin())
									.append(" 无会话信息,不发送消息.消息:")
									.append(msg.toString());
									
									log.warn(sb.toString());
								}
							} catch (Exception e) {
								log.error("MessageSendWorker handle exception|" + msg , e);
							}
						}
					});
				} catch (Exception e) {
					log.error("MessageSendPool handle exception");
				}
			}
		}
	}
}