MQTT(MQ Telemetry Transport)是IBM开发的一种网络应用层的协议

使用场景:

1、不可靠、网络带宽小的网络

2、运行的设备CPU、内存非常有限

特点:

1、基于发布/订阅模型的协议

2、他是二进制协议,二进制的特点就是紧凑、占用空间小。他的协议头只有2个字节

3、提供了三种消息可能性保障:最多一次 0、最少一次 1、只有一次 2

maven依赖

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>

发送消息示例

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * 发送数据到mqtt服务器
 *
 * @author max.sun
 * @created on 2018-04-17 11:12
 */
public class PublishMsg {
  private static int qos = 2; // 只有一次
  private static String broker = "tcp://192.168.51.232:1883";
  private static String userName = "admin";
  private static String passWord = "password";

  private static MqttClient connect(String clientId, String userName, String password) throws MqttException {
    MemoryPersistence persistence = new MemoryPersistence();
    MqttConnectOptions connOpts = new MqttConnectOptions();
    connOpts.setCleanSession(true);
    connOpts.setUserName(userName);
    connOpts.setPassword(password.toCharArray());
    connOpts.setConnectionTimeout(10);
    connOpts.setKeepAliveInterval(20);
    // String[] uris = {"tcp://192.168.51.232:1883","tcp://192.168.51.233:1883"};
    // connOpts.setServerURIs(uris); //起到负载均衡和高可用的作用
    MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
    mqttClient.setCallback(new PushCallback("test"));
    mqttClient.connect(connOpts);
    return mqttClient;
  }

  private static void pub(MqttClient sampleClient, String msg, String topic) throws MqttException {
    MqttMessage message = new MqttMessage("ertwersdfas".getBytes());
    message.setQos(qos);
    message.setRetained(false);
    sampleClient.publish(topic, message);
  }

  private static void publish(String str, String clientId, String topic) throws MqttException {
    MqttClient mqttClient = connect(clientId, userName, passWord);

    if (mqttClient != null) {
      pub(mqttClient, str, topic);
      System.out.println("pub-->" + str);
    }

    if (mqttClient != null) {
      mqttClient.disconnect();
    }
  }

  public static void main(String[] args) throws MqttException {
    publish("message content", "client-id-0", "$share/edge/server/public/a");
  }
}

class PushCallback implements MqttCallback {
  private String threadId;

  public PushCallback(String threadId) {
    this.threadId = threadId;
  }

  public void connectionLost(Throwable cause) {

  }

  public void deliveryComplete(IMqttDeliveryToken token) {
    // System.out.println("deliveryComplete---------" + token.isComplete());
  }

  public void messageArrived(String topic, MqttMessage message) throws Exception {
    String msg = new String(message.getPayload());
    System.out.println(threadId + " " + msg);
  }
}

消费消息示例

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * 发送数据到mqtt服务器
 *
 * @author max.sun
 * @created on 2018-04-17 11:12
 */

public class SubscribeMsg {

  // private static String topic = "$share/testgroup/wyptest1";
  // private static String topic = "$queue/wyptest1";
  // private static String topic = "wyptest1";
  private static int qos = 2;
  private static String broker = "tcp://192.168.51.232:1883";
  private static String userName = "admin";
  private static String passWord = "password";

  private static MqttClient connect(String clientId) throws MqttException {
    MemoryPersistence persistence = new MemoryPersistence();
    MqttConnectOptions connOpts = new MqttConnectOptions();
    // String[] uris = {"tcp://192.168.51.232:1883","tcp://192.168.51.233:1883"};
    connOpts.setCleanSession(false);
    connOpts.setUserName(userName);
    connOpts.setPassword(passWord.toCharArray());
    connOpts.setConnectionTimeout(10);
    connOpts.setKeepAliveInterval(20);
    // connOpts.setServerURIs(uris);
    // connOpts.setWill(topic, "close".getBytes(), 2, true);
    MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
    mqttClient.connect(connOpts);
    return mqttClient;
  }

  public static void sub(MqttClient mqttClient, String topic) throws MqttException {
    int[] Qos = { qos };
    String[] topics = { topic };
    mqttClient.subscribe(topics, Qos);
  }

  private static void runsub(String clientId, String topic) throws MqttException {
    MqttClient mqttClient = connect(clientId);
    if (mqttClient != null) {
      sub(mqttClient, topic);
    }
  }

  public static void main(String[] args) throws MqttException {

    runsub("client-id-1", "$share/testgroupa/edge/server/private/+");
  }
}

 

Leave a comment

电子邮件地址不会被公开。 必填项已用*标注