package com.callback.connect.future;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.log4j.Logger;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
public class AppClientMqtts {
private static final Logger logger = Logger.getLogger(AppClientMqtts.class);
static String MQTT_URL = "ssl://localhost:8883";
public static void main(String args[]) {
AppClientMqtts test_MQTTS = new AppClientMqtts();
test_MQTTS.ConnectCamAapp();
}
public FutureConnection getMQTT(String clientId, String userName, String password, String udid) {
try {
MQTT mqtt = new MQTT();
mqtt.setHost(MQTT_URL);
mqtt.setCleanSession(true);
mqtt.setUserName(userName);
mqtt.setPassword(password);
mqtt.setClientId(clientId);
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
mqtt.setSslContext(sslContext);
FutureConnection connection = mqtt.futureConnection();
connection.connect().await();
if (connection.isConnected()) {
logger.info("connect success");
} else {
logger.info("connect fail");
}
return connection;
} catch (Exception e) {
logger.error("Error Init Mqtt Connection " + e);
return null;
}
}
public void ConnectCamAapp() {
logger.info("start ConnectCamAapp");
String appConf = "01100114999CHMQTTSCAMAPP,"
+ "4999CHMQTTS,"
+ "eacf750b0ab3ee76f9f78c76e34bd11ba1e9e05138dc96d4226885b36ac5bfa3,"
+ "app_gcm/a4b1dea25f4e86c330e2e8684a90e51e513ecfdd2c3edecc1d06d9f8a54c81ed/sub,"
+ "device_001/e3540cfa0e3d8e14c0777ca7bc17c86ef5b409a1da30a14b35a470f5d4b895c9/sub,"
+ "01100114999CHMQTTSCAMAPP";
AppClient appClient = new AppClient(appConf);
appClient.start();
}
static class DefaultTrustManager implements X509TrustManager {
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}
class PublicMessage extends Thread {
FutureConnection subConnection = null;
String topicCam = "";
String clientApp = "";
String topicApp = "";
String udid = "";
public PublicMessage(FutureConnection mqtt, String topicCamPub, String clientId, String topicAppSub, String udid) {
subConnection = mqtt;
topicCam = topicCamPub;
clientApp = clientId;
topicApp = topicAppSub;
this.udid = udid;
}
@Override
public void run() {
int count = 0;
try {
QoS qoSSubscrice = QoS.AT_MOST_ONCE;
Topic topic = new Topic(topicApp, qoSSubscrice);
Topic[] topics = {topic};
subConnection.subscribe(topics);
long startTime = System.currentTimeMillis();
while (true) {
String msg = "";
msg = "2app_topic_sub=" + topicApp + "&time=" + System.currentTimeMillis() + "&req=get_session_key&mode=remote&port1=50915"
+"&ip=115.78.13.114&streamname=CC3A61D7DEC6_50915 ";
QoS qoSPublish = QoS.AT_MOST_ONCE;
long startSendMsg = System.currentTimeMillis();
subConnection.publish(topicCam, (msg + count).getBytes(), qoSPublish, false);
count++;
logger.info("App publish: " + msg);
if (count == 50000) {
logger.info("stop send");
break;
}
}
} catch (Exception ex) {
logger.error("error publish messsage " + ex);
}
logger.info(clientApp + " total send: " + count);
}
}
class ReceiveMessage extends Thread {
FutureConnection receiveMqtt = null;
String topicSub = "";
String clientIdRecevie = "";
long startTime;
String udid = "";
public ReceiveMessage(FutureConnection mqtt, String topicApp, String clientId, String udid) {
receiveMqtt = mqtt;
clientIdRecevie = clientId;
topicSub = topicApp;
this.udid = udid;
}
@Override
public void run() {
try {
logger.info("subscribe topic " + topicSub);
logger.info("wait received message from " + topicSub);
int count = 0;
while (true) {
Message message = receiveMqtt.receive().await();
String responseMsg = new String(message.getPayload());
logger.info("App receive :" + responseMsg);
count++;
logger.info("count: " + count);
}
} catch (Exception ex) {
logger.error("error timeout received " + ex);
}
}
}
class AppClient extends Thread {
String clientId = "";
String userName = "";
String password = "";
String topicApp = "";
String topicCam = "";
String udid = "";
public long totalLatency = 0;
public int totalReceive = 0;
String formatAppConf = ",";
public AppClient(String appClient) {
clientId = appClient.split(formatAppConf)[0];
userName = appClient.split(formatAppConf)[1];
password = appClient.split(formatAppConf)[2];
topicApp = appClient.split(formatAppConf)[3];
topicCam = appClient.split(formatAppConf)[4];
udid = appClient.split(formatAppConf)[5];
}
@Override
public void run() {
FutureConnection futureConnection = getMQTT(clientId, userName, password, udid);
ReceiveMessage receiveMessage = new ReceiveMessage(futureConnection, topicApp, clientId, udid);
receiveMessage.start();
PublicMessage publicMessage = new PublicMessage(futureConnection, topicCam, clientId, topicApp, udid);
publicMessage.start();
}
}
}
Chú ý: mặc định mqtt client sẽ tự động kết nối lại nếu mất kết nối. nếu muốn kết nối 1 lần ta sử dụng setConnectAttemptsMax(0).