Ver Fonte

添加消息处理

TitanWong há 2 anos atrás
pai
commit
9e92610871

+ 2 - 8
Dockerfile

@@ -1,13 +1,7 @@
-#FROM nas.fast-fun.cn:5000/common/java:8-jre-alpine
-#FROM java:8-jre-alpine
-#FROM openjdk:8-jre-alpine
-FROM 192.168.10.122:5000/common/java:8-jre-alpine
-
+FROM nas.fast-fun.cn:5000/common/java:8-jre-alpine
 
 ENV OP_ENV=prod
 
-ENV STATION_SERVER_ID=station_server_1
-
 COPY ./target/zk-station-server-1.0-SNAPSHOT.jar /home/App.jar
 
-CMD ["java", "-jar", "/home/App.jar", "--spring.profiles.active=${OP_ENV}", "--station_server.id=${STATION_SERVER_ID}"]
+CMD ["java", "-jar", "/home/App.jar", "--spring.profiles.active=${OP_ENV}"]

+ 1 - 1
pom.xml

@@ -45,7 +45,7 @@
         </dependency>
         <!--        netty客户端依赖,向云端传数据要用-->
         <dependency>
-            <groupId>org.openlabtesting.netty</groupId>
+            <groupId>io.netty</groupId>
             <artifactId>netty-all</artifactId>
             <version>4.1.48.Final</version>
         </dependency>

+ 1 - 1
src/main/java/com/zhili/zkstationserver/MainApp.java

@@ -19,7 +19,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
 @MapperScan("com.zhili.zkstationserver.mapper")
 @Slf4j
 public class MainApp {
-    public static void main(String[] args) throws Exception {
+    public static void main(String[] args) {
         ApplicationContext ctx = SpringApplication.run(MainApp.class, args);
         StationServer stationServer = ctx.getBean(StationServer.class);
         stationServer.run();

+ 3 - 7
src/main/java/com/zhili/zkstationserver/component/StationServer.java

@@ -1,11 +1,9 @@
 package com.zhili.zkstationserver.component;
 
 import com.zhili.zkstationserver.configuration.ServerConfig;
-import com.zhili.zkstationserver.dto.ServerInfo;
 import com.zhili.zkstationserver.handler.StationMessageDecoder;
 import com.zhili.zkstationserver.handler.StationMessageEncoder;
 import com.zhili.zkstationserver.handler.StationMessageHandler;
-import com.zhili.zkstationserver.util.StationCenter;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
@@ -20,9 +18,6 @@ import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.stereotype.Component;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * @author :HuangBin
  * @description:TODO
@@ -36,8 +31,8 @@ public class StationServer {
     ServerConfig serverConfig;
 
     @Async
-    public void run(){
-        log.info("serverConfig:"+serverConfig);
+    public void run() {
+        log.info("serverConfig:" + serverConfig);
         EventLoopGroup bossGroup = new NioEventLoopGroup(1);
         EventLoopGroup workerGroup = new NioEventLoopGroup();
         try {
@@ -57,6 +52,7 @@ public class StationServer {
                         }
                     });
             ChannelFuture channelFuture = server.bind(serverConfig.getPort()).sync();
+            log.info("station server running({})", serverConfig.getPort());
             channelFuture.channel().closeFuture().sync();
         } catch (InterruptedException e) {
             e.printStackTrace();

+ 1 - 1
src/main/java/com/zhili/zkstationserver/configuration/RedisConfiguration.java

@@ -22,7 +22,7 @@ import org.springframework.data.redis.serializer.StringRedisSerializer;
 @Configuration
 public class RedisConfiguration {
     @Bean
-    public RedisTemplate redisTemplate(RedisConnectionFactory factory){
+    public RedisTemplate redisTemplate(RedisConnectionFactory factory) {
         RedisTemplate redisTemplate = new RedisTemplate();
         redisTemplate.setConnectionFactory(factory);
         //解决key的序列化问题,字符串序列化,可读

+ 3 - 0
src/main/java/com/zhili/zkstationserver/configuration/ServerConfig.java

@@ -23,6 +23,8 @@ public class ServerConfig {
 
     String serviceName;
 
+    String serviceCenterName;
+
     @Value("${station-server.port}")
     //服务器启动端口
     Integer port;
@@ -43,6 +45,7 @@ public class ServerConfig {
     @PostConstruct
     public void init() throws UnknownHostException {
         serviceName = "StationServer";
+        serviceCenterName = "ServiceCenter";
         zkRoot = "/servers";
         ipAddress = null;
         System.out.println("advertisedNetwork("+ advertisedNetwork.length()+"):" + advertisedNetwork);

+ 40 - 35
src/main/java/com/zhili/zkstationserver/controller/StationServierInfoController.java

@@ -1,37 +1,42 @@
-//package com.zhili.zkstationserver.controller;
-//
-//import com.zhili.zkstationserver.component.StationServer;
-//import com.zhili.zkstationserver.configuration.ServerConfig;
-//import com.zhili.zkstationserver.controller.advice.CommonResponse;
-//import com.zhili.zkstationserver.dto.ServerInfo;
-//import com.zhili.zkstationserver.util.StationCenter;
-//import lombok.extern.slf4j.Slf4j;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.web.bind.annotation.GetMapping;
-//import org.springframework.web.bind.annotation.PathVariable;
-//import org.springframework.web.bind.annotation.RestController;
-//import org.springframework.web.client.RestClientException;
-//import org.springframework.web.client.RestTemplate;
-//
-//import java.util.HashMap;
-//import java.util.HashSet;
-//import java.util.Map;
-//import java.util.Set;
-//
-///**
-// * @author :HuangBin
-// * @description:TODO
-// * @date :2023/1/17 17:18
-// */
-//@RestController
-//@Slf4j
-//public class StationServierInfoController {
-//    @Autowired
-//    RestTemplate template;
-//
-//    @Autowired
-//    ServerConfig serverConfig;
-//
+package com.zhili.zkstationserver.controller;
+
+import com.zhili.zkstationserver.component.StationServer;
+import com.zhili.zkstationserver.configuration.ServerConfig;
+import com.zhili.zkstationserver.controller.advice.CommonResponse;
+import com.zhili.zkstationserver.dto.ServerInfo;
+import com.zhili.zkstationserver.util.StationCenter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author :HuangBin
+ * @description:TODO
+ * @date :2023/1/17 17:18
+ */
+@RestController
+@Slf4j
+public class StationServierInfoController {
+    @Autowired
+    RestTemplate template;
+
+    @Autowired
+    ServerConfig serverConfig;
+
+    @GetMapping("/test")
+    public String test(){
+        return "123";
+    }
+
 //    //获取本服务器上的所有站
 //    @GetMapping("/getServerStations")
 //    public Set<String> getServerStations() {
@@ -100,4 +105,4 @@
 //        }
 //        return res;
 //    }
-//}
+}

+ 20 - 21
src/main/java/com/zhili/zkstationserver/dto/ExchangeEvent.java

@@ -67,27 +67,26 @@ public class ExchangeEvent {
         serialNo = BytesUtil.parseString(ArrayUtils.subarray(data, 32, 64));
         seq = Byte.toUnsignedInt(data[64]);
         status = Byte.toUnsignedInt(data[65]);
-        plate = BytesUtil.parseStringFromGb2312Bytes(ArrayUtils.subarray(data, 66, 76));
-        vin = BytesUtil.parseString(ArrayUtils.subarray(data, 76, 93));
-        model = BytesUtil.parseString(ArrayUtils.subarray(data, 93, 109));
-        sourceSn = BytesUtil.parseString(ArrayUtils.subarray(data, 109, 136));
-        sourceRatedEnergy = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 136, 138)) * 0.1f;
-        sourceSoc = Byte.toUnsignedInt(data[138]) * 0.4f;
-        sourceSoh = Byte.toUnsignedInt(data[139]) * 0.4f;
-        sourceVol = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 140, 142)) * 0.1f;
-        sourceCur = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 142, 144)) * 0.1f - 1000f;
-        sourceTotalMiletage = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 144, 148));
-        targetIndex = Byte.toUnsignedInt(data[148]);
-        targetSn = BytesUtil.parseString(ArrayUtils.subarray(data, 149, 176));
-        targetRatedEnergy = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 176, 178)) * 0.1f;
-        targetSoc = Byte.toUnsignedInt(data[178]) * 0.4f;
-        targetSoh = Byte.toUnsignedInt(data[179]) * 0.4f;
-        frontDistance = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 180, 184)) * 0.0001f;
-        batteryHeight = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 184, 188)) * 0.0001f;
-        timeStr = BytesUtil.composeTimeString(ArrayUtils.subarray(data, 188, 196));
-        step = Byte.toUnsignedInt(data[196]);
-        startSource = Byte.toUnsignedInt(data[197]);
-        String timeStr = BytesUtil.composeTimeString(ArrayUtils.subarray(data, 188, 196));
+        plate = BytesUtil.parseStringFromGb2312Bytes(ArrayUtils.subarray(data, 66, 86));
+        vin = BytesUtil.parseString(ArrayUtils.subarray(data, 86, 103));
+        model = BytesUtil.parseString(ArrayUtils.subarray(data, 103, 119));
+        sourceSn = BytesUtil.parseString(ArrayUtils.subarray(data, 119, 146));
+        sourceRatedEnergy = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 146, 148)) * 0.1f;
+        sourceSoc = Byte.toUnsignedInt(data[148]) * 0.4f;
+        sourceSoh = Byte.toUnsignedInt(data[149]) * 0.4f;
+        sourceVol = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 150, 152)) * 0.1f;
+        sourceCur = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 152, 154)) * 0.1f - 1000f;
+        sourceTotalMiletage = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 154, 158));
+        targetIndex = Byte.toUnsignedInt(data[158]);
+        targetSn = BytesUtil.parseString(ArrayUtils.subarray(data, 159, 186));
+        targetRatedEnergy = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 186, 188)) * 0.1f;
+        targetSoc = Byte.toUnsignedInt(data[188]) * 0.4f;
+        targetSoh = Byte.toUnsignedInt(data[189]) * 0.4f;
+        frontDistance = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 190, 194)) * 0.0001f;
+        batteryHeight = BytesUtil.toIntWithLowerFirst(ArrayUtils.subarray(data, 194, 198)) * 0.0001f;
+        timeStr = BytesUtil.composeTimeString(ArrayUtils.subarray(data, 198, 206));
+        step = Byte.toUnsignedInt(data[206]);
+        startSource = Byte.toUnsignedInt(data[207]);
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         Date timeD = null;
         try {

+ 14 - 0
src/main/java/com/zhili/zkstationserver/dto/StationConnInfo.java

@@ -0,0 +1,14 @@
+package com.zhili.zkstationserver.dto;
+
+import lombok.Data;
+
+/**
+ * @author :HuangBin
+ * @description:TODO
+ * @date :2023/1/29 21:10
+ */
+@Data
+public class StationConnInfo {
+    String serverInstanceId;
+    String connIdStr;
+}

+ 36 - 7
src/main/java/com/zhili/zkstationserver/handler/StationMessageHandler.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
 import com.zhili.zkstationserver.configuration.ServerConfig;
 import com.zhili.zkstationserver.dto.ChargerState;
 import com.zhili.zkstationserver.dto.ExchangeEvent;
+import com.zhili.zkstationserver.dto.StationConnInfo;
 import com.zhili.zkstationserver.dto.StationMessage;
 import com.zhili.zkstationserver.dto.StationMessage.CommandType;
 import com.zhili.zkstationserver.util.ApplicationContextUtils;
@@ -13,11 +14,13 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.AttributeKey;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.kafka.core.KafkaTemplate;
 
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
@@ -28,15 +31,22 @@ import java.util.concurrent.TimeUnit;
  */
 @Slf4j
 public class StationMessageHandler extends ChannelInboundHandlerAdapter {
-    KafkaTemplate kafkaTemplate = ApplicationContextUtils.getApplicationContext().getBean(KafkaTemplate.class);
+    KafkaTemplate kafkaTemplate = (KafkaTemplate)ApplicationContextUtils.getApplicationContext().getBean("kafkaTemplate");
     ServerConfig serverConfig = ApplicationContextUtils.getApplicationContext().getBean(ServerConfig.class);
-    RedisTemplate redisTemplate = ApplicationContextUtils.getApplicationContext().getBean(RedisTemplate.class);
+    RedisTemplate redisTemplate = (RedisTemplate) ApplicationContextUtils.getApplicationContext().getBean("redisTemplate");
+
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
-        super.channelActive(ctx);
         log.info("channel connected: " + ctx.channel().remoteAddress());
         StationCenter.add(ctx.channel());
+        super.channelActive(ctx);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        log.info("channel disconnected: " + ctx.channel().remoteAddress());
+        super.channelInactive(ctx);
     }
 
     @Override
@@ -51,6 +61,8 @@ public class StationMessageHandler extends ChannelInboundHandlerAdapter {
         String stationCode;
         StationMessage stationMessageReply;
         byte[] replyBytes;
+        String topic;
+        StationConnInfo stationConnInfo;
         switch (command) {
             case CommandType.SIGN_IN:
                 //签入命令,将连接打上站的标记, 并计入redis, 代表此站在这台服务器上{"stationConn:st001": "serverId/channelId"}, 12s超时
@@ -65,7 +77,10 @@ public class StationMessageHandler extends ChannelInboundHandlerAdapter {
                 replyBytes[8] = (byte) 0;
                 stationMessageReply.setData(replyBytes);
                 ctx.channel().writeAndFlush(stationMessageReply);
-                redisTemplate.opsForValue().set("stationConn:" + stationCode, serverConfig.getInstanceId()+"/"+StationCenter.getChannelIdStr(ctx.channel()), 12, TimeUnit.SECONDS);
+                stationConnInfo = new StationConnInfo();
+                stationConnInfo.setServerInstanceId(serverConfig.getInstanceId());
+                stationConnInfo.setConnIdStr(StationCenter.getChannelIdStr(ctx.channel()));
+                redisTemplate.opsForValue().set("stationConn:" + stationCode, stationConnInfo, 12, TimeUnit.SECONDS);
                 break;
             case CommandType.HEART_BEAT:
                 //心跳命令,计入redis,代表此站在这台服务器上{"stationConn:st001": "serverId/channelId"},12s超时
@@ -73,7 +88,13 @@ public class StationMessageHandler extends ChannelInboundHandlerAdapter {
                 if (StringUtils.isEmpty(stationCode)) {
                     break;
                 }
-                redisTemplate.opsForValue().set("stationConn:" + stationCode, serverConfig.getInstanceId()+"/"+StationCenter.getChannelIdStr(ctx.channel()), 12, TimeUnit.SECONDS);
+                stationConnInfo = new StationConnInfo();
+                stationConnInfo.setServerInstanceId(serverConfig.getInstanceId());
+                stationConnInfo.setConnIdStr(StationCenter.getChannelIdStr(ctx.channel()));
+                redisTemplate.opsForValue().set("stationConn:" + stationCode, stationConnInfo, 12, TimeUnit.SECONDS);
+                StationConnInfo stationConnInfo1 = new StationConnInfo();
+                BeanUtils.populate(stationConnInfo1, (Map)redisTemplate.opsForValue().get("stationConn:" + stationCode));
+                log.info("get from redis: {}",stationConnInfo1);
                 break;
             case CommandType.EXCHANGE_EVENT:
                 stationCode = (String) Optional.ofNullable(ctx.channel().attr(AttributeKey.valueOf("station")).get()).orElse("");
@@ -84,7 +105,9 @@ public class StationMessageHandler extends ChannelInboundHandlerAdapter {
                 //为了削峰,将对象构建后,放到kafka中,由业务中心处理。
                 ExchangeEvent exchangeEvent = new ExchangeEvent(stationCode, data);
                 log.info("exchangeEvent:" + exchangeEvent);
-                kafkaTemplate.send(serverConfig.getServiceName(), JSON.toJSONString(exchangeEvent));
+                topic = serverConfig.getServiceCenterName()+"_ExchangeEvent";
+                log.info("send to {}",topic);
+                kafkaTemplate.send(topic, JSON.toJSONString(exchangeEvent));
                 break;
             case CommandType.CHARGER_STATE:
                 stationCode = (String) Optional.ofNullable(ctx.channel().attr(AttributeKey.valueOf("station")).get()).orElse("");
@@ -94,10 +117,16 @@ public class StationMessageHandler extends ChannelInboundHandlerAdapter {
                 }
                 ChargerState chargerState = new ChargerState(stationCode, data);
                 log.info("chargerState:" + chargerState);
-                kafkaTemplate.send(serverConfig.getServiceName(), JSON.toJSONString(chargerState));
                 break;
             default:
                 break;
         }
     }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        super.exceptionCaught(ctx, cause);
+        cause.printStackTrace();
+        log.error(cause.getMessage());
+    }
 }

+ 1 - 1
src/main/java/com/zhili/zkstationserver/util/BytesUtil.java

@@ -71,7 +71,7 @@ public class BytesUtil {
         }
         String rawString = sb.toString();
         int k = rawString.indexOf(0);
-        return rawString.substring(0, k);
+        return k>=0?rawString.substring(0, k):rawString;
     }
 
     public static byte[] fromString(String s, int len, byte fill) {

+ 1 - 11
src/main/resources/application-dev.yml

@@ -12,7 +12,7 @@ spring:
     password: '123456'
     driverClassName: com.mysql.cj.jdbc.Driver
   redis:
-    host: 192.168.10.25
+    host: 127.0.0.1
     port: 6379
     #password: jek123
   kafka:
@@ -26,11 +26,6 @@ spring:
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       max-poll-records: 50
-      properties:
-        spring:
-          json:
-            trusted:
-              packages: com.zhili.stationcontrol.dto.station
     listener:
       ack-mode: MANUAL_IMMEDIATE
       missing-topics-fatal: false
@@ -49,11 +44,6 @@ mybatis-plus:
 #        log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
     log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
 
-wx:
-  miniapp:
-    appid: wx9790f2f1843d1a5d
-    secret: 5b9841eb455227d9647e5b9c046a7348
-    msgDataFormat: JSON
 
 station-server:
   port: 9987

+ 0 - 12
src/main/resources/application-prod.yml

@@ -25,11 +25,6 @@ spring:
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       max-poll-records: 50
-      properties:
-        spring:
-          json:
-            trusted:
-              packages: com.zhili.stationcontrol.dto.station
     listener:
       ack-mode: MANUAL_IMMEDIATE
       missing-topics-fatal: false
@@ -48,13 +43,6 @@ mybatis-plus:
     #        log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
     log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
 
-wx:
-  miniapp:
-    appid: wx9790f2f1843d1a5d
-    secret: 5b9841eb455227d9647e5b9c046a7348
-    msgDataFormat: JSON
 
 station-server:
-  id: s001
   port: 9987
-