IIoT Middleware

Save and read real-time metrics and states

The IIoT Middleware enables dynamic data published from Gateways and Functional Modules to be collected and made available to other interested modules, such as the HDM which is in charge of persisting them.

Currently only the MQTT protocol is supported, so modules need to publish and subscribe to specific topics. MQTT clients are available for many languages; a few examples in Python and Java are provided below. Please refer to the official documentation of the client library to learn more about how to use it in your project.

The topic schema is: HDT/{factory_entity_id}/{measurement/state}/{descriptor_id}.

The message content has to be in JSON format, including a timestamp key together with all the required fields from a measurement or the parameter from a state.

Publish a Metric

To publish a metric value to the MQTT IIoT Middleware and make it available to other components, a specific topic structure and message format must be used. Specifically, the topic schema is: HDT/{factory_entity_id}/measurement/{measurement_descriptor_id}.

To publish a simple metric, such as the HR (Heart Rate), it is sufficient to send a JSON message containing the value together with the timestamp. Example:

{
  "timestamp": 1753940931000,
  "hr": 91.0
}

The key name (in this case `hr`) must match a key in the fields map of the corresponding `MeasurementDescriptor`.

Note that if the metric to be saved is composed of multiple fields, the associated values need to be flattened at the first level of the JSON message, using exactly the same key names as in the MeasurementDescriptor fields. Example: consider the ACC (Accelerometer) metric, which has one field per axis (X, Y and Z). The resulting message will be:

{
  "timestamp": 1753940931000,
  "acc_x": 13.0,
  "acc_y": 276.0,
  "acc_z": -56.0
}

Here are the complete code snippets for properly publishing a metric, first in Python and then in Java.

import json
import os
from datetime import datetime
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion


def on_connect(client, userdata, flags, reason_code, properties):
    """Report the outcome of the connection attempt (callback API v2 signature)."""
    print(f'Connected with result code "{reason_code}"')

# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables

mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
# Run the network loop in a background thread: without it the client never
# processes the broker's replies (CONNACK, PUBACK), so on_connect is not
# called and the QoS 1 handshake cannot complete.
mqtt_client.loop_start()
quality_of_service = 1

# NOTE: you need to specify the measurement_descriptor_id and worker_id UUIDs

measurement_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"

# Build the payload with json.dumps instead of hand-written string formatting,
# so the message is always valid JSON. The timestamp is in epoch milliseconds.
value = json.dumps({
    "timestamp": int(datetime.now().timestamp() * 1000),
    "acc_x": 13.0,
    "acc_y": 276.0,
    "acc_z": -56.0,
})

message_info = mqtt_client.publish(f'HDT/{worker_id}/measurement/{measurement_descriptor_id}',
    value, quality_of_service)

# Block until the message has actually been published (or the timeout expires)
# instead of sleeping for an arbitrary amount of time.
message_info.wait_for_publish(timeout=5)

mqtt_client.disconnect()
mqtt_client.loop_stop()
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Publishes one sample ACC (accelerometer) measurement to the IIoT Middleware.
 *
 * <p>The message is sent to {@code HDT/{workerId}/measurement/{measurementDescriptorId}}
 * as a flat JSON object containing the {@code timestamp} (epoch milliseconds) and one key
 * per measurement field. Errors are printed to standard error; nothing is rethrown.
 */
private void publishMetric() {
    // NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables
    String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
    String clientId = "unique-id";
    int qos = 1;

    // NOTE: you need to specify the measurement_descriptor_id and worker_id UUIDs
    String measurementDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    String publishTopic = "HDT/" + workerId + "/measurement/" + measurementDescriptorId;

    try {
        MqttClient mqttClient = new MqttClient(broker, clientId);
        try {
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(System.getenv("MQTT_USER"));
            connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());

            // Set callback for connection
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    System.out.println("Connected to " + serverURI);
                }

                @Override
                public void connectionLost(Throwable cause) {
                    System.out.println("Connection lost: " + cause.getMessage());
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    // Not used in this case, since we are publishing only
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("Message delivered successfully");
                }
            });

            mqttClient.connect(connOpts);

            long timestamp = System.currentTimeMillis();
            // Locale.ROOT keeps the decimal separator a '.' regardless of the JVM's default
            // locale; a locale that formats 13.0 as "13,0" would produce invalid JSON.
            String value = String.format(
                java.util.Locale.ROOT,
                "{\"timestamp\": %d, \"acc_x\": %.1f, \"acc_y\": %.1f, \"acc_z\": %.1f}",
                timestamp, 13.0, 276.0, -56.0
            );

            // Encode explicitly as UTF-8 rather than relying on the platform default charset.
            MqttMessage message =
                    new MqttMessage(value.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            message.setQos(qos);
            mqttClient.publish(publishTopic, message);

            TimeUnit.SECONDS.sleep(1);
        } finally {
            // Always release the connection and the client, even if publishing failed.
            if (mqttClient.isConnected()) {
                mqttClient.disconnect();
            }
            mqttClient.close();
        }
    } catch (MqttException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
}

Publish a State

To publish a state value to the MQTT IIoT Middleware and make it available to other components, a specific topic structure and message format must be used. Specifically, the topic schema is: HDT/{factory_entity_id}/state/{state_descriptor_id}.

Regardless of the state content, the only mandatory JSON keys are the timestamp and the name of the output parameter being saved (as specified in the FunctionalModuleOutput outputParamName). The whole JSON value associated with that parameter is then serialized and saved as is.

import json
import os
from datetime import datetime
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion


def on_connect(client, userdata, flags, reason_code, properties):
    """Report the outcome of the connection attempt (callback API v2 signature)."""
    print(f'Connected with result code "{reason_code}"')

# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables

mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
# Run the network loop in a background thread: without it the client never
# processes the broker's replies (CONNACK, PUBACK), so on_connect is not
# called and the QoS 1 handshake cannot complete.
mqtt_client.loop_start()
quality_of_service = 1

# NOTE: you need to specify the state_descriptor_id and worker_id UUIDs

state_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"

# Build the payload with json.dumps instead of hand-written string formatting,
# so the message is always valid JSON. "fatigue" is the output parameter name;
# its value is serialized as-is. The timestamp is in epoch milliseconds.
value = json.dumps({
    "timestamp": int(datetime.now().timestamp() * 1000),
    "fatigue": [5, 10],
})

message_info = mqtt_client.publish(f'HDT/{worker_id}/state/{state_descriptor_id}',
    value, quality_of_service)

# Block until the message has actually been published (or the timeout expires)
# instead of sleeping for an arbitrary amount of time.
message_info.wait_for_publish(timeout=5)

mqtt_client.disconnect()
mqtt_client.loop_stop()
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Publishes one sample {@code fatigue} state to the IIoT Middleware.
 *
 * <p>The message is sent to {@code HDT/{workerId}/state/{stateDescriptorId}} as a JSON object
 * containing the {@code timestamp} (epoch milliseconds) and the output parameter with its
 * value. Errors are printed to standard error; nothing is rethrown.
 */
private void publishState() {
    // NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables
    String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
    String clientId = "unique-id";
    int qos = 1;

    // NOTE: you need to specify the state_descriptor_id and worker_id UUIDs
    String stateDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    String publishTopic = "HDT/" + workerId + "/state/" + stateDescriptorId;

    try {
        MqttClient mqttClient = new MqttClient(broker, clientId);
        try {
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(System.getenv("MQTT_USER"));
            connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());

            // Set callback for connection
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    System.out.println("Connected to " + serverURI);
                }

                @Override
                public void connectionLost(Throwable cause) {
                    System.out.println("Connection lost: " + cause.getMessage());
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    // Not used in this case, since we are publishing only
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("Message delivered successfully");
                }
            });

            mqttClient.connect(connOpts);

            long timestamp = System.currentTimeMillis();
            // Locale.ROOT keeps the digits plain ASCII regardless of the JVM's default locale,
            // so the resulting string is always valid JSON.
            String value = String.format(
                java.util.Locale.ROOT,
                "{\"timestamp\": %d, \"fatigue\": [%d, %d]}",
                timestamp, 5, 10
            );

            // Encode explicitly as UTF-8 rather than relying on the platform default charset.
            MqttMessage message =
                    new MqttMessage(value.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            message.setQos(qos);
            mqttClient.publish(publishTopic, message);

            TimeUnit.SECONDS.sleep(1);
        } finally {
            // Always release the connection and the client, even if publishing failed.
            if (mqttClient.isConnected()) {
                mqttClient.disconnect();
            }
            mqttClient.close();
        }
    } catch (MqttException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
}

Read a Metric

In order to read real-time metric values published to the MQTT IIoT Middleware, it is mandatory to subscribe to the topics of interest and to specify a callback for managing the received message.

import os
import time
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion

# NOTE: you need to specify the measurement_descriptor_id and worker_id UUIDs

def on_connect(client, userdata, flags, reason_code, properties):
    """Subscribe to the measurement topic once the broker has accepted the connection.

    Subscribing from this callback means the subscription is issued every time
    the callback fires, i.e. also after a reconnection.
    """
    if reason_code.is_failure:
        print(f'Connection failed with result code "{reason_code}"')
        return
    measurement_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
    worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
    topic = f'HDT/{worker_id}/measurement/{measurement_descriptor_id}'
    client.subscribe(topic, qos=1)

def on_message(client, userdata, msg):
    """Print every message received on a subscribed topic."""
    print(f'Received message: {msg.payload.decode()} on topic {msg.topic}')

# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables

mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message

mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
mqtt_client.loop_start()

# loop_start() only spawns a background network thread and returns immediately;
# a standalone script must keep its main thread alive or it exits before any
# message arrives. Replace this loop with your application's own work.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    pass
finally:
    mqtt_client.disconnect()
    mqtt_client.loop_stop()
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;

/**
 * Subscribes to a measurement topic on the IIoT Middleware and prints every received message.
 *
 * <p>The topic is {@code HDT/{workerId}/measurement/{measurementDescriptorId}}. The method
 * returns right after subscribing; messages are delivered to the registered callback.
 * Errors are printed to standard error; nothing is rethrown.
 */
private void readMetric() {
    // NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables
    String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
    String clientId = UUID.randomUUID().toString();

    // NOTE: you need to specify the measurementDescriptorId and workerId UUIDs
    String measurementDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    String topic = "HDT/" + workerId + "/measurement/" + measurementDescriptorId;

    try {
        MqttClient mqttClient = new MqttClient(broker, clientId);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(System.getenv("MQTT_USER"));
        connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());

        // Set callback for connection and message reception
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("Connected to " + serverURI);
            }

            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("Connection lost: " + cause.getMessage());
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // Decode explicitly as UTF-8 rather than relying on the platform default charset.
                String receivedMessage =
                        new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Message received on topic " + topic + ": " + receivedMessage);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // Not used in this case, since we are subscribing only
            }
        });

        mqttClient.connect(connOpts);
        mqttClient.subscribe(topic);

    } catch (MqttException e) {
        e.printStackTrace();
    }
}

Read a State

In order to read real-time state values published to the MQTT IIoT Middleware, it is mandatory to subscribe to the topics of interest and to specify a callback for managing the received message.

import os
import time
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion

# NOTE: you need to specify the state_descriptor_id and worker_id UUIDs

def on_connect(client, userdata, flags, reason_code, properties):
    """Subscribe to the state topic once the broker has accepted the connection.

    Subscribing from this callback means the subscription is issued every time
    the callback fires, i.e. also after a reconnection.
    """
    if reason_code.is_failure:
        print(f'Connection failed with result code "{reason_code}"')
        return
    state_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
    worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
    topic = f'HDT/{worker_id}/state/{state_descriptor_id}'
    client.subscribe(topic, qos=1)

def on_message(client, userdata, msg):
    """Print every message received on a subscribed topic."""
    print(f'Received message: {msg.payload.decode()} on topic {msg.topic}')

# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables

mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message

mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
mqtt_client.loop_start()

# loop_start() only spawns a background network thread and returns immediately;
# a standalone script must keep its main thread alive or it exits before any
# message arrives. Replace this loop with your application's own work.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    pass
finally:
    mqtt_client.disconnect()
    mqtt_client.loop_stop()
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;

/**
 * Subscribes to a state topic on the IIoT Middleware and prints every received message.
 *
 * <p>The topic is {@code HDT/{workerId}/state/{stateDescriptorId}}. The method returns right
 * after subscribing; messages are delivered to the registered callback. Errors are printed
 * to standard error; nothing is rethrown.
 */
private void readState() {
    // NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables
    String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
    String clientId = UUID.randomUUID().toString();

    // NOTE: you need to specify the stateDescriptorId and workerId UUIDs
    String stateDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    String topic = "HDT/" + workerId + "/state/" + stateDescriptorId;

    try {
        MqttClient mqttClient = new MqttClient(broker, clientId);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(System.getenv("MQTT_USER"));
        connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());

        // Set callback for connection and message reception
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("Connected to " + serverURI);
            }

            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("Connection lost: " + cause.getMessage());
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // Decode explicitly as UTF-8 rather than relying on the platform default charset.
                String receivedMessage =
                        new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Message received on topic " + topic + ": " + receivedMessage);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // Not used in this case, since we are subscribing only
            }
        });

        mqttClient.connect(connOpts);
        mqttClient.subscribe(topic);

    } catch (MqttException e) {
        e.printStackTrace();
    }
}