MessageProcessor.java 1.79 KB
package com.bsth.socket.manager;


import com.bsth.socket.protocol.Message;
import com.bsth.socket.protocol.Message02;
import com.bsth.util.ProtocolConverter;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.Map;

/**
 * Handles messages received over the socket: selected commands are forwarded to
 * Kafka as JSON, and the message is then acknowledged through {@code MessageSender}.
 *
 * <p>Messages whose command is {@code 0x02} or {@code 0x03} are converted with
 * {@code ProtocolConverter.convert}, serialized with Jackson and published to the
 * {@code LGGJ_ELEC_VEHICLE} topic. The publish result is delivered to a callback
 * that only logs it. Every message that is processed without an exception is
 * acknowledged, whatever its command.
 *
 * @author Hill
 */
@Service
public class MessageProcessor {

	private static final Logger log = LoggerFactory.getLogger(MessageProcessor.class);

	/** Kafka topic the converted messages are published to. */
	private static final String TOPIC = "LGGJ_ELEC_VEHICLE";

	/** JSON serializer, created once and reused for every message. */
	private static final ObjectMapper MAPPER = new ObjectMapper();

	/** Stateless callback that logs the outcome of each publish; one instance is enough. */
	private static final KafkaCallback CALLBACK = new KafkaCallback();

	// NOTE(review): kept as a raw type so that autowiring behaves exactly as before.
	// Parameterise as KafkaTemplate<String, String> once the bean definition has been
	// checked to be compatible with those type arguments.
	@Autowired
	private KafkaTemplate kafkaTemplate;

	/**
	 * Processes one incoming message.
	 *
	 * <p>If the command is {@code 0x02} or {@code 0x03} the message is converted to a
	 * map, serialized to JSON and handed to Kafka. Afterwards the message is
	 * acknowledged via {@code MessageSender.getInstance().ack(msg)}.
	 *
	 * <p>Any exception raised along the way (including one caused by a {@code null}
	 * message) is caught and logged rather than propagated. In that case the
	 * acknowledgement is not sent, because it is the last step inside the guarded block.
	 *
	 * @param msg the message to process
	 */
	public void process(Message msg) {
		try {
			if (msg.getCommand() == 0x02 || msg.getCommand() == 0x03) {
				Map<String, Object> data = ProtocolConverter.convert(msg);
				String json = MAPPER.writeValueAsString(data);
				// The injected template is raw, so the compiler cannot verify the result
				// type. Only a String payload is sent and no key is supplied.
				@SuppressWarnings("unchecked")
				ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, json);
				future.addCallback(CALLBACK);
			}
			MessageSender.getInstance().ack(msg);
		} catch (Exception e) {
			// Trailing Throwable argument is logged with its stack trace by SLF4J.
			log.error("Processor msg:{}", msg, e);
		}
	}

	/**
	 * Logs the outcome of a Kafka publish: the record value on success, the error on failure.
	 */
	private static final class KafkaCallback implements ListenableFutureCallback<SendResult<String, String>> {

		/**
		 * Logs the value of the record that was sent, at INFO level.
		 *
		 * @param result the send result reported by Kafka
		 */
		@Override
		public void onSuccess(SendResult<String, String> result) {
			log.info(result.getProducerRecord().value());
		}

		/**
		 * Logs the failure together with its cause.
		 *
		 * @param ex the error reported for the failed send
		 */
		@Override
		public void onFailure(Throwable ex) {
			// Message text (Chinese): "Kafka failed to send electric vehicle data".
			log.error("kafka发送电车数据异常", ex);
		}
	}
}