Mapping MQTT and JMS Clients
JMS clients and MQTT clients can share the same Topic destinations when they are run simultaneously. Messages published by MQTT/JMS clients are automatically converted to suit JMS/MQTT consumers.
MQTT Publish packets with QOS levels 1 or 2 are treated as JMS messages with delivery mode set to Persistent mode and follow the same semantics of a Persistent JMS Message. When an MQTT Publisher sends a packet to the FioranoMQ Server, depending on the QOS level, it is stored into the Topic destination and forwarded to the corresponding subscribers.
Case 1
MQTT Publisher publishes on to a Topic destination which has a JMS Subscriber subscribed to it. The MQTT Packet will be sent in the form of JMS BytesMessage to the JMS Subscriber after constructing a BytesMessage with MQTT Publish packet as its payload.
Case 2
JMS Publisher publishes a JMS Message on to a Topic destination which has an MQTT Subscriber. The JMS message is converted to an MQTT Publish packet before delivering it to the MQTT Subscriber. Contents such as Message properties and body are dumped to the payload of the MQTT Publish packet. Therefore, MQTT client Consumers must be aware of the origin of packets it can receive and read the payload accordingly.
If an MQTT Subscriber receives a JMS Message in the form of MQTT Publish Packet, it must therefore assume that the first four bytes in the payload indicate the number of message properties followed by key-value pairs; and then the size of data followed by the data part.
Writing Fiorano MQTT Clients
Fiorano's MQTT client libraries are available in Java language as of 10.2.0 version. This section illustrates an example to Publish/Subscribe MQTT packets and various methods to be used to write their own application.
FioranoMqttClient is a lightweight client communicating to an MQTT Server using its methods and implements the interface IFioranoMqttClient. Following are the jars required for the MQTT client:
- mqttpackets-impl.jar
- mqtttransport-impl.jar
- mqttutil-impl.jar
An instance of FioranoMqttClient class can be fetched using its static method 'getFioranoMqttClient'.
Following publisher code creates an MQTT connection and publishes MQTT Publish Packet with various parameters set.
Publisher Code Sample
//Get the fioranoMqttClient object
try (IFioranoMqttClient mqttClient = FioranoMqttClient.getFioranoMqttClient(mqttProviderURL)) {
//Create connection
boolean connectionInfoAlreadyPresent = mqttClient.connect(clientID, cleanStart, (short) keepAlive);
System.out.println("Created connection. Connection info already present : " + connectionInfoAlreadyPresent);
//Set the exception listener
mqttClient.setExceptionListener(new ExceptionListener() {
@Override
public void onException(FioranoMqttException exception) {
exception.printStackTrace();
}
});
//Set the completion listener before publishing messages
mqttClient.setCompletionListener(new CompletionListener() {
@Override
public void onCompletion(int msgID) {
System.out.println("Successfully sent publish packet with ID : " + msgID);
}
@Override
public void onException(int msgID, Exception mqttException) {
System.out.println("Exception occurred while publishing packet with ID : " +
msgID + "\t Exception : " + mqttException);
}
});
Following Subscriber Code creates a Subscriber and consumes packets.
Subscriber Code Sample
//Get the fioranoMqttClient object
try (IFioranoMqttClient mqttClient = FioranoMqttClient.getFioranoMqttClient(mqttProviderURL)) {
//Create connection
boolean connectionInfoAlreadyPresent = mqttClient.connect(clientID, cleanStart, (short) keepAlive);
System.out.println("Created connection. Connection info already present : " + connectionInfoAlreadyPresent);
//Set the exception listener
mqttClient.setExceptionListener(new ExceptionListener() {
@Override
public void onException(FioranoMqttException exception) {
exception.printStackTrace();
}
});
//Set the message listener
mqttClient.setMessageListener(new MessageListener() {
@Override
public void onMessage(String topicName, byte[] payload, int qos, boolean isRetain) {
String payloadString = (payload != null) ? new String(payload) : null;
if ("unSubscribe".equalsIgnoreCase(payloadString))
unSubscribe = true;
System.out.println("Received message : msg : " + payloadString + "\t topicName : "
+ topicName + " \t QOS : " + qos + "\t isRetain : " + isRetain);
}
});
LinkedHashMap<String, Byte> subscribeTopics = new LinkedHashMap<>();
subscribeTopics.put(topicName, requestQOS);
System.out.println("Subscribing to topic : " + topicName + "\t with request QOS : " + requestQOS);
//Subscribe to the topics
byte[] grantedQOS = mqttClient.subscribe(subscribeTopics);
System.out.println("Granted QOS for topic : " + topicName + "\t is : " + grantedQOS[0]);
while (!unSubscribe)
Thread.sleep(100);
System.out.println("Received 'unSubscribe' message");
//Unsubscribe
mqttClient.unsubscribe(new String[]{topicName});
} catch (Exception e) {
e.printStackTrace();
}
}
private class InitializationException extends Exception {
InitializationException(String message) {
super(message);
}
}
}