IIoT Middleware
8 minute read
Save and read real-time metrics and states
The IIoT Middleware enables dynamic data published from Gateways and Functional Modules to be collected and made
available to other interested modules, such as the HDM which is in charge of persisting them.
Currently only the MQTT protocol is supported, so modules need to publish and subscribe to specific topics. MQTT
clients
are available for different languages, in the following a few examples in Python and Java will be provided.
Please refer to the official documentation to learn more about how to use the client in your project.
The topic schema is: HDT/{factory_entity_id}/{measurement/state}/{descriptor_id}.
The message content has to be in JSON format, including a timestamp key together with all the required fields from a measurement or the parameter from a state.
Publish a Metric
In order to publish a metric value to the MQTT IIoT Middleware for making it available to other components, specific
topics
structure and message format need to be used. Specifically, the topic schema is:
HDT/{factory_entity_id}/measurement/{measurement_descriptor_id}.
For publishing a simple metric, such as the HR (Hearth Rate), it is sufficient to send a JSON message containing the value together with the timestamp. Example:
{
"timestamp": 1753940931000,
"hr": 91.0 // (The key name, in this case `hr`, must match a key in the fields map of the corresponding `MeasurementDescriptor`.)
}
MeasurementDescriptor fields map (except for timestamp which is always present).Note that, if the metric to be saved is composed by multiple fields, then the associated values need to be flattened at
the first level of the JSON message, using the exact same key names as in the MeasurementDescriptor fields.
Example: consider the case of the ACC (Accelerometer) metric. The fields are X, Y and Z. The resulting message
keys will be:
{
"timestamp": 1753940931000,
"acc_x": 13.0,
"acc_y": 276.0,
"acc_z": -56.0
}
Here is the complete code snippet for properly publishing a metric.
import os
from datetime import datetime
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion
def on_connect(client, userdata, flags, reason_code, properties):
loguru.logger.debug(f'Connected with result code "{reason_code}"')
# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables
mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
quality_of_service = 1
# NOTE: you need to specify the measurement_descriptor_id and worker_id UUIDs
measurement_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
value = f'{{"timestamp": {int(datetime.now().timestamp() * 1000)}, "acc_x": 13.0, "acc_y": 276.0, "acc_z": -56.0}}'
mqtt_client.publish(f'HDT/{worker_id}/measurement/{measurement_descriptor_id}',
value, quality_of_service)
time.sleep(1) # wait for the messages to be sent
mqtt_client.disconnect()import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
private void publishMetric() {
// NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables
String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
String clientId = "unique-id";
int qos = 1;
// NOTE: you need to specify the measurement_descriptor_id and worker_id UUIDs
String measurementDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
try {
MqttClient mqttClient = new MqttClient(broker, "unique-id");
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(System.getenv("MQTT_USER"));
connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());
// Set callback for connection
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("Connected to " + serverURI);
}
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// Not used in this case, since we are publishing only
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Message delivered successfully");
}
});
mqttClient.connect(connOpts);
long timestamp = System.currentTimeMillis();
String value = String.format(
"{\"timestamp\": %d, \"acc_x\": %.1f, \"acc_y\": %.1f, \"acc_z\": %.1f}",
timestamp, 13.0, 276.0, -56.0
);
mqttClient.publish("HDT/" + workerId + "/measurement/" + measurementDescriptorId,
new MqttMessage(value.getBytes()));
TimeUnit.SECONDS.sleep(1);
mqttClient.disconnect();
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
}
}Publish a State
In order to publish a state value to the MQTT IIoT Middleware for making it available to other components, specific
topics
structure and message format need to be used. Specifically, the topic schema is:
HDT/{factory_entity_id}/state/{state_descriptor_id}.
Independently of the state content, the only mandatory JSON keys are the timestamp and the name of the output parameter
that is getting saved (as specified in the FunctionalModuleOutput outputParamName). Then the whole JSON object value
corresponding to the parameter will be serialized and saved as it is.
import os
from datetime import datetime
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion
def on_connect(client, userdata, flags, reason_code, properties):
loguru.logger.debug(f'Connected with result code "{reason_code}"')
# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables
mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
quality_of_service = 1
# NOTE: you need to specify the state_descriptor_id and worker_id UUIDs
state_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
value = f'{{"timestamp": {int(datetime.now().timestamp() * 1000)}, "fatigue": [5, 10]}'
mqtt_client.publish(f'HDT/{worker_id}/state/{state_descriptor_id}',
value, quality_of_service)
time.sleep(1) # wait for the messages to be sent
mqtt_client.disconnect()import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
private void publishState() {
// NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables
String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
String clientId = "unique-id";
int qos = 1;
// NOTE: you need to specify the state_descriptor_id and worker_id UUIDs
String stateDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
try {
MqttClient mqttClient = new MqttClient(broker, "unique-id");
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(System.getenv("MQTT_USER"));
connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());
// Set callback for connection
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("Connected to " + serverURI);
}
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// Not used in this case, since we are publishing only
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Message delivered successfully");
}
});
mqttClient.connect(connOpts);
long timestamp = System.currentTimeMillis();
String value = String.format(
"{\"timestamp\": %d, \"fatigue\": [%d, %d]}",
timestamp, 5, 10
);
mqttClient.publish("HDT/" + workerId + "/state/" + stateDescriptorId,
new MqttMessage(value.getBytes()));
TimeUnit.SECONDS.sleep(1);
mqttClient.disconnect();
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
}
}Read a Metric
In order to read real-time metric values published to the MQTT IIoT Middleware, it is mandatory to subscribe to the
topics of
interest and to specify a callback for managing the received message.
import os
import time
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion
# NOTE: you need to specify the measurement_descriptor_id and worker_id UUIDs
def on_connect(client, userdata, flags, reason_code, properties):
measurement_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
topic = f'HDT/{worker_id}/measurement/{measurement_descriptor_id}'
client.subscribe(topic, qos=1)
def on_message(client, userdata, msg):
print(f'Received message: {msg.payload.decode()} on topic {msg.topic}')
# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables
mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
mqtt_client.loop_start()import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;
private void readMetric() {
// NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables
String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
String clientId = UUID.randomUUID().toString();
// NOTE: you need to specify the measurementDescriptorId and workerId UUIDs
String measurementDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
topic = "HDT/"+workerId+"/measurement/"+measurementDescriptorId;
try {
MqttClient mqttClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(System.getenv("MQTT_USER"));
connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());
// Set callback for connection and message reception
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("Connected to " + serverURI);
}
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String receivedMessage = new String(message.getPayload());
System.out.println("Message received on topic " + topic + ": " + receivedMessage);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Not used in this case, since we are subscribing only
}
});
mqttClient.connect(connOpts);
mqttClient.subscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
}Read a State
In order to read real-time state values published to the MQTT IIoT Middleware, it is mandatory to subscribe to the
topics of
interest and to specify a callback for managing the received message.
import os
import time
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion
# NOTE: you need to specify the state_descriptor_id and worker_id UUIDs
def on_connect(client, userdata, flags, reason_code, properties):
state_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
topic = f'HDT/{worker_id}/state/{state_descriptor_id}'
client.subscribe(topic, qos=1)
def on_message(client, userdata, msg):
print(f'Received message: {msg.payload.decode()} on topic {msg.topic}')
# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables
mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
mqtt_client.loop_start()import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;
private void readState() {
// NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables
String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
String clientId = UUID.randomUUID().toString();
// NOTE: you need to specify the stateDescriptorId and workerId UUIDs
String stateDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
topic = "HDT/"+workerId+"/state/"+stateDescriptorId;
try {
MqttClient mqttClient = new MqttClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(System.getenv("MQTT_USER"));
connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());
// Set callback for connection and message reception
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("Connected to " + serverURI);
}
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String receivedMessage = new String(message.getPayload());
System.out.println("Message received on topic " + topic + ": " + receivedMessage);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Not used in this case, since we are subscribing only
}
});
mqttClient.connect(connOpts);
mqttClient.subscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
}