Commit fef6c767 authored by 耿迪迪's avatar 耿迪迪

设备上报协议 gengdidi

parent 06947ed3
package com.zehong.gassdevicereport.netty; package com.zehong.gassdevicereport.netty;
import com.zehong.gassdevicereport.netty.handler.TCPServerHandler;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
...@@ -21,16 +22,16 @@ import javax.annotation.Resource; ...@@ -21,16 +22,16 @@ import javax.annotation.Resource;
* 物联网 开启检测 并写入数据库 * 物联网 开启检测 并写入数据库
*/ */
@Component @Component
public class NettyStart { public class NettyTCPStart {
private Logger logger = LoggerFactory.getLogger(NettyStart.class); private Logger logger = LoggerFactory.getLogger(NettyTCPStart.class);
@Resource @Resource
private ServerHandler serverHandler; private TCPServerHandler TCPServerHandler;
private EventLoopGroup bossGroup = new NioEventLoopGroup(); private EventLoopGroup bossGroup = new NioEventLoopGroup();
private EventLoopGroup workGroup = new NioEventLoopGroup(); private EventLoopGroup workGroup = new NioEventLoopGroup();
@Value("${netty.ports}") @Value("${TCPNetty.ports}")
private int[] ports; private int[] ports;
/** /**
* 启动netty服务 * 启动netty服务
...@@ -45,14 +46,14 @@ public class NettyStart { ...@@ -45,14 +46,14 @@ public class NettyStart {
.childHandler(new ChannelInitializer<SocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel socketChannel) throws Exception { protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(serverHandler); socketChannel.pipeline().addLast(TCPServerHandler);
} }
}); });
//开启需要监听 的端口 //开启需要监听 的端口
for(int port : ports){ for(int port : ports){
ChannelFuture future = b.bind(port).sync(); ChannelFuture future = b.bind(port).sync();
if (future.isSuccess()) { if (future.isSuccess()) {
logger.info("启动 "+ port +" 成功"); logger.info("TCP server启动 "+ port +" 成功");
} }
} }
} }
...@@ -64,6 +65,6 @@ public class NettyStart { ...@@ -64,6 +65,6 @@ public class NettyStart {
public void destroy() { public void destroy() {
bossGroup.shutdownGracefully().syncUninterruptibly(); bossGroup.shutdownGracefully().syncUninterruptibly();
workGroup.shutdownGracefully().syncUninterruptibly(); workGroup.shutdownGracefully().syncUninterruptibly();
logger.info("关闭 Netty 成功"); logger.info("TCP server关闭 Netty 成功");
} }
} }
package com.zehong.gassdevicereport.netty;
import com.zehong.gassdevicereport.netty.handler.UDPServerHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
/**
* 物联网 开启检测 并写入数据库
*/
@Component
public class NettyUDPStart {
private Logger logger = LoggerFactory.getLogger(NettyUDPStart.class);
@Resource
private UDPServerHandler UDPServerHandler;
private EventLoopGroup bossGroup = new NioEventLoopGroup();
@Value("${UDPNetty.ports}")
private int[] ports;
/**
* 启动netty服务
* @throws InterruptedException
*/
@PostConstruct
public void start() throws InterruptedException {
Bootstrap b=new Bootstrap ();
b.group(bossGroup)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.handler(UDPServerHandler);
//开启需要监听 的端口
for(int port : ports){
ChannelFuture future = b.bind(port).sync();
if (future.isSuccess()) {
logger.info("UDP server启动 "+ port +" 成功");
}
}
}
/**
* 销毁
*/
@PreDestroy
public void destroy() {
bossGroup.shutdownGracefully().syncUninterruptibly();
logger.info("UDP server关闭 Netty 成功");
}
}
...@@ -64,16 +64,16 @@ public class PressureDecryptReportedData { ...@@ -64,16 +64,16 @@ public class PressureDecryptReportedData {
private void getDeviceType(){ private void getDeviceType(){
String deviceTypeNum = reportedDataStr.substring(30,32); String deviceTypeNum = reportedDataStr.substring(30,32);
switch (deviceTypeNum){ switch (deviceTypeNum){
case "A0": case "a0":
this.deviceType = "ZH11-NB家用报警器"; this.deviceType = "ZH11-NB家用报警器";
break; break;
case "A1": case "a1":
this.deviceType = "JT-ZH85N家用报警器"; this.deviceType = "JT-ZH85N家用报警器";
break; break;
case "A2": case "a2":
this.deviceType = "燃气压力监测仪"; this.deviceType = "燃气压力监测仪";
break; break;
case "A3": case "a3":
this.deviceType = "消防水压监测仪"; this.deviceType = "消防水压监测仪";
break; break;
default: default:
......
...@@ -8,11 +8,11 @@ public class FlowmeterDecryReportedDataStrategy { ...@@ -8,11 +8,11 @@ public class FlowmeterDecryReportedDataStrategy {
private static DecryptReportedData decryptReportedData(String hexStr,String reportedDataStr){ private static DecryptReportedData decryptReportedData(String hexStr,String reportedDataStr){
switch (hexStr){ switch (hexStr){
case "B0": case "b0":
return new FlowmeterDecryptReportedData(reportedDataStr,"苍南EVC300体积修正仪"); return new FlowmeterDecryptReportedData(reportedDataStr,"苍南EVC300体积修正仪");
case "B1": case "b1":
return new FlowmeterDecryptReportedData(reportedDataStr,"天信CPU卡式体积修正仪"); return new FlowmeterDecryptReportedData(reportedDataStr,"天信CPU卡式体积修正仪");
case "B2": case "b2":
return new Tian5cFlowmeterDecryptReportedData(reportedDataStr,"天津市第五机床厂C型体积积算仪(天五C型)"); return new Tian5cFlowmeterDecryptReportedData(reportedDataStr,"天津市第五机床厂C型体积积算仪(天五C型)");
default: default:
return null; return null;
......
package com.zehong.gassdevicereport.netty; package com.zehong.gassdevicereport.netty.handler;
import com.zehong.gassdevicereport.constant.Constant; import com.zehong.gassdevicereport.constant.Constant;
import com.zehong.gassdevicereport.entity.ReciveReportData; import com.zehong.gassdevicereport.entity.ReciveReportData;
import com.zehong.gassdevicereport.entity.TDeviceReportData; import com.zehong.gassdevicereport.entity.TDeviceReportData;
import com.zehong.gassdevicereport.netty.decryptreported.flowmeter.DecryptReportedData; import com.zehong.gassdevicereport.netty.decryptreported.flowmeter.DecryptReportedData;
import com.zehong.gassdevicereport.netty.decryptreported.flowmeter.FlowmeterDecryReportedDataStrategy; import com.zehong.gassdevicereport.netty.decryptreported.flowmeter.FlowmeterDecryReportedDataStrategy;
import com.zehong.gassdevicereport.netty.decryptreported.PressureDecryptReportedData;
import com.zehong.gassdevicereport.service.ITDeviceReportDataService; import com.zehong.gassdevicereport.service.ITDeviceReportDataService;
import com.zehong.gassdevicereport.utils.CrcUtil; import com.zehong.gassdevicereport.utils.CrcUtil;
import com.zehong.gassdevicereport.utils.HexUtils; import com.zehong.gassdevicereport.utils.HexUtils;
...@@ -15,7 +14,6 @@ import io.netty.channel.ChannelHandler.Sharable; ...@@ -15,7 +14,6 @@ import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -29,9 +27,9 @@ import java.util.Date; ...@@ -29,9 +27,9 @@ import java.util.Date;
*/ */
@Component @Component
@Sharable @Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter { public class TCPServerHandler extends ChannelInboundHandlerAdapter {
private Logger logger = LoggerFactory.getLogger(ServerHandler.class); private Logger logger = LoggerFactory.getLogger(TCPServerHandler.class);
@Autowired @Autowired
private RedisUtil redisUtil; private RedisUtil redisUtil;
...@@ -72,12 +70,14 @@ public class ServerHandler extends ChannelInboundHandlerAdapter { ...@@ -72,12 +70,14 @@ public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//msg为接收到的客户端传递的数据 个人这边直接传的json 数据 //msg为接收到的客户端传递的数据 个人这边直接传的json 数据
ByteBuf readMessage= (ByteBuf) msg; //ByteBuf readMessage= (ByteBuf) msg;
String msgStr = readMessage.toString(CharsetUtil.UTF_8); //String msgStr = readMessage.toString(CharsetUtil.UTF_8);
logger.info("receiveMsgstr======="+msgStr); ByteBuf buf = (ByteBuf)msg;
/*if(msgStr.length() != 124){ byte [] bytes1 = new byte[buf.readableBytes()];
return; //复制内容到字节数组bytes
}*/ buf.readBytes(bytes1);
String msgStr = HexUtils.bytesToHex(bytes1);
logger.info("receiveTCPMsgstr======="+msgStr);
byte[] bytes = HexUtils.hexToByteArray( msgStr.substring(0,msgStr.length()-4)); byte[] bytes = HexUtils.hexToByteArray( msgStr.substring(0,msgStr.length()-4));
String checkVule = CrcUtil.getCrcToHex(bytes,true); String checkVule = CrcUtil.getCrcToHex(bytes,true);
String checkCode = msgStr.substring(msgStr.length()-4,msgStr.length()); String checkCode = msgStr.substring(msgStr.length()-4,msgStr.length());
...@@ -86,20 +86,12 @@ public class ServerHandler extends ChannelInboundHandlerAdapter { ...@@ -86,20 +86,12 @@ public class ServerHandler extends ChannelInboundHandlerAdapter {
throw new Exception("接收数据校验失败"); throw new Exception("接收数据校验失败");
} }
SocketChannel channel=(SocketChannel)ctx.channel(); DecryptReportedData decryptReportedData = FlowmeterDecryReportedDataStrategy.getDecryptReportedData(msgStr);
int port = channel.localAddress().getPort(); if(null == decryptReportedData){
TDeviceReportData tDeviceReportData = new TDeviceReportData(); logger.error("未获取到流量计类型");
if(2396 == port){ throw new Exception("未获取到流量计类型");
DecryptReportedData decryptReportedData = FlowmeterDecryReportedDataStrategy.getDecryptReportedData(msgStr);
if(null == decryptReportedData){
logger.error("未获取到流量计类型");
throw new Exception("未获取到流量计类型");
}
tDeviceReportData = decryptReportedData.getReportedData();
}
if (65011 == port){
tDeviceReportData = new PressureDecryptReportedData(msgStr).getReportedData();
} }
TDeviceReportData tDeviceReportData = decryptReportedData.getReportedData();
//缓存设备信息 //缓存设备信息
ReciveReportData reciveReportData = new ReciveReportData(); ReciveReportData reciveReportData = new ReciveReportData();
reciveReportData.setDeviceCode(tDeviceReportData.getDeviceNum()); reciveReportData.setDeviceCode(tDeviceReportData.getDeviceNum());
......
package com.zehong.gassdevicereport.netty.handler;
import com.zehong.gassdevicereport.constant.Constant;
import com.zehong.gassdevicereport.entity.ReciveReportData;
import com.zehong.gassdevicereport.entity.TDeviceReportData;
import com.zehong.gassdevicereport.netty.decryptreported.PressureDecryptReportedData;
import com.zehong.gassdevicereport.service.ITDeviceReportDataService;
import com.zehong.gassdevicereport.utils.CrcUtil;
import com.zehong.gassdevicereport.utils.HexUtils;
import com.zehong.gassdevicereport.utils.RedisUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Sharable
public class UDPServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private Logger logger = LoggerFactory.getLogger(UDPServerHandler.class);
@Autowired
private RedisUtil redisUtil;
@Autowired
private ITDeviceReportDataService itDeviceReportDataService;
private final static String METER = "4";
private final static String METER_ALARM = "1";
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
//msg为接收到的客户端传递的数据 个人这边直接传的json 数据
ByteBuf buf = msg.content();
byte [] bytes1 = new byte[buf.readableBytes()];
//复制内容到字节数组bytes
buf.readBytes(bytes1);
String msgStr = HexUtils.bytesToHex(bytes1);
logger.info("receiveUPDMsgstr======="+msgStr);
byte[] bytes = HexUtils.hexToByteArray( msgStr.substring(0,msgStr.length()-4));
String checkVule = CrcUtil.getCrcToHex(bytes,true);
String checkCode = msgStr.substring(msgStr.length()-4,msgStr.length());
if(!checkVule.equals(checkCode)){
logger.error("接收数据校验失败");
throw new Exception("接收数据校验失败");
}
TDeviceReportData tDeviceReportData = new PressureDecryptReportedData(msgStr).getReportedData();
//缓存设备信息
ReciveReportData reciveReportData = new ReciveReportData();
reciveReportData.setDeviceCode(tDeviceReportData.getDeviceNum());
reciveReportData.setDeviceType(METER);
reciveReportData.setAlarmType(METER_ALARM);
reciveReportData.setPressure(tDeviceReportData.getPressure());
redisUtil.set(Constant.DEVICE_REDIS_KEY + reciveReportData.getDeviceCode(),tDeviceReportData);
//设备上报数据入库
itDeviceReportDataService.insertTDeviceReportData(tDeviceReportData);
}
}
...@@ -64,5 +64,8 @@ logging: ...@@ -64,5 +64,8 @@ logging:
#webSocektUrl #webSocektUrl
webSocektUrl: http://36.148.23.59:8901/gassafety/websocket/send webSocektUrl: http://36.148.23.59:8901/gassafety/websocket/send
netty: TCPNetty:
ports: 2396,65011 ports: 2396
UDPNetty:
ports: 65011
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment