
【MQTT】 物联网 EMQX本地部署及消息发布/订阅
MQTT 是用于物联网 (IoT) 的 OASIS 标准消息传递协议。它被设计为一种极其轻量级的发布/订阅消息传递传输,非常适合连接具有小代码占用空间和最小网络带宽的远程设备。如今,MQTT被广泛应用于各种行业,如汽车、制造、电信、石油和天然气等。EMQ X (简称 EMQ), 是一款完全开源,高度可伸缩,高可用的分布式 MQTT消息服务器,同时也支持 CoAP/LwM2M 一站式 IoT 协议接
MQTT概述
MQTT 是用于物联网 (IoT) 的 OASIS 标准消息传递协议。它被设计为一种极其轻量级的发布/订阅消息传递传输,非常适合连接具有小代码占用空间和最小网络带宽的远程设备。 如今,MQTT被广泛应用于各种行业,如汽车、制造、电信、石油和天然气等。
为什么选择MQTT?
轻量化、高效
MQTT客户端非常小,需要最少的资源,因此可以在小型微控制器上使用。MQTT 消息头很小,可以优化网络带宽。
双向通信
MQTT 允许在设备到云以及云到设备之间进行消息传递。这样可以很容易地将消息广播到事物组。
扩展到数百万种事物
MQTT 可以扩展以连接数百万台物联网设备。
可靠的消息传递
消息传递的可靠性对于许多物联网用例都很重要。这就是为什么 MQTT 有 3 个定义的服务质量级别:0 - 最多一次,1- 至少一次,2 - 正好一次
支持不可靠的网络
许多物联网设备通过不可靠的蜂窝网络进行连接。MQTT 对持久会话的支持减少了将客户端与代理重新连接的时间。
启用安全性
MQTT 使使用 TLS 加密消息和使用现代身份验证协议(如 OAuth)对客户端进行身份验证变得容易。
MQTT 发布/订阅架构
EMQX概述
EMQ X (简称 EMQ), 是一款完全开源,高度可伸缩,高可用的分布式 MQTT消息服务器,同时也支持 CoAP/LwM2M 一站式 IoT 协议接入。EMQ 是 5G时代万物互联的消息引擎,适用于 IOT、M2M 和移动应用程序,可处理千万级别的并发客户端。
EMQX部署
本地部署
以ubuntu为例,
wget https://www.emqx.com/zh/downloads/broker/5.3.2/emqx-5.3.2-ubuntu22.04-amd64.deb
sudo dpkg -i emqx-5.3.2-ubuntu22.04-amd64.deb
启动服务
sudo service emqx start
访问http://localhost:18083/, 默认账号密码admin/public
云服务serverless部署
注册
建立连接并发送消息
方式一:MQTTX 测试发送消息
下载MQTTX
安装后建立连接
发送消息
方式二:利用代码实现
以PHP为例
安装扩展包
composer require php-mqtt/client
编写代码
<?php
require('vendor/autoload.php');
use \PhpMqtt\Client\MqttClient;
use \PhpMqtt\Client\ConnectionSettings;
$server = 'localhost';
$port = 1883;
$clientId = rand(5, 15);
$username = 'emqx_user';
$password = 'public';
$clean_session = false;
$mqtt_version = MqttClient::MQTT_3_1_1;
$connectionSettings = (new ConnectionSettings)
->setUsername($username)
->setPassword($password)
->setKeepAliveInterval(60)
->setLastWillTopic('emqx/test/last-will')
->setLastWillMessage('client disconnect')
->setLastWillQualityOfService(1);
$mqtt = new MqttClient($server, $port, $clientId, $mqtt_version);
$mqtt->connect($connectionSettings, $clean_session);
printf("client connected\n");
$mqtt->subscribe('emqx/test', function ($topic, $message) {
printf("Received message on topic [%s]: %s\n", $topic, $message);
}, 0);
for ($i = 0; $i< 10; $i++) {
$payload = array(
'protocol' => 'tcp',
'date' => date('Y-m-d H:i:s'),
'url' => 'https://github.com/emqx/MQTT-Client-Examples'
);
$mqtt->publish(
// topic
'emqx/test',
// payload
json_encode($payload),
// qos
0,
// retain
true
);
printf("msg $i send\n");
sleep(1);
}
$mqtt->loop(true);
执行代码
php mqtt.php
以Golang为例
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/eclipse/paho.mqtt.golang"package main
import (
"fmt"
"log"
"os"
"time"
"github.com/eclipse/paho.mqtt.golang"
)
var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}
func main() {
mqtt.DEBUG = log.New(os.Stdout, "", 0)
mqtt.ERROR = log.New(os.Stdout, "", 0)
opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883").SetClientID("emqx_test_client")
opts.SetKeepAlive(60 * time.Second)
// 设置消息回调处理函数
opts.SetDefaultPublishHandler(f)
opts.SetPingTimeout(1 * time.Second)
c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
// 订阅主题
if token := c.Subscribe("testtopic/#", 0, nil); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
// 发布消息
token := c.Publish("testtopic/1", 0, false, "Hello World")
token.Wait()
time.Sleep(6 * time.Second)
// 取消订阅
if token := c.Unsubscribe("testtopic/#"); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
// 断开连接
c.Disconnect(250)
time.Sleep(1 * time.Second)
}
)
var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}
func main() {
mqtt.DEBUG = log.New(os.Stdout, "", 0)
mqtt.ERROR = log.New(os.Stdout, "", 0)
opts := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883").SetClientID("emqx_test_client")
opts.SetKeepAlive(60 * time.Second)
// 设置消息回调处理函数
opts.SetDefaultPublishHandler(f)
opts.SetPingTimeout(1 * time.Second)
c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
// 订阅主题
if token := c.Subscribe("testtopic/#", 0, nil); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
// 发布消息
token := c.Publish("testtopic/1", 0, false, "Hello World")
token.Wait()
time.Sleep(6 * time.Second)
// 取消订阅
if token := c.Unsubscribe("testtopic/#"); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
// 断开连接
c.Disconnect(250)
time.Sleep(1 * time.Second)
}
更多推荐
所有评论(0)