MQTT + EMQX消息中间件

参考

(76条消息) mqtt协议与emqx相关使用_emqx和 mqtt的关系_Wzzzzzzp的博客-CSDN博客

EMQX vs Mosquitto | 2023 MQTT Broker 对比 - 简书 (jianshu.com)

emqx集群+nginx负载均衡链接https://blog.csdn.net/abc_cml/article/details/127801264

mqtt协议其实就是一个及时通讯协议,跟rocketMQ类似,也可以说是一个消息中间件.

MQTT (message queuing telemetry transport) 是IBM开发的即时通讯协议,是一种发布/订阅极其轻量级的消息传输协议,专门为网络受限设备、低宽带以及高延迟和不可靠的网络而设计的。由于以上轻量级的特点,是实现智能家居的首选传输协议,相比于XMPP,更加轻量级而且占用宽带低。

作为一个传递消息的协议,mqtt是基于一个”发布者->代理服务器->消费者”的一个流程进行的

发布者 - ->发布(主题)信息给代理–>代理 –> 代理发送信息给(订阅者)订阅了主题客户端 ,有订阅主题的才会接收到信息

发布者负责消息的发布,定制好对应的消息就可以根据topic来把消息发送到服务器上,然后消费者就可以根据对应的topic来实现消息的读取,这样的一个流程就是mqtt发送消息到接收消费消息的一个过程.

EMQX
emqx是实现mqtt的一个消息中间件,当然还有别的一些实现,笔者这里没有使用过,因此就不做记录,为什么会使用到emqx作为消息中间件呢,因为目前做的一个项目上,涉及到这样一个需求:“设备接入”,外部设备接入到目前开发的系统中,并且设备接入后,要保证设备发送的数据实时存储并更新到后台界面,有一个直观的展示,所以在设备接入完成后,就需要一个渠道来实现消息的发送.我们就采用了emqx作为设备和平台数据交互的一个中间件.
平台是基于springboot开发的一个maven项目,关于mqtt和springboot的集成,请自行百度,网上的例子很多,这里就不进行过多的赘述.
值得一提的是,关于生成mqtt的bean对象的过程,因为使用了springboot,并且也没有涉及到集群相关的内容,所以直接就把mqtt的初始化对象做成了单例并放入了springboot启动的过程中(springboot启动时就保证mqtt也注册到相应的服务器上),下面给出代码

config

@Configuration//使用@Configuration 的注解类表示这个类可以使用 Spring IoC容器作为bean 定义的来源
public class MqttConfig {
/**
* 代理服务器ip地址
*/
@Value("${mqtt.url}")//都是直接从配置文件中读数据(不会请自行百度)
public String MQTT_BROKER_HOST;

/**
* qos
*/
@Value("${mqtt.qos}")
public int QOS;

/**
* topic
*/
private static final String TOPIC = "xxxx/#";//topic 前缀可自定义 "/"可作为分隔符 "#"代表接收所有的数据

@Bean//@Bean注解告诉 Spring,一个带有 @Bean 的注解方法将返回一个对象,该对象应该被注册为在 Spring 应用程序上下文中的 bean
public void startMqttPushClient() {
MqttPushClient.MQTT_HOST = MQTT_BROKER_HOST;
MqttPushClient.MQTT_CLIENTID = System.currentTimeMillis() + "";
MqttPushClient instance = MqttPushClient.getInstance();
instance.subscribe(TOPIC, QOS);
}
}

client

public class MqttPushClient {

private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);
public static String MQTT_HOST = "";
public static String MQTT_CLIENTID = "";
public static String MQTT_USERNAME = "";
public static String MQTT_PASSWORD = "";
public static int MQTT_TIMEOUT = 10;
public static int MQTT_KEEPALIVE = 10;

private MqttClient client;
private static volatile MqttPushClient mqttClient = null;

//获得实例(单例)
public static MqttPushClient getInstance() {
if (mqttClient == null) {
synchronized (MqttPushClient.class) {
if (mqttClient == null) {
mqttClient = new MqttPushClient();
}
}
}
return mqttClient;
}

private MqttPushClient() {
log.info("Connect MQTT: " + this);
connect();
}

private void connect() {
try {
client = new MqttClient(MQTT_HOST, MQTT_CLIENTID, new MemoryPersistence());
MqttConnectOptions option = new MqttConnectOptions();
option.setCleanSession(true);
// 设置用户名
// option.setUserName(MQTT_USERNAME);
// 设置密码
// option.setPassword(MQTT_PASSWORD.toCharArray());
option.setConnectionTimeout(MQTT_TIMEOUT);
option.setKeepAliveInterval(MQTT_KEEPALIVE);
option.setAutomaticReconnect(true);
try {
client.setCallback(new MqttPushCallback());//回调
client.connect(option);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 订阅某个主题 qos默认为1
*
* @param topic
*/
public void subscribe(String topic) {
subscribe(topic, 1);
}

/**
* 订阅某个主题
*
* @param topic
* @param qos
*/
public void subscribe(String topic, int qos) {
try {
client.subscribe(topic, qos);
} catch (Exception e) {
e.printStackTrace();
}
}
}

callback mqtt回调

public class MqttPushCallback implements MqttCallback {//一定要实现MqttCallback接口

private static final Logger log = LoggerFactory.getLogger(MqttPushCallback.class);

@Override
public void connectionLost(Throwable cause) {
log.info("连接断开,正在尝试重新连接");
cause.printStackTrace();
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}

/**
* 处理接收到的消息
*
* @param topic
* @param message
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
//接收到订阅的消息+topic 可以在这里进行消息的逻辑处理

}
}