Ví dụ mqtt client với moquette broker FutureConnection

Example  : mqtt client sử dụng  future kết nối broker

 

package com.callback.connect.future;

import java.security.SecureRandom;

import java.security.cert.CertificateException;

import java.security.cert.X509Certificate;

import javax.net.ssl.KeyManager;

import javax.net.ssl.SSLContext;

import javax.net.ssl.TrustManager;

import javax.net.ssl.X509TrustManager;

import org.apache.log4j.Logger;

import org.fusesource.mqtt.client.FutureConnection;

import org.fusesource.mqtt.client.MQTT;

import org.fusesource.mqtt.client.Message;

import org.fusesource.mqtt.client.QoS;

import org.fusesource.mqtt.client.Topic;

 

public class AppClientMqtts {

 

    private static final Logger logger = Logger.getLogger(AppClientMqtts.class);

    static String MQTT_URL = "ssl://localhost:8883";

 

    public static void main(String args[]) {

        AppClientMqtts test_MQTTS = new AppClientMqtts();

        test_MQTTS.ConnectCamAapp();

    }

 

    public FutureConnection getMQTT(String clientId, String userName, String password, String udid) {

        try {

            MQTT mqtt = new MQTT();

            mqtt.setHost(MQTT_URL);

            mqtt.setCleanSession(true);

            mqtt.setUserName(userName);

            mqtt.setPassword(password);

            mqtt.setClientId(clientId);

            SSLContext sslContext = SSLContext.getInstance("TLS");

            sslContext.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());

            mqtt.setSslContext(sslContext);

            FutureConnection connection = mqtt.futureConnection();

            connection.connect().await();

            if (connection.isConnected()) {

                logger.info("connect success");

            } else {

                logger.info("connect fail");

            }

            return connection;

        } catch (Exception e) {

            logger.error("Error Init Mqtt Connection " + e);

            return null;

        }

 

    }

 

    public void ConnectCamAapp() {

        logger.info("start ConnectCamAapp");

        String appConf = "01100114999CHMQTTSCAMAPP,"

                + "4999CHMQTTS,"

                + "eacf750b0ab3ee76f9f78c76e34bd11ba1e9e05138dc96d4226885b36ac5bfa3,"

                + "app_gcm/a4b1dea25f4e86c330e2e8684a90e51e513ecfdd2c3edecc1d06d9f8a54c81ed/sub,"

                + "device_001/e3540cfa0e3d8e14c0777ca7bc17c86ef5b409a1da30a14b35a470f5d4b895c9/sub,"

                + "01100114999CHMQTTSCAMAPP";

        AppClient appClient = new AppClient(appConf);

        appClient.start();

 

    }

 

    static class DefaultTrustManager implements X509TrustManager {

 

        @Override

        public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {

        }

 

        @Override

        public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {

        }

 

        @Override

        public X509Certificate[] getAcceptedIssuers() {

            return new X509Certificate[0];

        }

    }

 

    class PublicMessage extends Thread {

 

        FutureConnection subConnection = null;

        String topicCam = "";

        String clientApp = "";

        String topicApp = "";

        String udid = "";

 

        public PublicMessage(FutureConnection mqtt, String topicCamPub, String clientId, String topicAppSub, String udid) {

            subConnection = mqtt;

            topicCam = topicCamPub;

            clientApp = clientId;

            topicApp = topicAppSub;

            this.udid = udid;

        }

 

        @Override

        public void run() {

 

            int count = 0;

            try {

 

                QoS qoSSubscrice = QoS.AT_MOST_ONCE;

 

                Topic topic = new Topic(topicApp, qoSSubscrice);

                Topic[] topics = {topic};

                subConnection.subscribe(topics);

 

                long startTime = System.currentTimeMillis();

                while (true) {

 

                    String msg = "";

                    msg = "2app_topic_sub=" + topicApp + "&time=" + System.currentTimeMillis() + "&req=get_session_key&mode=remote&port1=50915"

+"&ip=115.78.13.114&streamname=CC3A61D7DEC6_50915 ";

 

                    QoS qoSPublish = QoS.AT_MOST_ONCE;

 

                    long startSendMsg = System.currentTimeMillis();

                    subConnection.publish(topicCam, (msg + count).getBytes(), qoSPublish, false);

                    count++;

                    logger.info("App publish: " + msg);

 

                    if (count == 50000) {

                        logger.info("stop send");

                        break;

                    }

 

                }

            } catch (Exception ex) {

 

                logger.error("error publish messsage " + ex);

            }

            logger.info(clientApp + " total send: " + count);

 

        }

    }

 

    class ReceiveMessage extends Thread {

 

        FutureConnection receiveMqtt = null;

        String topicSub = "";

        String clientIdRecevie = "";

        long startTime;

        String udid = "";

 

        public ReceiveMessage(FutureConnection mqtt, String topicApp, String clientId, String udid) {

            receiveMqtt = mqtt;

            clientIdRecevie = clientId;

            topicSub = topicApp;

            this.udid = udid;

        }

 

        @Override

        public void run() {

             try {

               

                logger.info("subscribe topic " + topicSub);

                logger.info("wait received message from " + topicSub);

                int count = 0;

                while (true) {

                    Message message = receiveMqtt.receive().await();

                    String responseMsg = new String(message.getPayload());

                    logger.info("App receive :" + responseMsg);

                    count++;

                    logger.info("count: " + count);

                }

               

            } catch (Exception ex) {

                logger.error("error timeout received " + ex);

            }

        }

    }

 

    class AppClient extends Thread {

 

        String clientId = "";

        String userName = "";

        String password = "";

        String topicApp = "";

        String topicCam = "";

        String udid = "";

        public long totalLatency = 0;

        public int totalReceive = 0;

        String formatAppConf = ",";

 

        public AppClient(String appClient) {

            clientId = appClient.split(formatAppConf)[0];

            userName = appClient.split(formatAppConf)[1];

            password = appClient.split(formatAppConf)[2];

            topicApp = appClient.split(formatAppConf)[3];

            topicCam = appClient.split(formatAppConf)[4];

            udid = appClient.split(formatAppConf)[5];

        }

 

        @Override

        public void run() {

 

            FutureConnection futureConnection = getMQTT(clientId, userName, password, udid);

            ReceiveMessage receiveMessage = new ReceiveMessage(futureConnection, topicApp, clientId, udid);

            receiveMessage.start();

            PublicMessage publicMessage = new PublicMessage(futureConnection, topicCam, clientId, topicApp, udid);

            publicMessage.start();

 

        }

    }

}

 

Chú ý: mặc định  mqtt client sẽ tự động kết nối lại nếu mất kết nối. nếu muốn kết nối 1 lần ta sử dụng  setConnectAttemptsMax(0).