Browse Source

增加和站点服务器与微信服务器交互

TitanWong 2 years ago
parent
commit
65978d4ceb
21 changed files with 870 additions and 29 deletions
  1. 5 0
      .idea/jarRepositories.xml
  2. 1 1
      pom.xml
  3. 110 7
      src/main/java/com/zhili/zkservicecenter/component/KafkaMessageHandler.java
  4. 43 0
      src/main/java/com/zhili/zkservicecenter/component/RedisHelper.java
  5. 12 1
      src/main/java/com/zhili/zkservicecenter/component/ServiceCenter.java
  6. 54 0
      src/main/java/com/zhili/zkservicecenter/configuration/RedisConfiguration.java
  7. 3 0
      src/main/java/com/zhili/zkservicecenter/configuration/ServerConfig.java
  8. 37 0
      src/main/java/com/zhili/zkservicecenter/controller/TestController.java
  9. 15 0
      src/main/java/com/zhili/zkservicecenter/dto/DriverConnInfo.java
  10. 99 0
      src/main/java/com/zhili/zkservicecenter/dto/ExchangeEvent.java
  11. 14 0
      src/main/java/com/zhili/zkservicecenter/dto/SessionExchangeEvent.java
  12. 14 0
      src/main/java/com/zhili/zkservicecenter/dto/StationConnInfo.java
  13. 17 0
      src/main/java/com/zhili/zkservicecenter/dto/StationControlEvent.java
  14. 16 0
      src/main/java/com/zhili/zkservicecenter/dto/SubscribeExchangeEvent.java
  15. 411 0
      src/main/java/com/zhili/zkservicecenter/util/BytesUtil.java
  16. 5 10
      src/main/resources/application-dev.yml
  17. 9 0
      src/main/resources/lua/SubscribeExchange.lua
  18. 5 10
      target/classes/application-dev.yml
  19. BIN
      target/classes/com/zhili/zkservicecenter/component/KafkaMessageHandler.class
  20. BIN
      target/classes/com/zhili/zkservicecenter/component/ServiceCenter.class
  21. BIN
      target/classes/com/zhili/zkservicecenter/configuration/ServerConfig.class

+ 5 - 0
.idea/jarRepositories.xml

@@ -1,6 +1,11 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
   <component name="RemoteRepositoriesConfiguration">
+    <remote-repository>
+      <option name="id" value="central" />
+      <option name="name" value="Central Repository" />
+      <option name="url" value="https://repo.maven.apache.org/maven2" />
+    </remote-repository>
     <remote-repository>
       <option name="id" value="central" />
       <option name="name" value="Maven Central repository" />

+ 1 - 1
pom.xml

@@ -44,7 +44,7 @@
         </dependency>
         <!--        netty客户端依赖,向云端传数据要用-->
         <dependency>
-            <groupId>org.openlabtesting.netty</groupId>
+            <groupId>io.netty</groupId>
             <artifactId>netty-all</artifactId>
             <version>4.1.48.Final</version>
         </dependency>

+ 110 - 7
src/main/java/com/zhili/zkservicecenter/component/KafkaMessageHandler.java

@@ -1,16 +1,26 @@
 package com.zhili.zkservicecenter.component;
 
 
+import com.alibaba.fastjson.JSON;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.zhili.zkservicecenter.configuration.ServerConfig;
+import com.zhili.zkservicecenter.dto.*;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.script.RedisScript;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 @Component
 @Slf4j
@@ -19,17 +29,110 @@ public class KafkaMessageHandler {
     @Autowired
     ServerConfig serverConfig;
 
+    @Autowired
+    RedisTemplate redisTemplate;
+
+    @Autowired
+    RedisHelper redisHelper;
+
+    @Autowired
+    ServiceCenter serviceCenter;
+
+    @Autowired
+    KafkaTemplate kafkaTemplate;
+
+    @Resource(name = "subscribeExchangeScript")
+    RedisScript<List> subscribeScript;
+
     public KafkaMessageHandler(CuratorFramework client) {
     }
 
-    @PostConstruct
-    public void init(){
-        log.info("KafkaMessageHandler PostConstruct:" + serverConfig);
+    @KafkaListener(topics = {"#{serverConfig.getServiceName()}_ExchangeEvent"}, groupId = "#{serverConfig.getServiceName()}")
+    public void processExchangeEvent(ConsumerRecord<String, String> record, Acknowledgment ack) {
+        String value = record.value();
+        ExchangeEvent exchangeEvent = JSON.parseObject(value, ExchangeEvent.class);
+        log.info("recv exchangeEvent:" + exchangeEvent);
+        String exchangeNo = exchangeEvent.getExchangeNo();
+        String stationCode = exchangeEvent.getStationCode();
+        //注册站的当前换电号
+        //{"StationCurrentExchange_st123":"ex_abcdef"}
+        redisTemplate.opsForValue().set("StationCurrentExchange:" + stationCode, exchangeEvent, 12, TimeUnit.HOURS);
+        //换电详情缓存成list
+        redisTemplate.opsForList().rightPush("ExchangeDetail:" + exchangeNo, exchangeEvent);
+        redisTemplate.expire("ExchangeDetail:" + exchangeNo, 12, TimeUnit.HOURS);
+        //循环正在监听当前换电号的wx服务器连接
+        Set<String> serverSessions = redisHelper.scan("SessionsSubscribeExchange:" + exchangeNo + ":");
+        log.info("serverSessions:" + serverSessions);
+        for (String key : serverSessions) {
+            String[] split = key.split(":");
+            DriverConnInfo driverConnInfo = new DriverConnInfo();
+            driverConnInfo.setServerInstanceId(split[2]);
+            driverConnInfo.setSessionId(split[3]);
+            Boolean exist = serviceCenter.driverServerExist(driverConnInfo.getServerInstanceId());
+            if (exist) {
+                SessionExchangeEvent sEvent = new SessionExchangeEvent();
+                sEvent.setConnInfo(driverConnInfo);
+                sEvent.setEvent(exchangeEvent);
+                kafkaTemplate.send(driverConnInfo.getServerInstanceId(), JSON.toJSONString(sEvent));
+            }
+        }
+        ack.acknowledge();
+    }
+
+    @KafkaListener(topics = {"#{serverConfig.getServiceName()}_subscribeExchangeEvent"}, groupId = "#{serverConfig.getServiceName()}")
+    public void driverSubscribeExchangeEvent(ConsumerRecord<String, String> record, Acknowledgment ack) {
+        String value = record.value();
+        SubscribeExchangeEvent event = JSON.parseObject(value, SubscribeExchangeEvent.class);
+        switch (event.getType()) {
+            case "on":
+//                开启监听
+                log.info("DriverSubscribeExchangeEvent on: {}", event);
+                List<String> keys = new ArrayList<>();
+                keys.add("ExchangeDetail:" + event.getExchangeNo());
+                keys.add("SessionsSubscribeExchange:" + event.getExchangeNo() + ":" + event.getConnInfo().getServerInstanceId() + ":" + event.getConnInfo().getSessionId());
+//                采用Lua脚本确定原子性,1.设置监听实时消息 2.读取换电历史详情,预先推送
+                List res = (List) redisTemplate.execute(subscribeScript, keys, event.getConnInfo());
+                log.info("res:{}", res);
+                for (Object s : res) {
+                    ObjectMapper objectMapper = new ObjectMapper();
+                    SessionExchangeEvent sEvent = new SessionExchangeEvent();
+                    ExchangeEvent exchangeEvent = objectMapper.convertValue(s, ExchangeEvent.class);
+                    sEvent.setEvent(exchangeEvent);
+                    sEvent.setConnInfo(event.getConnInfo());
+                    kafkaTemplate.send(event.getConnInfo().getServerInstanceId(), JSON.toJSONString(sEvent));
+                }
+                break;
+            case "off":
+//                关闭监听
+                redisTemplate.delete("SessionsSubscribeExchange:" + event.getExchangeNo() + ":" + event.getConnInfo().getServerInstanceId() + ":" + event.getConnInfo().getSessionId());
+                break;
+            default:
+                break;
+        }
+        ack.acknowledge();
     }
 
-    @KafkaListener(topics = {"#{serverConfig.getServiceName()}"}, groupId = "#{serverConfig.getServiceName()}")
-    public void processExchangeEvent(ConsumerRecord<String, String> record, Acknowledgment ack){
-        log.info("recv:"+record.value());
+    @KafkaListener(topics = {"#{serverConfig.getServiceName()}_stationControlEvent"}, groupId = "#{serverConfig.getServiceName()}")
+    public void driverControlStationEvent(ConsumerRecord<String, String> record, Acknowledgment ack) {
+        String value = record.value();
+        log.info("driverControlStationEvent:" + value);
+        StationControlEvent event = JSON.parseObject(value, StationControlEvent.class);
+        switch (event.getType()) {
+            case "startExchange":
+                String stationCode = event.getStationCode();
+                Object o = redisTemplate.opsForValue().get("StationConn:" + stationCode);
+                ObjectMapper objectMapper = new ObjectMapper();
+                StationConnInfo stationConnInfo = objectMapper.convertValue(o, StationConnInfo.class);
+                if (stationConnInfo == null) {
+                    log.info("stationConnInfo({}) not exist", stationCode);
+                    break;
+                }
+                event.setStationConnInfo(stationConnInfo);
+                kafkaTemplate.send(stationConnInfo.getServerInstanceId(), JSON.toJSONString(event));
+                break;
+            default:
+                break;
+        }
         ack.acknowledge();
     }
 }

+ 43 - 0
src/main/java/com/zhili/zkservicecenter/component/RedisHelper.java

@@ -0,0 +1,43 @@
+package com.zhili.zkservicecenter.component;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.Cursor;
+import org.springframework.data.redis.core.RedisCallback;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.ScanOptions;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.stereotype.Component;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * @author :HuangBin
+ * @description:TODO
+ * @date :2023/1/31 14:44
+ */
+
+@Component
+@Slf4j
+public class RedisHelper {
+    @Autowired
+    RedisTemplate redisTemplate;
+
+    /**
+     * scan 实现
+     *
+     * @param prefix       表达式,如:abc,找出所有以abc开始的键
+     */
+    public Set<String> scan(String prefix) {
+        Set<String> keys = (Set<String>) redisTemplate.execute((RedisCallback<Set<String>>) connection -> {
+            Set<String> keysTmp = new HashSet<>();
+            Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder().match(prefix+"*").count(1000).build());
+            while (cursor.hasNext()) {
+                keysTmp.add(new String(cursor.next()));
+            }
+            return keysTmp;
+        });
+        return keys;
+    }
+}

+ 12 - 1
src/main/java/com/zhili/zkservicecenter/component/ServiceCenter.java

@@ -37,7 +37,7 @@ public class ServiceCenter {
         });
     }
 
-    @Scheduled(fixedRate = 2000)
+    @Scheduled(fixedRate = 10000)
     public void printServers() {
         log.info("monitor--------");
         servers.forEach((k, m) -> {
@@ -48,4 +48,15 @@ public class ServiceCenter {
         });
         log.info("--------");
     }
+
+    public Boolean driverServerExist(String serverInstanceId){
+        try{
+            Map<String, ServerConfig> stationServers = servers.get("DriverServer");
+            return stationServers.keySet().contains(serverInstanceId);
+        }catch (Exception e){
+            e.printStackTrace();
+            return false;
+        }
+
+    }
 }

+ 54 - 0
src/main/java/com/zhili/zkservicecenter/configuration/RedisConfiguration.java

@@ -0,0 +1,54 @@
+package com.zhili.zkservicecenter.configuration;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.script.DefaultRedisScript;
+import org.springframework.data.redis.core.script.RedisScript;
+import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+import org.springframework.scripting.support.ResourceScriptSource;
+
+import java.util.List;
+
+/**
+ * @author :HuangBin
+ * @description:TODO
+ * @date :2022/9/28 10:44
+ */
+
+//redis使用方式:
+//@Autowired
+//RedisTemplate redisTemplate;
+//
+//redisTemplate.opsForValue().xxxxx
+
+@Configuration
+public class RedisConfiguration {
+    @Bean
+    public RedisTemplate redisTemplate(RedisConnectionFactory factory) {
+        RedisTemplate redisTemplate = new RedisTemplate();
+        redisTemplate.setConnectionFactory(factory);
+        //解决key的序列化问题,字符串序列化,可读
+        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
+        redisTemplate.setKeySerializer(stringRedisSerializer);
+        //解决hashkey的序列化问题,字符串序列化,可读
+        redisTemplate.setHashKeySerializer(stringRedisSerializer);
+        //解决value的序列化问题,json序列化,可读
+        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
+        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
+        //解决hashvalue的序列化问题,json序列化,可读
+        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
+        return redisTemplate;
+    }
+
+    @Bean
+    public RedisScript<List> subscribeExchangeScript(){
+        DefaultRedisScript<List> defaultRedisScript = new DefaultRedisScript();
+        defaultRedisScript.setResultType(List.class);
+        defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/SubscribeExchange.lua")));
+        return defaultRedisScript;
+    }
+}

+ 3 - 0
src/main/java/com/zhili/zkservicecenter/configuration/ServerConfig.java

@@ -23,6 +23,8 @@ public class ServerConfig {
 
     String serviceName;
 
+    String serviceCenterName;
+
     @Value("${service-center.port:0}")
     //服务器启动端口
     Integer port;
@@ -43,6 +45,7 @@ public class ServerConfig {
     @PostConstruct
     public void init() throws UnknownHostException {
         serviceName = "ServiceCenter";
+        serviceCenterName = "ServiceCenter";
         zkRoot = "/servers";
         ipAddress = null;
         System.out.println("advertisedNetwork("+ advertisedNetwork.length()+"):" + advertisedNetwork);

+ 37 - 0
src/main/java/com/zhili/zkservicecenter/controller/TestController.java

@@ -0,0 +1,37 @@
+package com.zhili.zkservicecenter.controller;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.script.RedisScript;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author :HuangBin
+ * @description:TODO
+ * @date :2023/2/1 18:59
+ */
+@RestController
+@Slf4j
+public class TestController {
+
+    @Autowired
+    RedisTemplate redisTemplate;
+
+    @Autowired
+    RedisScript<List> redisScript;
+
+    @GetMapping("/test")
+    public Object test(){
+        List<String> keys = new ArrayList<String>();
+        keys.add("x");
+        keys.add("y");
+        List<String> a = (List) redisTemplate.execute(redisScript, keys, "a");
+        log.info("a:{}",a);
+        return a;
+    }
+}

+ 15 - 0
src/main/java/com/zhili/zkservicecenter/dto/DriverConnInfo.java

@@ -0,0 +1,15 @@
+package com.zhili.zkservicecenter.dto;
+
+import lombok.Data;
+
+/**
+ * @author :HuangBin
+ * @description:TODO
+ * @date :2023/1/31 18:34
+ */
+@Data
+public class DriverConnInfo {
+    String serverInstanceId;
+    String sessionId;
+    Object userId;
+}

+ 99 - 0
src/main/java/com/zhili/zkservicecenter/dto/ExchangeEvent.java

@@ -0,0 +1,99 @@
+package com.zhili.zkservicecenter.dto;
+
+import com.zhili.zkservicecenter.util.BytesUtil;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang.ArrayUtils;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * @author :HuangBin
+ * @description:TODO
+ * @date :2022/12/21 18:26
+ */
+@Data
+@NoArgsConstructor
+public class ExchangeEvent {
+    String stationCode;
+    String exchangeNo;
+    String serialNo;
+    Integer seq;
+    Integer status;
+    String plate;
+    String vin;
+    String model;
+    String sourceSn;
+    Float sourceRatedEnergy;
+    Float sourceSoc;
+    Float sourceSoh;
+    Float sourceVol;
+    Float sourceCur;
+    Integer sourceTotalMiletage;
+    Integer targetIndex;
+    String targetSn;
+    Float targetRatedEnergy;
+    Float targetSoc;
+    Float targetSoh;
+    Float frontDistance;
+    Float batteryHeight;
+    String timeStr;
+    Integer step;
+    Integer startSource;
+    Date time;
+
+    public class ExchangeSeq {
+        public static final int PLATE_RECOGNIZE = 0;
+        public static final int VEH_CONNECT = 1;
+        public static final int CHOOSE_TARGET_BATTERY = 2;
+        public static final int VEH_INFO_VERIFY = 3;
+        public static final int VEH_MODEL_VERIFY = 4;
+        public static final int READ_SOURCE_BATTERY = 5;
+        public static final int SOURCE_BATTERY_UNLOCK = 6;
+        public static final int TARGET_BATTERY_READY = 7;
+        public static final int SERVO_VERIFY = 8;
+        public static final int ROBOT_READY = 9;
+
+        public static final int EXCHANGE_BEGIN = 10;
+        public static final int ROBOT_ONGOING = 11;
+        public static final int EXCHANGE_END = 12;
+    }
+
+    public ExchangeEvent(String stationCode, byte[] data) {
+        this.stationCode = stationCode;
+        exchangeNo = BytesUtil.parseString(ArrayUtils.subarray(data, 0, 32));
+        serialNo = BytesUtil.parseString(ArrayUtils.subarray(data, 32, 64));
+        seq = Byte.toUnsignedInt(data[64]);
+        status = Byte.toUnsignedInt(data[65]);
+        plate = BytesUtil.parseStringFromGb2312Bytes(ArrayUtils.subarray(data, 66, 86));
+        vin = BytesUtil.parseString(ArrayUtils.subarray(data, 86, 103));
+        model = BytesUtil.parseString(ArrayUtils.subarray(data, 103, 119));
+        sourceSn = BytesUtil.parseString(ArrayUtils.subarray(data, 119, 146));
+        sourceRatedEnergy = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 146, 148)) * 0.1f;
+        sourceSoc = Byte.toUnsignedInt(data[148]) * 0.4f;
+        sourceSoh = Byte.toUnsignedInt(data[149]) * 0.4f;
+        sourceVol = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 150, 152)) * 0.1f;
+        sourceCur = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 152, 154)) * 0.1f - 1000f;
+        sourceTotalMiletage = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 154, 158));
+        targetIndex = Byte.toUnsignedInt(data[158]);
+        targetSn = BytesUtil.parseString(ArrayUtils.subarray(data, 159, 186));
+        targetRatedEnergy = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 186, 188)) * 0.1f;
+        targetSoc = Byte.toUnsignedInt(data[188]) * 0.4f;
+        targetSoh = Byte.toUnsignedInt(data[189]) * 0.4f;
+        frontDistance = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 190, 194)) * 0.0001f;
+        batteryHeight = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 194, 198)) * 0.0001f;
+        timeStr = BytesUtil.composeTimeString(ArrayUtils.subarray(data, 198, 206));
+        step = Byte.toUnsignedInt(data[206]);
+        startSource = Byte.toUnsignedInt(data[207]);
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        Date timeD = null;
+        try {
+            timeD = sdf.parse(timeStr);
+        } catch (ParseException e) {
+            e.printStackTrace();
+        }
+        time = timeD;
+    }
+}

+ 14 - 0
src/main/java/com/zhili/zkservicecenter/dto/SessionExchangeEvent.java

@@ -0,0 +1,14 @@
+package com.zhili.zkservicecenter.dto;
+
+import lombok.Data;
+
+/**
+ * @author :HuangBin
+ * @description:TODO
+ * @date :2023/1/31 21:47
+ */
+@Data
+public class SessionExchangeEvent {
+    DriverConnInfo connInfo;
+    ExchangeEvent event;
+}

+ 14 - 0
src/main/java/com/zhili/zkservicecenter/dto/StationConnInfo.java

@@ -0,0 +1,14 @@
+package com.zhili.zkservicecenter.dto;
+
+import lombok.Data;
+
+/**
+ * @author :HuangBin
+ * @description:TODO
+ * @date :2023/1/29 21:10
+ */
+@Data
+public class StationConnInfo {
+    String serverInstanceId;
+    String connIdStr;
+}

+ 17 - 0
src/main/java/com/zhili/zkservicecenter/dto/StationControlEvent.java

@@ -0,0 +1,17 @@
+package com.zhili.zkservicecenter.dto;
+
+import lombok.Data;
+
+/**
+ * @author :HuangBin
+ * @description:TODO
+ * @date :2023/2/2 15:10
+ */
+@Data
+public class StationControlEvent {
+    DriverConnInfo connInfo;
+    StationConnInfo stationConnInfo;
+    String type;
+    String stationCode;
+    String exchangeNo;
+}

+ 16 - 0
src/main/java/com/zhili/zkservicecenter/dto/SubscribeExchangeEvent.java

@@ -0,0 +1,16 @@
+package com.zhili.zkservicecenter.dto;
+
+import lombok.Data;
+
+/**
+ * @author :HuangBin
+ * @description:TODO
+ * @date :2023/1/31 22:43
+ */
+@Data
+public class SubscribeExchangeEvent {
+    DriverConnInfo connInfo;
+    String ExchangeNo;
+    // on / off
+    String type;
+}

+ 411 - 0
src/main/java/com/zhili/zkservicecenter/util/BytesUtil.java

@@ -0,0 +1,411 @@
+package com.zhili.zkservicecenter.util;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.UnsupportedEncodingException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+/**
+ * @author :HuangBin
+ * @description:TODO
+ * @date :2022/10/18 14:00
+ */
+public class BytesUtil {
+
+    public static final int BIT = 8;
+
+    public static byte[] fromIntWithHigherFirst(int intVal, int bytesLen) {
+        byte[] bytes5 = new byte[bytesLen];
+        while (bytesLen > 0) {
+            bytesLen--;
+            bytes5[bytesLen] = (byte) (intVal >> 8 * (bytes5.length - bytesLen - 1) & 0xFF);
+        }
+        return bytes5;
+    }
+
+    public static byte[] fromIntWithLowerFirst(int intVal, int bytesLen) {
+        byte[] bytes5 = new byte[bytesLen];
+        while (bytesLen > 0) {
+            bytesLen--;
+            bytes5[bytes5.length - bytesLen - 1] = (byte) (intVal >> 8 * (bytes5.length - bytesLen - 1) & 0xFF);
+        }
+        return bytes5;
+    }
+
+    public static int toIntWithLowerFirst(byte[] bytes) {
+        int length = bytes.length;
+        int res = 0;
+        for (int i = 0; i < length; i++) {
+            res = res * 256;
+            res += Byte.toUnsignedInt(bytes[length - i - 1]);
+        }
+        return res;
+    }
+
+    public static byte sumCheck(byte[] b) {
+        int sum = 0;
+        for (int i = 0; i < b.length; i++) {
+            sum = sum + b[i];
+            sum = 0xff & sum;
+        }
+        return (byte) sum;
+    }
+
+    public static String parseStringFromGb2312Bytes(byte[] bytes){
+        try{
+            String gb2312Str = new String(bytes, "gb2312");
+            return gb2312Str.substring(0,gb2312Str.indexOf((char)0));
+        }catch (Exception e){
+            return "";
+        }
+    }
+
+    public static String parseString(byte[] b) {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < b.length; i++) {
+            sb.append((char) b[i]);
+        }
+        String rawString = sb.toString();
+        int k = rawString.indexOf(0);
+        return rawString.substring(0, k);
+    }
+
+    public static byte[] fromString(String s, int len, byte fill) {
+        byte[] res = new byte[len];
+        for (int i = 0; i < s.length(); i++) {
+            byte b = (byte) s.charAt(i);
+            res[i] = b;
+        }
+        for (int j = s.length(); j < len; j++) {
+            res[j] = fill;
+        }
+        return res;
+    }
+
+    public static void setSubBytes(byte[] source, int index, byte[] target) {
+        for (int i = 0; i < target.length; i++) {
+            source[index + i] = target[i];
+        }
+    }
+
+    public static byte[] composeNowTimeBytes() {
+        LocalDateTime now = LocalDateTime.now();
+        String timeStr = DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(now);
+        byte[] timeBytes = new byte[8];
+        for (int i = 0; i < 7; i++) {
+            String partStr = timeStr.substring(2 * i, 2 * (i + 1));
+            timeBytes[i] = BytesUtil.fromIntWithLowerFirst(Integer.valueOf(partStr, 16), 1)[0];
+        }
+        timeBytes[7] = (byte) 0xff;
+        return timeBytes;
+    }
+
+    public static byte[] crc(byte[] bytes) {
+        String crcStr = getCRCStr(bytes);
+        String[] ss = crcStr.split(" ");
+        byte[] crc = new byte[ss.length];
+        for (int i = 0; i < ss.length; i++) {
+            crc[i] = (byte) Integer.valueOf(ss[i], 16).intValue();
+        }
+        return crc;
+    }
+
+    public static String getCRCStr(byte[] bytes) {
+        //CRC寄存器全为1
+        int CRC = 0x0000ffff;
+        //多项式校验值
+        int POLYNOMIAL = 0x0000a001;
+        int i, j;
+        for (i = 0; i < bytes.length; i++) {
+            CRC ^= ((int) bytes[i] & 0x000000ff);
+            for (j = 0; j < 8; j++) {
+                if ((CRC & 0x00000001) != 0) {
+                    CRC >>= 1;
+                    CRC ^= POLYNOMIAL;
+                } else {
+                    CRC >>= 1;
+                }
+            }
+        }
+        //结果转换为16进制
+        String result = Integer.toHexString(CRC).toUpperCase();
+        if (result.length() != 4) {
+            StringBuffer sb = new StringBuffer("0000");
+            result = sb.replace(4 - result.length(), 4, result).toString();
+        }
+        //交换高低位
+        return result.substring(2, 4) + " " + result.substring(0, 2);
+    }
+
+    /**
+     * led data封装
+     *
+     * @param str
+     * @return
+     * @throws UnsupportedEncodingException
+     */
+    public static byte[] getDataByte(String str, int type) throws UnsupportedEncodingException {
+        System.out.println(str);
+        byte[] dataBytes = str.getBytes(type == 1 ? "gb2312" : "Unicode");
+        dataBytes = removeTheElement(dataBytes, 0);
+        dataBytes = removeTheElement(dataBytes, 0);
+        //System.out.println(bytesToHexString(dataBytes));
+        int l = 8 + dataBytes.length;
+        if (type == 1) {
+            l = 8 + dataBytes.length * 2;
+        }
+        byte[] data = new byte[l];
+        BytesUtil.setSubBytes(data, 0, BytesUtil.fromIntWithLowerFirst(1, 2));
+        data[2] = (byte) type;
+        data[3] = (byte) 2;
+        data[4] = (byte) 0;
+        data[5] = (byte) 1;
+        BytesUtil.setSubBytes(data, 6, BytesUtil.fromIntWithLowerFirst(dataBytes.length, 2));
+        if (type == 1) {
+            BytesUtil.setSubBytes(data, 6, BytesUtil.fromIntWithLowerFirst(dataBytes.length * 2, 2));
+        }
+        BytesUtil.setSubBytes(data, 8, dataBytes);
+        return data;
+    }
+
+    public static byte[] removeTheElement(byte[] arr, int index) {
+        if (arr == null || index < 0 || index >= arr.length) {
+            return arr;
+        }
+        byte[] anotherArray = new byte[arr.length - 1];
+        for (int i = 0, k = 0; i < arr.length; i++) {
+            if (i == index) {
+                continue;
+            }
+            anotherArray[k++] = arr[i];
+        }
+        return anotherArray;
+    }
+
+    //byte数组转String
+    public static String bytesToHexString(byte[] bArray) {
+        StringBuffer sb = new StringBuffer(bArray.length);
+        String sTemp;
+        for (int i = 0; i < bArray.length; i++) {
+            sTemp = Integer.toHexString(0xFF & bArray[i]);
+            if (sTemp.length() < 2)
+                sb.append(0);
+            sb.append(sTemp.toUpperCase());
+        }
+        int length = sb.length();
+        if (length == 1 || length == 0) {
+            return sb.toString();
+        }
+        if (length % 2 == 1) {
+            sb.insert(length - 1, " ");
+            length = length - 1;
+        }
+        for (int i = length; i > 0; i = i - 2) {
+            sb.insert(i, " ");
+        }
+        return sb.toString();
+    }
+
+    /**
+     * description: 字符串转 Unicode --- 原生方法
+     *
+     * @param str
+     * @return String
+     * @version v1.0
+     * @author w
+     * @date 2021年4月21日 上午10:18:25
+     */
+    public static String stringToUnicode(String str) {
+        StringBuffer sb = new StringBuffer();
+        char[] c = str.toCharArray();
+        for (int i = 0; i < c.length; i++) {
+            sb.append("\\u" + Integer.toHexString(c[i]));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * description: Unicode 转字符串  --- 原生方法
+     *
+     * @param unicode
+     * @return String
+     * @version v1.0
+     * @author w
+     * @date 2021年4月21日 上午10:18:57
+     */
+    public static String unicodeToString(String unicode) {
+        StringBuffer sb = new StringBuffer();
+        String[] hex = unicode.split("\\\\u");
+        for (int i = 1; i < hex.length; i++) {
+            int index = Integer.parseInt(hex[i], 16);
+            sb.append((char) index);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * @param data1
+     * @param data2
+     * @return data1 与 data2拼接的结果
+     */
+    public static byte[] addBytes(byte[] data1, byte[] data2) {
+        byte[] data3 = new byte[data1.length + data2.length];
+        System.arraycopy(data1, 0, data3, 0, data1.length);
+        System.arraycopy(data2, 0, data3, data1.length, data2.length);
+        return data3;
+    }
+
+    /**
+     * @param data1
+     * @param data2
+     * @return data1 与 data2拼接的结果
+     */
+    public static byte[] addByte(byte[] data1, byte data2) {
+        byte[] datax = {data2};
+        byte[] data3 = new byte[data1.length + datax.length];
+        System.arraycopy(data1, 0, data3, 0, data1.length);
+        System.arraycopy(datax, 0, data3, data1.length, datax.length);
+        return data3;
+    }
+
+    /**
+     * 返回8个字节日期时间16进制
+     */
+    public static byte[] getBytesTime() {
+        Calendar cal = Calendar.getInstance();
+        // 当前年
+        int year = cal.get(Calendar.YEAR);
+        // 当前月
+        int month = cal.get(Calendar.MONTH) + 1;
+        // 当前日
+        int day = cal.get(Calendar.DATE);
+        // 当前小时
+        int hour = cal.get(Calendar.HOUR_OF_DAY);
+        // 当前分钟
+        int minute = cal.get(Calendar.MINUTE);
+        // 当前秒
+        int second = cal.get(Calendar.SECOND);
+        int yearOne = Integer.valueOf(StringUtils.substring(year + "", 0, 2));
+        int yearTwo = Integer.valueOf(StringUtils.substring(year + "", 2, 4));
+        byte[] time = new byte[8];
+        time[0] = (byte) yearOne;
+        time[1] = (byte) yearTwo;
+        time[2] = (byte) month;
+        time[3] = (byte) day;
+        time[4] = (byte) hour;
+        time[5] = (byte) minute;
+        time[6] = (byte) second;
+        time[7] = (byte) 0;
+        return time;
+    }
+
+    public static String composeTimeString(byte[] timeBytes) {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < timeBytes.length - 1; i++) {
+            byte b = timeBytes[i];
+            String partStr = Integer.toHexString(Byte.toUnsignedInt(b));
+            if (partStr.length() == 1) {
+                partStr = "0" + partStr;
+            }
+            sb.append(partStr);
+//            2015-02-13 12:00:12
+            if (i < 1 || i >= 6) {
+
+            } else if (i < 3) {
+                sb.append("-");
+            } else if (i < 4) {
+                sb.append(" ");
+            } else if (i < 6) {
+                sb.append(":");
+            }
+        }
+        return sb.toString();
+    }
+
+    public static List<Integer> getFault(byte[] alarmData) {
+        int count = alarmData.length * BIT;
+        List<Integer> faultList = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            int bitSkip = i / BIT;
+            int bitIndex = i % BIT;
+            byte alarmDatum = alarmData[bitSkip];
+            byte success = (byte) 1;
+            success <<= bitIndex;
+            byte res = (byte) (alarmDatum & success);
+            if (Byte.valueOf(res).equals(success)) {
+                faultList.add(i);
+            }
+        }
+        return faultList;
+    }
+
+    public static boolean checkSum(byte[] data) {
+        byte sum = 0;
+        for (int i = 1; i < data.length; i++) {
+            if (i == 1) {
+                sum = data[i];
+                continue;
+            }
+            sum = (byte) (sum ^ data[i]);
+        }
+        byte datum = data[0];
+        return sum == datum;
+    }
+
+    public static boolean sensorCheckSum(byte[] data){
+        byte sum=0;
+        for (int i=0;i<data.length;i++){
+            if (i == 0) {
+                sum = data[i];
+            }else{
+                sum += data[i];
+            }
+        }
+        return sum==0;
+    }
+
+    public static String toSnString(byte[] data) {
+        StringBuilder sb = new StringBuilder();
+        for (byte datum : data) {
+            char sn = (char) datum;
+            sb.append(sn);
+        }
+        return sb.toString();
+    }
+
+    public static boolean bytesEqual(byte[] bytes1, byte[] bytes2) {
+        boolean res = true;
+        for (int i = 0; i < bytes1.length; i++) {
+            if (bytes1[i] != bytes2[i]) {
+                res = false;
+                break;
+            }
+        }
+        return res;
+    }
+
+    public static int parseBits(byte data, int start, int end) {
+        byte mask = 0;
+        byte one = 1;
+        for (int i = 0; i < 8; i++) {
+            if (i >= start && i < end) {
+                mask += (byte) (one << i);
+            }
+        }
+//        System.out.println("mask:" + mask);
+        byte byteRes = (byte) (((mask & data) & 0xff) >> start);
+        return Byte.toUnsignedInt(byteRes);
+    }
+
+    public static String getSn(byte[] data, int len) {
+        String sn = "";
+        for (int i = 8; i <= len; i++) {
+            sn = sn + data[i];
+        }
+        return sn;
+    }
+}

+ 5 - 10
src/main/resources/application-dev.yml

@@ -2,21 +2,21 @@ server:
   port: 8900
 
 zookeeper:
-  server: 192.168.10.25:2181
+  server: 127.0.0.1:2181
 
 spring:
   datasource:
-    url: jdbc:mysql://192.168.10.25:3306/station_control_cloud?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
+    url: jdbc:mysql://127.0.0.1:3306/station_control_cloud?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
     username: root
     #    password: 'jek123'
     password: '123456'
     driverClassName: com.mysql.cj.jdbc.Driver
   redis:
-    host: 192.168.10.25
+    host: 127.0.0.1
     port: 6379
     #password: jek123
   kafka:
-    bootstrap-servers: 192.168.10.25:9092
+    bootstrap-servers: 192.168.0.89:9092
     producer:
       key-serializer: org.apache.kafka.common.serialization.StringSerializer
       value-serializer: org.apache.kafka.common.serialization.StringSerializer
@@ -26,11 +26,6 @@ spring:
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       max-poll-records: 50
-      properties:
-        spring:
-          json:
-            trusted:
-              packages: com.zhili.stationcontrol.dto.station
     listener:
       ack-mode: MANUAL_IMMEDIATE
       missing-topics-fatal: false
@@ -56,4 +51,4 @@ wx:
     msgDataFormat: JSON
 
 service-center:
-  advertised-network: 192.168.10.*
+  advertised-network: 192.168.0.*

+ 9 - 0
src/main/resources/lua/SubscribeExchange.lua

@@ -0,0 +1,9 @@
+--从这个里面拿出来历史详情
+local detailKey = KEYS[1]
+--注册为监听状态
+local subscribeKey = KEYS[2]
+
+local value = ARGV[1]
+
+redis.call("setex", subscribeKey, 1800, value)
+return redis.call("lrange", detailKey, 0, -1)

+ 5 - 10
target/classes/application-dev.yml

@@ -2,21 +2,21 @@ server:
   port: 8900
 
 zookeeper:
-  server: 192.168.10.25:2181
+  server: 127.0.0.1:2181
 
 spring:
   datasource:
-    url: jdbc:mysql://192.168.10.25:3306/station_control_cloud?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
+    url: jdbc:mysql://127.0.0.1:3306/station_control_cloud?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
     username: root
     #    password: 'jek123'
     password: '123456'
     driverClassName: com.mysql.cj.jdbc.Driver
   redis:
-    host: 192.168.10.25
+    host: 127.0.0.1
     port: 6379
     #password: jek123
   kafka:
-    bootstrap-servers: 192.168.10.25:9092
+    bootstrap-servers: 192.168.0.89:9092
     producer:
       key-serializer: org.apache.kafka.common.serialization.StringSerializer
       value-serializer: org.apache.kafka.common.serialization.StringSerializer
@@ -26,11 +26,6 @@ spring:
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       max-poll-records: 50
-      properties:
-        spring:
-          json:
-            trusted:
-              packages: com.zhili.stationcontrol.dto.station
     listener:
       ack-mode: MANUAL_IMMEDIATE
       missing-topics-fatal: false
@@ -56,4 +51,4 @@ wx:
     msgDataFormat: JSON
 
 service-center:
-  advertised-network: 192.168.10.*
+  advertised-network: 192.168.0.*

BIN
target/classes/com/zhili/zkservicecenter/component/KafkaMessageHandler.class


BIN
target/classes/com/zhili/zkservicecenter/component/ServiceCenter.class


BIN
target/classes/com/zhili/zkservicecenter/configuration/ServerConfig.class