Browse Source

Merge branch 'master' of http://git.fast-fun.cn:92/ruili/zk-station-server

zhaoyuanqi 1 year ago
parent
commit
a045524f08

+ 20 - 4
src/main/java/com/zhili/zkstationserver/component/KafkaMessageHandler.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
 import com.zhili.zkstationserver.configuration.ServerConfig;
 import com.zhili.zkstationserver.dto.StationControlEvent;
 import com.zhili.zkstationserver.dto.StationMessage;
+import com.zhili.zkstationserver.util.BytesUtil;
 import com.zhili.zkstationserver.util.StationCenter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.curator.framework.CuratorFramework;
@@ -16,6 +17,7 @@ import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
+import java.io.UnsupportedEncodingException;
 
 @Component
 @Slf4j
@@ -33,13 +35,27 @@ public class KafkaMessageHandler {
     }
 
     @KafkaListener(topics = {"#{serverConfig.getInstanceId()}"}, groupId = "#{serverConfig.getServiceName()}")
-    public void processExchangeEvent(ConsumerRecord<String, String> record, Acknowledgment ack){
-        log.info("recv:"+record.value());
+    public void processExchangeEvent(ConsumerRecord<String, String> record, Acknowledgment ack) throws UnsupportedEncodingException {
+        log.info("recv************:"+record.value());
         StationControlEvent stationControlEvent = JSON.parseObject(record.value(), StationControlEvent.class);
         if(stationControlEvent.getType().equals("startExchange")){
             StationMessage stationMessage = new StationMessage();
-            stationMessage.setCommand(504);
-            stationMessage.setData(new byte[2]);
+            stationMessage.setCommand(StationMessage.CommandType.START_EXCHANGE);
+            byte[] data = new byte[144];
+            BytesUtil.setSubBytes(data,0, BytesUtil.fromString(stationControlEvent.getExchangeNo(),32,(byte)0));
+            BytesUtil.setSubBytes(data,32, BytesUtil.fromString(stationControlEvent.getCmdNo(),32,(byte)0));
+            BytesUtil.setSubBytes(data,64, BytesUtil.fromIntWithLowerFirst(stationControlEvent.getControlSource(),1));
+            BytesUtil.setSubBytes(data,65, BytesUtil.fromIntWithLowerFirst(Integer.valueOf(stationControlEvent.getDriverId()),4));
+            if(stationControlEvent.getMobile()!=null){
+                BytesUtil.setSubBytes(data,69, BytesUtil.fromString(stationControlEvent.getMobile(),11,(byte)0));
+            }
+            if(stationControlEvent.getName()!=null){
+                BytesUtil.setSubBytes(data,80, BytesUtil.fromStringUnicode(stationControlEvent.getName(),32,(byte)0));
+            }
+            if(stationControlEvent.getFleetName()!=null){
+                BytesUtil.setSubBytes(data,112, BytesUtil.fromStringUnicode(stationControlEvent.getFleetName(),32,(byte)0));
+            }
+            stationMessage.setData(data);
             StationCenter.sendToChannel(stationControlEvent.getStationConnInfo().getConnIdStr(),stationMessage);
         }
         ack.acknowledge();

+ 29 - 0
src/main/java/com/zhili/zkstationserver/dto/ExchangeStartReply.java

@@ -0,0 +1,29 @@
+package com.zhili.zkstationserver.dto;
+
+import com.zhili.zkstationserver.util.BytesUtil;
+import lombok.Data;
+import org.apache.commons.lang.ArrayUtils;
+
+/**
+ * @author :jek
+ * @description:TODO
+ * @date :2023/2/8 15:28
+ */
+@Data
+public class ExchangeStartReply {
+    String stationCode;
+    String exchangeNo;
+    /**
+     * 指令编号
+     */
+    String cmdNo;
+    int result;
+    String msg;
+    public ExchangeStartReply(String stationCode, byte[] data) {
+        this.stationCode = stationCode;
+        exchangeNo = BytesUtil.parseString(ArrayUtils.subarray(data, 0, 32));
+        cmdNo = BytesUtil.parseString(ArrayUtils.subarray(data, 32, 64));
+        result = Byte.toUnsignedInt(data[64]);
+        msg = BytesUtil.deUnicode(ArrayUtils.subarray(data, 65, 129));
+    }
+}

+ 15 - 0
src/main/java/com/zhili/zkstationserver/dto/StationControlEvent.java

@@ -14,4 +14,19 @@ public class StationControlEvent {
     String type;
     String stationCode;
     String exchangeNo;
+    /**
+     * 指令编号
+     */
+    String cmdNo;
+    String driverId;
+    String mobile;
+    String name;
+    /**
+     * 车队名称
+     */
+    String fleetName;
+    /**
+     * 启动来源 0:司机小程序,1:后台
+     */
+    int controlSource;
 }

+ 2 - 0
src/main/java/com/zhili/zkstationserver/dto/StationMessage.java

@@ -17,6 +17,8 @@ public class StationMessage {
         public static final int CHARGER_STATE_REPLY = 104;
         public static final int EXCHANGE_EVENT = 201;
         public static final int EXCHANGE_EVENT_REPLY = 202;
+        public static final int REPLY_EXCHANGE = 301;
+        public static final int START_EXCHANGE = 302;
         public static final int SIGN_IN = 401;
         public static final int SIGN_IN_REPLY = 402;
         public static final int HEART_BEAT = 403;

+ 12 - 0
src/main/java/com/zhili/zkstationserver/handler/StationMessageHandler.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
 import com.zhili.zkstationserver.configuration.ServerConfig;
 import com.zhili.zkstationserver.dto.ChargerState;
 import com.zhili.zkstationserver.dto.ExchangeEvent;
+import com.zhili.zkstationserver.dto.ExchangeStartReply;
 import com.zhili.zkstationserver.dto.StationConnInfo;
 import com.zhili.zkstationserver.dto.StationMessage;
 import com.zhili.zkstationserver.dto.StationMessage.CommandType;
@@ -84,6 +85,7 @@ public class StationMessageHandler extends ChannelInboundHandlerAdapter {
 
                 stationRegisterkeys = new ArrayList();
                 stationRegisterkeys.add("StationConn:" + stationCode);
+                log.info("sign in stationCode:{}", stationCode);
                 registerSuccess = (Long) redisTemplate.execute(stationRegisterScript, stationRegisterkeys, stationConnInfo, 15);
                 StationCenter.markStation(ctx.channel(), stationCode);
                 stationMessageReply = new StationMessage();
@@ -166,6 +168,16 @@ public class StationMessageHandler extends ChannelInboundHandlerAdapter {
                 replyBytes[42] = (byte) 0;
                 ctx.writeAndFlush(replyBytes);
                 break;
+            case CommandType.REPLY_EXCHANGE:
+                stationCode = (String) Optional.ofNullable(ctx.channel().attr(AttributeKey.valueOf("station")).get()).orElse("");
+                if (StringUtils.isEmpty(stationCode)) {
+                    break;
+                }
+                ExchangeStartReply startSwapReply = new ExchangeStartReply(stationCode, data);
+                topic = serverConfig.getServiceCenterName() + "_ExchangeStartReply";
+                log.info("启动换电回复:{}",startSwapReply);
+                kafkaTemplate.send(topic, JSON.toJSONString(startSwapReply));
+                break;
             default:
                 break;
         }

+ 12 - 0
src/main/java/com/zhili/zkstationserver/util/BytesUtil.java

@@ -139,6 +139,18 @@ public class BytesUtil {
         return res;
     }
 
+    public static byte[] fromStringUnicode(String s, int len, byte fill) throws UnsupportedEncodingException {
+        byte[] dataBytes = s.getBytes("Unicode");
+        byte[] res = new byte[len];
+        for (int i = 0; i < dataBytes.length; i++) {
+            res[i] = dataBytes[i];
+        }
+        for (int j = dataBytes.length; j < len; j++) {
+            res[j] = fill;
+        }
+        return res;
+    }
+
     public static void setSubBytes(byte[] source, int index, byte[] target) {
         for (int i = 0; i < target.length; i++) {
             source[index + i] = target[i];