MQTT + EMQX消息中间件
参考
(76条消息) mqtt协议与emqx相关使用_emqx和 mqtt的关系_Wzzzzzzp的博客-CSDN博客
EMQX vs Mosquitto | 2023 MQTT Broker 对比 - 简书 (jianshu.com)
emqx集群+nginx负载均衡链接 https://blog.csdn.net/abc_cml/article/details/127801264
mqtt协议其实就是一个及时通讯协议,跟rocketMQ 类似,也可以说是一个消息中间件.
MQTT (message queuing telemetry transport) 是IBM开发的即时通讯协议,是一种发布/订阅极其轻量级的消息传输协议,专门为网络受限设备、低宽带以及高延迟和不可靠的网络而设计的。由于以上轻量级的特点,是实现智能家居的首选传输协议,相比于XMPP,更加轻量级而且占用宽带低。
作为一个传递消息的协议,mqtt是基于一个”发布者->代理服务器->消费者”的一个流程进行的
发布者 - ->发布(主题)信息给代理–>代理 –> 代理发送信息给(订阅者)订阅了主题客户端 ,有订阅主题的才会接收到信息
发布者负责消息的发布,定制好对应的消息就可以根据topic来把消息发送到服务器上,然后消费者就可以根据对应的topic来实现消息的读取,这样的一个流程就是mqtt发送消息到接收消费消息的一个过程.
EMQX emqx是实现mqtt的一个消息中间件,当然还有别的一些实现,笔者这里没有使用过,因此就不做记录,为什么会使用到emqx作为消息中间件呢,因为目前做的一个项目上,涉及到这样一个需求:“设备接入”,外部设备接入到目前开发的系统中,并且设备接入后,要保证设备发送的数据实时存储并更新到后台界面,有一个直观的展示,所以在设备接入完成后,就需要一个渠道来实现消息的发送.我们就采用了emqx作为设备和平台数据交互的一个中间件. 平台是基于springboot开发的一个maven项目,关于mqtt和springboot的集成,请自行百度,网上的例子很多,这里就不进行过多的赘述. 值得一提的是,关于生成mqtt的bean对象的过程,因为使用了springboot,并且也没有涉及到集群相关的内容,所以直接就把mqtt的初始化对象做成了单例并放入了springboot启动的过程中(springboot启动时就保证mqtt也注册到相应的服务器上),下面给出代码
config @Configuration//使用@Configuration 的注解类表示这个类可以使用 Spring IoC容器作为bean 定义的来源 public class MqttConfig { /** * 代理服务器ip地址 */ @Value("${mqtt.url}")//都是直接从配置文件中读数据(不会请自行百度) public String MQTT_BROKER_HOST; /** * qos */ @Value("${mqtt.qos}") public int QOS; /** * topic */ private static final String TOPIC = "xxxx/#";//topic 前缀可自定义 "/"可作为分隔符 "#"代表接收所有的数据 @Bean//@Bean注解告诉 Spring,一个带有 @Bean 的注解方法将返回一个对象,该对象应该被注册为在 Spring 应用程序上下文中的 bean public void startMqttPushClient() { MqttPushClient.MQTT_HOST = MQTT_BROKER_HOST; MqttPushClient.MQTT_CLIENTID = System.currentTimeMillis() + ""; MqttPushClient instance = MqttPushClient.getInstance(); instance.subscribe(TOPIC, QOS); } }
client public class MqttPushClient { private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class); public static String MQTT_HOST = ""; public static String MQTT_CLIENTID = ""; public static String MQTT_USERNAME = ""; public static String MQTT_PASSWORD = ""; public static int MQTT_TIMEOUT = 10; public static int MQTT_KEEPALIVE = 10; private MqttClient client; private static volatile MqttPushClient mqttClient = null; //获得实例(单例) public static MqttPushClient getInstance() { if (mqttClient == null) { synchronized (MqttPushClient.class) { if (mqttClient == null) { mqttClient = new MqttPushClient(); } } } return mqttClient; } private MqttPushClient() { log.info("Connect MQTT: " + this); connect(); } private void connect() { try { client = new MqttClient(MQTT_HOST, MQTT_CLIENTID, new MemoryPersistence()); MqttConnectOptions option = new MqttConnectOptions(); option.setCleanSession(true); // 设置用户名 // option.setUserName(MQTT_USERNAME); // 设置密码 // option.setPassword(MQTT_PASSWORD.toCharArray()); option.setConnectionTimeout(MQTT_TIMEOUT); option.setKeepAliveInterval(MQTT_KEEPALIVE); option.setAutomaticReconnect(true); try { client.setCallback(new MqttPushCallback());//回调 client.connect(option); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } /** * 订阅某个主题 qos默认为1 * * @param topic */ public void subscribe(String topic) { subscribe(topic, 1); } /** * 订阅某个主题 * * @param topic * @param qos */ public void subscribe(String topic, int qos) { try { client.subscribe(topic, qos); } catch (Exception e) { e.printStackTrace(); } } }
callback mqtt回调 public class MqttPushCallback implements MqttCallback {//一定要实现MqttCallback接口 private static final Logger log = LoggerFactory.getLogger(MqttPushCallback.class); @Override public void connectionLost(Throwable cause) { log.info("连接断开,正在尝试重新连接"); cause.printStackTrace(); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } /** * 处理接收到的消息 * * @param topic * @param message * @throws Exception */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { //接收到订阅的消息+topic 可以在这里进行消息的逻辑处理 } }