Iot推送系统
Iot
物联网( IoT ,Internet of things )即“万物相连的互联网”,是互联网基础上的延伸和扩展的网络,将各种信息传感设备与网络结合起来而形成的一个巨大网络,实现任何时间、任何地点,人、机、物的互联互通.
也就是设备与设备 或 设备与人之间的交互。
物联网推送系统设计
物联网中的推送系统和互联网的推送系统很相似;但对于物联网有他的特性,由于要接入海量的硬件设备和传感器,且协议多样化,同时还要在极短的时间内处理大量的数据,所以对服务端的协议接入和处理能力要求极高。
-
app去连接推送服务器
-
设备连接注册中心并获得到注册中心地址
-
添加多服务器来解决大批量连接问题
-
添加注册中心解决服务器的管理 所有设备去访问注册中心获取地址,做一个代理 ;
服务器去注册更新
-
建立连接,以保证数据的实时性,设备订阅需要设备需要的推送消息 ,每个设备可能想要的不同
-
创建redis 用于推送消息,缓存起来消息,记录设备连接服务器的信息 订阅的主题信息
-
使用httpAPI接口 rpc接口来作为消息推送件
-
mq消息中间件存储消息,用服务器去订阅消息
利用netty来构建出一个iot消息推送系统 。
@PostConstruct
public void init() throws Exception {
log.info("Setting resource leak detector level to {}", leakDetectorLevel);
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
log.info("Starting MQTT transport...");
log.info("Starting MQTT transport server");
bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
NioEventLoopGroup bizGroup = new NioEventLoopGroup(100, new DefaultEventExecutor(new DefaultThreadFactory("biz-pool")));
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
//pipeline.addLast("idleStateHandler", new IdleStateHandler(10,2,12, TimeUnit.SECONDS));
MqttTransportHandler handler = new MqttTransportHandler(protocolProcess);
pipeline.addLast(bizGroup, handler);
}
});
serverChannel = b.bind(host, port).sync().channel();
log.info("Mqtt transport started!");
}
消息处理机制实现
这里使用 ProtocolProcess 来作为工厂类创建多个工厂创建类
然后使用的mqtt协议来实现的消息间传输
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
网络应用层的协议。也是广泛应用在物联网中,也是得益于他自己的格式,头部很简单,方便使用,占用宽带小也是他的特性,具体的协议格式可以看看下面的文章
MQTT协议
Netty中心跳检测机制
在针对海量设备连接iot服务端时,一定要添加完整的服务端策略,并且及时检测失效连接。防止无效的连接继续存在;需要设置合理的心跳周期,防止心跳任务挤压。使用netty提供链路空闲检测机制。不要自己创建定时任务群,以免过多消耗服务端的资源。
心跳检测周期通常不要超过60s,心跳检测超时通常为心跳检测周期的2倍
在netty中使用方式 ,在启动时获取到 channel
//处理连接心跳包
if (msg.variableHeader().keepAliveTimeSeconds() > 0) {
if (channel.pipeline().names().contains("idle")) {
channel.pipeline().remove("idle");
}
channel.pipeline().addFirst("idle", new IdleStateHandler(0, 0, Math.round(msg.variableHeader().keepAliveTimeSeconds() * 1.5f)));
}
这里其中包括的参数
- 第一个参数设置请求参数超时心跳检测时间
- 第二个参数设置响应参数超时心跳检测时间
- 第三个参数 将在既不执行读取也不执行写入时触发
/**
* Creates a new instance firing {@link IdleStateEvent}s.
*
* @param readerIdleTimeSeconds
* 将在未对指定对象执行读取时触发
* @param writerIdleTimeSeconds
* 将在未对指定的执行写入时触发
* @param allIdleTimeSeconds
将在既不执行读取也不执行写入时触发
*/
public IdleStateHandler(
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds) {
this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
TimeUnit.SECONDS);
}
怎么去实现这个心跳机制 ,在netty的注解中找到答案。
* // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}.
* public class MyHandler extends {@link ChannelDuplexHandler} {
* {@code @Override}
* public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} {
* if (evt instanceof {@link IdleStateEvent}) {
* {@link IdleStateEvent} e = ({@link IdleStateEvent}) evt;
* if (e.state() == {@link IdleState}.READER_IDLE) {
* ctx.close();
* } else if (e.state() == {@link IdleState}.WRITER_IDLE) {
* ctx.writeAndFlush(new PingMessage());
* }
* }
* }
* }
这里的触发程序一定要从IdleStateEvent 中去检测,而使用的方式,例如
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.ALL_IDLE) {
Channel channel = ctx.channel();
String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
// 发送遗嘱消息
if (this.protocolProcess.getGrozaSessionStoreService().containsKey(clientId)) {
SessionStore sessionStore = this.protocolProcess.getGrozaSessionStoreService().get(clientId);
if (sessionStore.getWillMessage() != null) {
this.protocolProcess.publish().processPublish(ctx.channel(), sessionStore.getWillMessage());
}
}
ctx.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
心跳检测的过程
也就是检测客户端是否失联。
说一点并不是所有的网络应用程序都有心跳检测机制,根据业务判断的。
百万连接优化
测试环境
计算机区分不同连接的方式:TCP连接四元组->服务器 IP +服务器port+客户端ip +客户端port
一台服务器和一台客户端最多支撑6万个连接。 而在服务端怎么区分连接的也就是服务器 IP +服务器port+客户端ip +客户端port。
调整为
这样就能解决百万连接的端口号数据不够的情况 服务端建立
//这里开启 10000到100099这100个端口
for (int i = 0; i < nPort; i++) {
int port = beginPort + i;
bootstrap.bind(port).addListener((ChannelFutureListener) future -> {
System.out.println("端口绑定成功: " + port);
});
}
这里启动服务端时,有可能会出现下面的问题
文件句柄数不够的情况
在操作系统中有个 文件中可以控制打开文件的句柄数,通过 cat /proc/sys/fs/file-max 可以看到
这就需要修改一改一下配置。 使用 vim /etc/sysctl.conf
使用 sysclt -p 使之生效
以及 单个文件打开数 使用 ulimit -a open -file 都需要修改
也会报错的。
使用 vim /etc/security/limits.conf 去调整
以及 tail -f 这些linux基本命令大家应该都会,这里就不深入讲解了。
下面对linux进行配置,在/etc/sysctl.conf 添加配置 一个连接大约在系统中占7.5kb
设置tcp连接内存 tcp_mem 单位为pg 页 一个页为 4kb, getconf pagesize 查看page占用,第一个数最小值 第二个运行 默认值,第三个 最大连接值
tcp_wmem tcp写入缓冲区
tcp_rmem tcp读缓冲区
包括保持时间等等 按照这个配置就行
一定要执行sysclt -p 然后生效。