Ví dụ MQTT client (Callback API) kết nối với Moquette broker

Example: MQTT client sử dụng callback để kết nối tới broker

 

package com.callback.connect.callback;

import java.security.SecureRandom;

import java.security.cert.CertificateException;

import java.security.cert.X509Certificate;

import javax.net.ssl.KeyManager;

import javax.net.ssl.SSLContext;

import javax.net.ssl.TrustManager;

import javax.net.ssl.X509TrustManager;

import org.apache.log4j.Logger;

import org.fusesource.hawtbuf.Buffer;

import org.fusesource.hawtbuf.UTF8Buffer;

import org.fusesource.mqtt.client.Callback;

import org.fusesource.mqtt.client.CallbackConnection;

import org.fusesource.mqtt.client.Listener;

import org.fusesource.mqtt.client.MQTT;

import org.fusesource.mqtt.client.QoS;

import org.fusesource.mqtt.client.Topic;

 

public class AppClientMqtts {

 

    /** Log4j logger shared by this class and its nested worker threads. */
    private static final Logger logger = Logger.getLogger(AppClientMqtts.class);

 

    /** Broker URI handed to {@code MQTT.setHost} in {@code getConnect}; uses the {@code ssl://} scheme. */
    static String MQTT_URL = "ssl://localhost:8883";

 

    public static void main(String args[]) {

        AppClientMqtts test_MQTTS = new AppClientMqtts();

        test_MQTTS.ConnectCamAapp();

    }

 

    public CallbackConnection getConnect(String clientId, String userName, String password, String udid) {

        try {

            MQTT mqtt = new MQTT();

            mqtt.setHost(MQTT_URL);

            mqtt.setCleanSession(true);

            mqtt.setUserName(userName);

            mqtt.setPassword(password);

            mqtt.setClientId(clientId);

            SSLContext sslContext = SSLContext.getInstance("TLS");

            sslContext.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());

            mqtt.setSslContext(sslContext);

            CallbackConnection subscribeConnection = mqtt.callbackConnection();

 

            return subscribeConnection;

        } catch (Exception e) {

            logger.error("Error Init Mqtt Connection " + e);

            return null;

        }

 

    }

 

    public void ConnectCamAapp() {

        logger.info("start ConnectCamAapp");

        String appConf = "01100114999CHMQTTSCAMAPP,"

                + "4999CHMQTTS,"

                + "eacf750b0ab3ee76f9f78c76e34bd11ba1e9e05138dc96d4226885b36ac5bfa3,"

                + "app_gcm/a4b1dea25f4e86c330e2e8684a90e51e513ecfdd2c3edecc1d06d9f8a54c81ed/sub,"

                + "device_001/e3540cfa0e3d8e14c0777ca7bc17c86ef5b409a1da30a14b35a470f5d4b895c9/sub,"

                + "01100114999CHMQTTSCAMAPP";

        AppClient appClient = new AppClient(appConf);

        appClient.start();

 

    }

 

    static class DefaultTrustManager implements X509TrustManager {

 

        @Override

        public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {

        }

 

        @Override

        public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {

        }

 

        @Override

        public X509Certificate[] getAcceptedIssuers() {

            return new X509Certificate[0];

        }

    }

 

    class PublicMessage extends Thread {

 

        CallbackConnection subConnection = null;

        String topicCam = "";

        String clientApp = "";

        String topicApp = "";

        String udid = "";

 

        public PublicMessage(CallbackConnection blockingConnection, String topicCamPub, String clientId, String topicAppSub, String udid) {

            subConnection = blockingConnection;

            topicCam = topicCamPub;

            clientApp = clientId;

            topicApp = topicAppSub;

            this.udid = udid;

        }

 

        @Override

        public void run() {

 

            int count = 0;

 

            while (true) {

                try {

                    String msg = "2app_topic_sub="

                            + topicApp + "&time=" + System.currentTimeMillis() + "&req=get_session_key&mode=remote&port1=50915"

+"&ip=115.78.13.114&streamname=CC3A61D7DEC6_50915 ";

                    subConnection.publish(topicCam, msg.getBytes(), QoS.AT_MOST_ONCE, false, new Callback<Void>() {

                        @Override

                        public void onSuccess(Void arg0) {

                            logger.info("publish msg success ");

                        }

 

                        @Override

                        public void onFailure(Throwable arg0) {

                            logger.info("publish fail");

                        }

                    });

                    count++;

                    if (count == 1) {

                        logger.info("stop send " + count);

                        break;

                    }

                } catch (Exception ex) {

                    logger.error("error publish messsage cutx ");

                }

            }

 

            logger.info(clientApp + " total send: " + count);

 

        }

    }

 

    class ReceiveMessage extends Thread {

 

        CallbackConnection subConnection = null;

        String topicSub = "";

        String clientIdRecevie = "";

        long startTime;

        String udid = "";

 

        public ReceiveMessage(CallbackConnection blockingConnection, String topicApp, String clientId, String udid) {

            subConnection = blockingConnection;

            clientIdRecevie = clientId;

            topicSub = topicApp;

            this.udid = udid;

        }

 

        @Override

        public void run() {

 

            try {

 

                subConnection.connect(new Callback<Void>() {

                    @Override

                    public void onSuccess(Void arg0) {

                        logger.info("connect success");

                        QoS qoSSubscrice = QoS.AT_MOST_ONCE;

                        Topic topic = new Topic(topicSub, qoSSubscrice);

                        Topic[] topics = {topic};

                        subConnection.subscribe(topics, new Callback<byte[]>() {

                            @Override

                            public void onSuccess(byte[] arg0) {

                                logger.info("subscribe topic " + topicSub);

                                logger.info("subscribe success");

                            }

 

                            @Override

                            public void onFailure(Throwable arg0) {

                                logger.info("subscribe fail");

                            }

                        });

                    }

 

                    @Override

                    public void onFailure(Throwable arg0) {

                        logger.info("connect fail");

                    }

                });

 

                subConnection.listener(new Listener() {

                    @Override

                    public void onConnected() {

                        logger.info("wait receive message from topic " + topicSub);

                    }

 

                    @Override

                    public void onDisconnected() {

                        logger.info(udid + " disconnect");

                    }

                    int count = 0;

                    QoS qoSPublish = QoS.AT_MOST_ONCE;

 

                    @Override

                    public void onPublish(UTF8Buffer topic, Buffer body, Runnable ack) {

 

                        count++;

                        String responseMsg = body.utf8().toString();

                        logger.info("App receive: " + count + "-" + responseMsg);

 

                    }

 

                    @Override

                    public void onFailure(Throwable value) {

                        logger.info(clientIdRecevie + " connect fail");

                    }

                });

                synchronized (Listener.class) {

                    while (true) {

                        Listener.class.wait();

                    }

                }

 

            } catch (Exception ex) {

                logger.error("error timeout received " + ex);

            }

        }

    }

 

    class AppClient extends Thread {

 

        String clientId = "";

        String userName = "";

        String password = "";

        String topicApp = "";

        String topicCam = "";

        String udid = "";

        public long totalLatency = 0;

        public int totalReceive = 0;

        String formatAppConf = ",";

 

        public AppClient(String appClient) {

            clientId = appClient.split(formatAppConf)[0];

            userName = appClient.split(formatAppConf)[1];

            password = appClient.split(formatAppConf)[2];

            topicApp = appClient.split(formatAppConf)[3];

            topicCam = appClient.split(formatAppConf)[4];

            udid = appClient.split(formatAppConf)[5];

        }

 

        @Override

        public void run() {

 

            CallbackConnection connection = getConnect(clientId, userName, password, udid);

            ReceiveMessage receiveMessage = new ReceiveMessage(connection, topicApp, clientId, udid);

            receiveMessage.start();

 

            PublicMessage publicMessage = new PublicMessage(connection, topicCam, clientId, topicApp, udid);

            publicMessage.start();

 

        }

    }

}

 

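Chú ý: mặc định MQTT client sẽ tự động kết nối lại nếu mất kết nối. Nếu chỉ muốn kết nối một lần, ta sử dụng setConnectAttemptsMax(0) (áp dụng cho lần kết nối đầu tiên); để không tự kết nối lại sau khi kết nối đã được thiết lập, dùng thêm setReconnectAttemptsMax(0).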