IIoT Middleware
7 minute read
Save and read real-time metrics and states
The IIoT Middleware enables dynamic data published from Gateways and Functional Modules to be collected and made
available to other interested modules, such as the HDM which is in charge of persisting them.
Currently only the MQTT protocol is supported, so modules need to publish and subscribe to specific topics. MQTT clients
are available for different languages, in the following a few examples in Python and Java will be provided.
Please refer to the official documentation to learn more about how to use the client in your project.
The topic schema is HDT/{factory_entity_id}/{measurement/state}/{descriptor_id}/{value}.
The message schema is {timestamp}#{value}.
Publish a Metric
In order to publish a metric value to the IIoT Middleware for making it available to other components, specific topics structure and message format need to be used.
Note that, if the metric to be saved is composed by multiple fields, then the associated values need to be separated by a pipe "|".
For example: "1752760778790#1|2|3.
import os
from datetime import datetime
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion
def on_connect(client, userdata, flags, reason_code, properties):
loguru.logger.debug(f'Connected with result code "{reason_code}"')
# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables
mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
quality_of_service = 1
# NOTE: you need to specify the measurement_descriptor_id and worker_id UUIDs
measurement_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
value = 15.0
mqtt_client.publish(f'HDT/{worker_id}/measurement/{measurement_descriptor_id}',
f'{int(datetime.now().timestamp() * 1000)}#{value}', quality_of_service)
time.sleep(1) # wait for the messages to be sent
mqtt_client.disconnect()
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
private void publishMetric() {
// NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables
String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
String clientId = "unique-id";
int qos = 1;
// NOTE: you need to specify the measurement_descriptor_id and worker_id UUIDs
String measurementDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
double value = 15.0;
try {
MqttClient mqttClient = new MqttClient(broker, "unique-id");
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(System.getenv("MQTT_USER"));
connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());
// Set callback for connection
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("Connected to " + serverURI);
}
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// Not used in this case, since we are publishing only
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Message delivered successfully");
}
});
mqttClient.connect(connOpts);
long timestamp = System.currentTimeMillis();
String message = timestamp + "#" + value;
mqttClient.publish("HDT/" + workerId + "/measurement/" + measurementDescriptorId,
new MqttMessage(message.getBytes()));
TimeUnit.SECONDS.sleep(1);
mqttClient.disconnect();
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
}
}Publish a State
In order to publish a state value to the IIoT Middleware for making it available to other components, specific topics structure and message format need to be used.
import os
from datetime import datetime
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion
def on_connect(client, userdata, flags, reason_code, properties):
loguru.logger.debug(f'Connected with result code "{reason_code}"')
# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables
mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
quality_of_service = 1
# NOTE: you need to specify the state_descriptor_id and worker_id UUIDs
state_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
value = {"fatigue": [5, 10]}
mqtt_client.publish(f'HDT/{worker_id}/state/{state_descriptor_id}',
f'{int(datetime.now().timestamp() * 1000)}#{value}', quality_of_service)
time.sleep(1) # wait for the messages to be sent
mqtt_client.disconnect()
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
private void publishState() {
// NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables
String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
String clientId = "unique-id";
int qos = 1;
// NOTE: you need to specify the state_descriptor_id and worker_id UUIDs
String stateDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
String value = "{\"fatigue\": [5, 10]}";
try {
MqttClient mqttClient = new MqttClient(broker, "unique-id");
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(System.getenv("MQTT_USER"));
connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());
// Set callback for connection
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("Connected to " + serverURI);
}
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// Not used in this case, since we are publishing only
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Message delivered successfully");
}
});
mqttClient.connect(connOpts);
long timestamp = System.currentTimeMillis();
String message = timestamp + "#" + value;
mqttClient.publish("HDT/" + workerId + "/state/" + stateDescriptorId,
new MqttMessage(message.getBytes()));
TimeUnit.SECONDS.sleep(1);
mqttClient.disconnect();
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
}
}Read a Metric
In order to read real-time metric values published to the IIoT Middleware, it is mandatory to subscribe to the topics of interest and to specify a callback for managing the received message.
import os
import time
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion
# NOTE: you need to specify the measurement_descriptor_id and worker_id UUIDs
def on_connect(client, userdata, flags, reason_code, properties):
measurement_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
topic = f'HDT/{worker_id}/measurement/{measurement_descriptor_id}'
client.subscribe(topic, qos=1)
def on_message(client, userdata, msg):
print(f'Received message: {msg.payload.decode()} on topic {msg.topic}')
# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables
mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
mqtt_client.loop_start()import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;
private void readMetric() {
// NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables
String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
String clientId = UUID.randomUUID().toString();
// NOTE: you need to specify the measurementDescriptorId and workerId UUIDs
String measurementDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
topic = "HDT/"+workerId+"/measurement/"+measurementDescriptorId;
try {
MqttClient mqttClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(System.getenv("MQTT_USER"));
connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());
// Set callback for connection and message reception
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("Connected to " + serverURI);
}
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String receivedMessage = new String(message.getPayload());
System.out.println("Message received on topic " + topic + ": " + receivedMessage);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Not used in this case, since we are subscribing only
}
});
mqttClient.connect(connOpts);
mqttClient.subscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
}Read a State
In order to read real-time state values published to the IIoT Middleware, it is mandatory to subscribe to the topics of interest and to specify a callback for managing the received message.
import os
import time
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion
# NOTE: you need to specify the state_descriptor_id and worker_id UUIDs
def on_connect(client, userdata, flags, reason_code, properties):
state_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
topic = f'HDT/{worker_id}/state/{state_descriptor_id}'
client.subscribe(topic, qos=1)
def on_message(client, userdata, msg):
print(f'Received message: {msg.payload.decode()} on topic {msg.topic}')
# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables
mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
mqtt_client.loop_start()import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;
private void readState() {
// NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables
String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
String clientId = UUID.randomUUID().toString();
// NOTE: you need to specify the stateDescriptorId and workerId UUIDs
String stateDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
topic = "HDT/"+workerId+"/state/"+stateDescriptorId;
try {
MqttClient mqttClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(System.getenv("MQTT_USER"));
connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());
// Set callback for connection and message reception
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("Connected to " + serverURI);
}
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String receivedMessage = new String(message.getPayload());
System.out.println("Message received on topic " + topic + ": " + receivedMessage);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Not used in this case, since we are subscribing only
}
});
mqttClient.connect(connOpts);
mqttClient.subscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
}