Browse Source

增加和业务中心交互

TitanWong 2 years ago
parent
commit
cd32109614

+ 12 - 1
src/main/java/com/zhili/zkstationserver/component/KafkaMessageHandler.java

@@ -1,6 +1,10 @@
 package com.zhili.zkstationserver.component;
 
+import com.alibaba.fastjson.JSON;
 import com.zhili.zkstationserver.configuration.ServerConfig;
+import com.zhili.zkstationserver.dto.StationControlEvent;
+import com.zhili.zkstationserver.dto.StationMessage;
+import com.zhili.zkstationserver.util.StationCenter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -28,9 +32,16 @@ public class KafkaMessageHandler {
         log.info("KafkaMessageHandler PostConstruct:" + serverConfig);
     }
 
-    @KafkaListener(topics = "#{serverConfig.getTopics()}",groupId = "#{serverConfig.getServiceName()}")
+    @KafkaListener(topics = {"#{serverConfig.getInstanceId()}"}, groupId = "#{serverConfig.getServiceName()}")
     public void processExchangeEvent(ConsumerRecord<String, String> record, Acknowledgment ack){
         log.info("recv:"+record.value());
+        StationControlEvent stationControlEvent = JSON.parseObject(record.value(), StationControlEvent.class);
+        if(stationControlEvent.getType().equals("startExchange")){
+            StationMessage stationMessage = new StationMessage();
+            stationMessage.setCommand(504);
+            stationMessage.setData(new byte[2]);
+            StationCenter.sendToChannel(stationControlEvent.getStationConnInfo().getConnIdStr(),stationMessage);
+        }
         ack.acknowledge();
     }
 }

+ 11 - 0
src/main/java/com/zhili/zkstationserver/configuration/RedisConfiguration.java

@@ -2,10 +2,14 @@ package com.zhili.zkstationserver.configuration;
 
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.core.io.ClassPathResource;
 import org.springframework.data.redis.connection.RedisConnectionFactory;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.script.DefaultRedisScript;
+import org.springframework.data.redis.core.script.RedisScript;
 import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
 import org.springframework.data.redis.serializer.StringRedisSerializer;
+import org.springframework.scripting.support.ResourceScriptSource;
 
 /**
  * @author :HuangBin
@@ -37,4 +41,11 @@ public class RedisConfiguration {
         redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
         return redisTemplate;
     }
+    @Bean
+    public RedisScript<Long> stationRegisterScript(){
+        DefaultRedisScript defaultRedisScript = new DefaultRedisScript();
+        defaultRedisScript.setResultType(Long.class);
+        defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/StationRegister.lua")));
+        return defaultRedisScript;
+    }
 }

+ 5 - 0
src/main/java/com/zhili/zkstationserver/configuration/RestTemplateConfig.java

@@ -3,6 +3,10 @@ package com.zhili.zkstationserver.configuration;
 import org.springframework.boot.web.client.RestTemplateBuilder;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.data.redis.core.script.DefaultRedisScript;
+import org.springframework.data.redis.core.script.RedisScript;
+import org.springframework.scripting.support.ResourceScriptSource;
 import org.springframework.web.client.RestTemplate;
 
 /**
@@ -17,4 +21,5 @@ public class RestTemplateConfig {
     public RestTemplate restTemplate(RestTemplateBuilder builder){
         return builder.build();
     }
+
 }

+ 15 - 0
src/main/java/com/zhili/zkstationserver/dto/DriverConnInfo.java

@@ -0,0 +1,15 @@
+package com.zhili.zkstationserver.dto;
+
+import lombok.Data;
+
+/**
+ * @author :HuangBin
+ * @description:TODO
+ * @date :2023/1/31 18:34
+ */
+@Data
+public class DriverConnInfo {
+    String serverInstanceId;
+    String sessionId;
+    Object userId;
+}

+ 17 - 0
src/main/java/com/zhili/zkstationserver/dto/StationControlEvent.java

@@ -0,0 +1,17 @@
+package com.zhili.zkstationserver.dto;
+
+import lombok.Data;
+
+/**
+ * @author :HuangBin
+ * @description:TODO
+ * @date :2023/2/2 15:10
+ */
+@Data
+public class StationControlEvent {
+    DriverConnInfo connInfo;
+    StationConnInfo stationConnInfo;
+    String type;
+    String stationCode;
+    String exchangeNo;
+}

+ 47 - 18
src/main/java/com/zhili/zkstationserver/handler/StationMessageHandler.java

@@ -14,15 +14,16 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.AttributeKey;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.script.DefaultRedisScript;
+import org.springframework.data.redis.core.script.RedisScript;
 import org.springframework.kafka.core.KafkaTemplate;
 
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 
 /**
  * @author :HuangBin
@@ -31,10 +32,10 @@ import java.util.concurrent.TimeUnit;
  */
 @Slf4j
 public class StationMessageHandler extends ChannelInboundHandlerAdapter {
-    KafkaTemplate kafkaTemplate = (KafkaTemplate)ApplicationContextUtils.getApplicationContext().getBean("kafkaTemplate");
+    KafkaTemplate kafkaTemplate = (KafkaTemplate) ApplicationContextUtils.getApplicationContext().getBean("kafkaTemplate");
     ServerConfig serverConfig = ApplicationContextUtils.getApplicationContext().getBean(ServerConfig.class);
     RedisTemplate redisTemplate = (RedisTemplate) ApplicationContextUtils.getApplicationContext().getBean("redisTemplate");
-
+    RedisScript stationRegisterScript = (DefaultRedisScript) ApplicationContextUtils.getApplicationContext().getBean("stationRegisterScript");
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
@@ -47,6 +48,12 @@ public class StationMessageHandler extends ChannelInboundHandlerAdapter {
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         log.info("channel disconnected: " + ctx.channel().remoteAddress());
         super.channelInactive(ctx);
+        //断连后及时删除站的注册信息
+        Object stationCodeObj = ctx.channel().attr(AttributeKey.valueOf("station")).get();
+        if (stationCodeObj == null || StringUtils.isBlank(stationCodeObj.toString())) {
+            return;
+        }
+        redisTemplate.delete("StationConn:" + stationCodeObj);
     }
 
     @Override
@@ -63,24 +70,38 @@ public class StationMessageHandler extends ChannelInboundHandlerAdapter {
         byte[] replyBytes;
         String topic;
         StationConnInfo stationConnInfo;
+        List<String> stationRegisterkeys;
+        Long registerSuccess;
         switch (command) {
             case CommandType.SIGN_IN:
                 //签入命令,将连接打上站的标记, 并计入redis, 代表此站在这台服务器上{"stationConn:st001": "serverId/channelId"}, 12s超时
                 byte[] stationCodeBytes = ArrayUtils.subarray(data, 0, 8);
                 stationCode = BytesUtil.parseString(stationCodeBytes);
                 log.info("SIGN_IN:" + stationCode);
+                stationConnInfo = new StationConnInfo();
+                stationConnInfo.setServerInstanceId(serverConfig.getInstanceId());
+                stationConnInfo.setConnIdStr(StationCenter.getChannelIdStr(ctx.channel()));
+
+                stationRegisterkeys = new ArrayList();
+                stationRegisterkeys.add("StationConn:" + stationCode);
+                registerSuccess = (Long) redisTemplate.execute(stationRegisterScript, stationRegisterkeys, stationConnInfo, 15);
                 StationCenter.markStation(ctx.channel(), stationCode);
                 stationMessageReply = new StationMessage();
                 stationMessageReply.setCommand(CommandType.SIGN_IN_REPLY);
                 replyBytes = new byte[9];
                 BytesUtil.setSubBytes(replyBytes, 0, stationCodeBytes);
-                replyBytes[8] = (byte) 0;
-                stationMessageReply.setData(replyBytes);
-                ctx.channel().writeAndFlush(stationMessageReply);
-                stationConnInfo = new StationConnInfo();
-                stationConnInfo.setServerInstanceId(serverConfig.getInstanceId());
-                stationConnInfo.setConnIdStr(StationCenter.getChannelIdStr(ctx.channel()));
-                redisTemplate.opsForValue().set("stationConn:" + stationCode, stationConnInfo, 12, TimeUnit.SECONDS);
+                log.info("sign in result:{}", registerSuccess);
+                if (registerSuccess.equals(0l)) {
+                    replyBytes[8] = (byte) 0;
+                    stationMessageReply.setData(replyBytes);
+                    ctx.channel().writeAndFlush(stationMessageReply);
+                } else {
+                    //注册失败,已经有这个站了
+                    replyBytes[8] = (byte) 1;
+                    stationMessageReply.setData(replyBytes);
+                    ctx.channel().writeAndFlush(stationMessageReply);
+                    ctx.channel().close();
+                }
                 break;
             case CommandType.HEART_BEAT:
                 //心跳命令,计入redis,代表此站在这台服务器上{"stationConn:st001": "serverId/channelId"},12s超时
@@ -91,10 +112,18 @@ public class StationMessageHandler extends ChannelInboundHandlerAdapter {
                 stationConnInfo = new StationConnInfo();
                 stationConnInfo.setServerInstanceId(serverConfig.getInstanceId());
                 stationConnInfo.setConnIdStr(StationCenter.getChannelIdStr(ctx.channel()));
-                redisTemplate.opsForValue().set("stationConn:" + stationCode, stationConnInfo, 12, TimeUnit.SECONDS);
-                StationConnInfo stationConnInfo1 = new StationConnInfo();
-                BeanUtils.populate(stationConnInfo1, (Map)redisTemplate.opsForValue().get("stationConn:" + stationCode));
-                log.info("get from redis: {}",stationConnInfo1);
+
+                stationRegisterkeys = new ArrayList();
+                stationRegisterkeys.add("StationConn:" + stationCode);
+                registerSuccess = (Long) redisTemplate.execute(stationRegisterScript, stationRegisterkeys, stationConnInfo, 15);
+//                stationMessageReply = new StationMessage();
+                if (registerSuccess.equals(0l)) {
+                    //成功
+                } else {
+                    //注册失败,有可能已经有站注册了,一个站只能有一个连接,关闭
+                    log.info("心跳失败,stationCode:{}", stationCode);
+                    ctx.channel().close();
+                }
                 break;
             case CommandType.EXCHANGE_EVENT:
                 stationCode = (String) Optional.ofNullable(ctx.channel().attr(AttributeKey.valueOf("station")).get()).orElse("");
@@ -105,8 +134,8 @@ public class StationMessageHandler extends ChannelInboundHandlerAdapter {
                 //为了削峰,将对象构建后,放到kafka中,由业务中心处理。
                 ExchangeEvent exchangeEvent = new ExchangeEvent(stationCode, data);
                 log.info("exchangeEvent:" + exchangeEvent);
-                topic = serverConfig.getServiceCenterName()+"_ExchangeEvent";
-                log.info("send to {}",topic);
+                topic = serverConfig.getServiceCenterName() + "_ExchangeEvent";
+                log.info("send to {}", topic);
                 kafkaTemplate.send(topic, JSON.toJSONString(exchangeEvent));
                 break;
             case CommandType.CHARGER_STATE:

+ 13 - 0
src/main/resources/lua/StationRegister.lua

@@ -0,0 +1,13 @@
+local stationKey = KEYS[1]
+local newValue = ARGV[1]
+local ttl = tonumber(ARGV[2])
+
+local oldValue = redis.call("get", stationKey)
+
+if not(oldValue) or newValue == oldValue then
+    --如果没有注册或者和目前的连接值相等
+    redis.call("setex", stationKey, ttl, newValue);
+    return 0
+else
+    return 1
+end