|
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
|
|
|
import com.zhili.zkstationserver.configuration.ServerConfig;
|
|
|
import com.zhili.zkstationserver.dto.ChargerState;
|
|
|
import com.zhili.zkstationserver.dto.ExchangeEvent;
|
|
|
+import com.zhili.zkstationserver.dto.ExchangeRecordInfo;
|
|
|
import com.zhili.zkstationserver.dto.ExchangeStartReply;
|
|
|
import com.zhili.zkstationserver.dto.StationConnInfo;
|
|
|
import com.zhili.zkstationserver.dto.StationMessage;
|
|
@@ -65,7 +66,7 @@ public class StationMessageHandler extends ChannelInboundHandlerAdapter {
|
|
|
StationMessage stationMessage = (StationMessage) msg;
|
|
|
int command = stationMessage.getCommand();
|
|
|
byte[] data = stationMessage.getData();
|
|
|
- log.info("command:" + command + ",data:" + ArrayUtils.toString(data));
|
|
|
+ //log.info("command:" + command + ",data:" + ArrayUtils.toString(data));
|
|
|
String stationCode;
|
|
|
StationMessage stationMessageReply;
|
|
|
byte[] replyBytes;
|
|
@@ -138,9 +139,9 @@ public class StationMessageHandler extends ChannelInboundHandlerAdapter {
|
|
|
}
|
|
|
//为了削峰,将对象构建后,放到kafka中,由业务中心处理。
|
|
|
ExchangeEvent exchangeEvent = new ExchangeEvent(stationCode, data);
|
|
|
- log.info("exchangeEvent:" + exchangeEvent);
|
|
|
+ //log.info("exchangeEvent:" + exchangeEvent);
|
|
|
topic = serverConfig.getServiceCenterName() + "_ExchangeEvent";
|
|
|
- log.info("send to {}", topic);
|
|
|
+ //log.info("send to {}", topic);
|
|
|
kafkaTemplate.send(topic, JSON.toJSONString(exchangeEvent));
|
|
|
break;
|
|
|
case CommandType.CHARGER_STATE:
|
|
@@ -178,6 +179,25 @@ public class StationMessageHandler extends ChannelInboundHandlerAdapter {
|
|
|
log.info("启动换电回复:{}",startSwapReply);
|
|
|
kafkaTemplate.send(topic, JSON.toJSONString(startSwapReply));
|
|
|
break;
|
|
|
+ case CommandType.EXCHANGE_RECORD:
|
|
|
+ stationCode = (String) Optional.ofNullable(ctx.channel().attr(AttributeKey.valueOf("station")).get()).orElse("");
|
|
|
+ if (StringUtils.isEmpty(stationCode)) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ //log.info("上传换电记录:{}",data);
|
|
|
+ ExchangeRecordInfo exchangeRecordInfo = new ExchangeRecordInfo(stationCode,data);
|
|
|
+ topic = serverConfig.getServiceCenterName() + "_UploadExchangeRecord";
|
|
|
+ log.info("换电记录上传:{}",exchangeRecordInfo);
|
|
|
+ kafkaTemplate.send(topic, JSON.toJSONString(exchangeRecordInfo));
|
|
|
+ stationMessageReply = new StationMessage();
|
|
|
+ stationMessageReply.setCommand(CommandType.EXCHANGE_RECORD_REPLY);
|
|
|
+ replyBytes = new byte[65];
|
|
|
+ BytesUtil.setSubBytes(replyBytes,0, BytesUtil.fromString(exchangeRecordInfo.getExchangeNo(),32,(byte)0));
|
|
|
+ BytesUtil.setSubBytes(replyBytes,32, BytesUtil.fromString(exchangeRecordInfo.getSerialNo(),32,(byte)0));
|
|
|
+ replyBytes[64] = (byte) 0;
|
|
|
+ stationMessageReply.setData(replyBytes);
|
|
|
+ ctx.writeAndFlush(stationMessageReply);
|
|
|
+ break;
|
|
|
default:
|
|
|
break;
|
|
|
}
|