IIoT Middleware

Save and read real-time metrics and states

The IIoT Middleware enables dynamic data published from Gateways and Functional Modules to be collected and made available to other interested modules, such as the HDM which is in charge of persisting them.

Currently only the MQTT protocol is supported, so modules need to publish and subscribe to specific topics. MQTT clients are available for different languages; a few examples in Python and Java are provided below. Please refer to the official documentation of the client library to learn more about how to use it in your project.

The topic schema is HDT/{factory_entity_id}/{measurement/state}/{descriptor_id}/{value}.

The message schema is {timestamp}#{value}, where {timestamp} is the Unix epoch time in milliseconds.

Publish a Metric

To publish a metric value to the IIoT Middleware and make it available to other components, a specific topic structure and message format must be used.

Note that if the metric to be saved is composed of multiple fields, the associated values need to be separated by a pipe "|". For example: "1752760778790#1|2|3".

import os
from datetime import datetime
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion


def on_connect(client, userdata, flags, reason_code, properties):
    """Report the outcome of the connection attempt (paho-mqtt callback API v2 signature)."""
    print(f'Connected with result code "{reason_code}"')


# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables
mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
# Start the background network loop: it is what processes incoming packets
# (connection and publish acknowledgements) and invokes the callbacks.
mqtt_client.loop_start()
quality_of_service = 1

# NOTE: you need to specify the measurement_descriptor_id and worker_id UUIDs
measurement_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
value = 15.0

# Topic: HDT/{worker_id}/measurement/{descriptor_id}; payload: {epoch millis}#{value}
topic = f'HDT/{worker_id}/measurement/{measurement_descriptor_id}'
payload = f'{int(datetime.now().timestamp() * 1000)}#{value}'
message_info = mqtt_client.publish(topic, payload, quality_of_service)

# Wait (up to 5 seconds) for the message to be handed over to the broker
# instead of sleeping for an arbitrary amount of time.
message_info.wait_for_publish(timeout=5)
mqtt_client.disconnect()
mqtt_client.loop_stop()
                           
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Publishes a single metric sample to the IIoT Middleware.
 *
 * The sample is sent on the topic {@code HDT/{workerId}/measurement/{measurementDescriptorId}}
 * with the payload {@code {timestamp}#{value}}, where the timestamp is the current
 * time in epoch milliseconds. Errors are printed to standard error.
 */
private void publishMetric() {
    // NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables
    String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
    String clientId = "unique-id";
    int qos = 1;

    // NOTE: you need to specify the measurement_descriptor_id and worker_id UUIDs
    String measurementDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    double value = 15.0;

    try {
        MqttClient mqttClient = new MqttClient(broker, clientId);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(System.getenv("MQTT_USER"));
        connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());

        // Set callback for connection
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("Connected to " + serverURI);
            }

            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("Connection lost: " + cause.getMessage());
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // Not used in this case, since we are publishing only
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                System.out.println("Message delivered successfully");
            }
        });

        mqttClient.connect(connOpts);

        long timestamp = System.currentTimeMillis();
        String message = timestamp + "#" + value;
        // Encode explicitly as UTF-8 so the payload does not depend on the platform default charset
        MqttMessage mqttMessage = new MqttMessage(message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        mqttMessage.setQos(qos);
        mqttClient.publish("HDT/" + workerId + "/measurement/" + measurementDescriptorId, mqttMessage);

        // Short grace period before disconnecting, as in the original example
        TimeUnit.SECONDS.sleep(1);
        mqttClient.disconnect();
        mqttClient.close();
    } catch (MqttException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
}

Publish a State

To publish a state value to the IIoT Middleware and make it available to other components, a specific topic structure and message format must be used.

import json
import os
from datetime import datetime
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion


def on_connect(client, userdata, flags, reason_code, properties):
    """Report the outcome of the connection attempt (paho-mqtt callback API v2 signature)."""
    print(f'Connected with result code "{reason_code}"')


# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables
mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
# Start the background network loop: it is what processes incoming packets
# (connection and publish acknowledgements) and invokes the callbacks.
mqtt_client.loop_start()
quality_of_service = 1

# NOTE: you need to specify the state_descriptor_id and worker_id UUIDs
state_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
value = {"fatigue": [5, 10]}

# Topic: HDT/{worker_id}/state/{descriptor_id}; payload: {epoch millis}#{value}.
# The state is serialized with json.dumps: formatting the dict directly would emit
# Python's repr (single quotes), which is not valid JSON.
topic = f'HDT/{worker_id}/state/{state_descriptor_id}'
payload = f'{int(datetime.now().timestamp() * 1000)}#{json.dumps(value)}'
message_info = mqtt_client.publish(topic, payload, quality_of_service)

# Wait (up to 5 seconds) for the message to be handed over to the broker
# instead of sleeping for an arbitrary amount of time.
message_info.wait_for_publish(timeout=5)
mqtt_client.disconnect()
mqtt_client.loop_stop()

                               
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Publishes a single state value to the IIoT Middleware.
 *
 * The state is sent on the topic {@code HDT/{workerId}/state/{stateDescriptorId}}
 * with the payload {@code {timestamp}#{value}}, where the timestamp is the current
 * time in epoch milliseconds. Errors are printed to standard error.
 */
private void publishState() {
    // NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER and MQTT_PASSWORD environment variables
    String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
    String clientId = "unique-id";
    int qos = 1;

    // NOTE: you need to specify the state_descriptor_id and worker_id UUIDs
    String stateDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    String value = "{\"fatigue\": [5, 10]}";

    try {
        MqttClient mqttClient = new MqttClient(broker, clientId);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(System.getenv("MQTT_USER"));
        connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());

        // Set callback for connection
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("Connected to " + serverURI);
            }

            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("Connection lost: " + cause.getMessage());
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // Not used in this case, since we are publishing only
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                System.out.println("Message delivered successfully");
            }
        });

        mqttClient.connect(connOpts);

        long timestamp = System.currentTimeMillis();
        String message = timestamp + "#" + value;
        // Encode explicitly as UTF-8 so the payload does not depend on the platform default charset
        MqttMessage mqttMessage = new MqttMessage(message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        mqttMessage.setQos(qos);
        mqttClient.publish("HDT/" + workerId + "/state/" + stateDescriptorId, mqttMessage);

        // Short grace period before disconnecting, as in the original example
        TimeUnit.SECONDS.sleep(1);
        mqttClient.disconnect();
        mqttClient.close();
    } catch (MqttException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
}

Read a Metric

In order to read real-time metric values published to the IIoT Middleware, it is mandatory to subscribe to the topics of interest and to specify a callback for managing the received message.

import os
import time
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion

# NOTE: you need to specify the measurement_descriptor_id and worker_id UUIDs
def on_connect(client, userdata, flags, reason_code, properties):
    """Subscribe to the metric topic once the broker has accepted the connection.

    Uses the paho-mqtt callback API v2 signature. Nothing is subscribed when
    ``reason_code`` reports a failed connection attempt.
    """
    if reason_code.is_failure:
        print(f'Connection failed with result code "{reason_code}"')
        return

    measurement_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
    worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
    topic = f'HDT/{worker_id}/measurement/{measurement_descriptor_id}'
    client.subscribe(topic, qos=1)

def on_message(client, userdata, msg):
    """Print the decoded payload of a received message together with its topic."""
    payload_text = msg.payload.decode()
    source_topic = msg.topic
    print(f'Received message: {payload_text} on topic {source_topic}')

# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables
mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message

mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
# Block here and keep processing network traffic so that on_message keeps being invoked.
# loop_start() returns immediately, so a standalone script would end before receiving
# anything; use loop_start() instead only when the application has its own main loop.
mqtt_client.loop_forever()
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;

/**
 * Subscribes to a metric topic of the IIoT Middleware and prints every received message.
 *
 * The subscribed topic is {@code HDT/{workerId}/measurement/{measurementDescriptorId}}.
 * Errors raised while connecting or subscribing are printed to standard error.
 */
private void readMetric() {
    // NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables
    String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
    String clientId = UUID.randomUUID().toString();

    // NOTE: you need to specify the measurementDescriptorId and workerId UUIDs
    String measurementDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    String topic = "HDT/" + workerId + "/measurement/" + measurementDescriptorId;
    int qos = 1;

    try {
        MqttClient mqttClient = new MqttClient(broker, clientId);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(System.getenv("MQTT_USER"));
        connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());

        // Set callback for connection and message reception
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("Connected to " + serverURI);
            }

            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("Connection lost: " + cause.getMessage());
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // Decode explicitly as UTF-8 so the result does not depend on the platform default charset
                String receivedMessage = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Message received on topic " + topic + ": " + receivedMessage);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // Not used in this case, since we are subscribing only
            }
        });

        mqttClient.connect(connOpts);
        mqttClient.subscribe(topic, qos);

    } catch (MqttException e) {
        e.printStackTrace();
    }
}

Read a State

In order to read real-time state values published to the IIoT Middleware, it is mandatory to subscribe to the topics of interest and to specify a callback for managing the received message.

import os
import time
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion

# NOTE: you need to specify the state_descriptor_id and worker_id UUIDs
def on_connect(client, userdata, flags, reason_code, properties):
    """Subscribe to the state topic once the broker has accepted the connection.

    Uses the paho-mqtt callback API v2 signature. Nothing is subscribed when
    ``reason_code`` reports a failed connection attempt.
    """
    if reason_code.is_failure:
        print(f'Connection failed with result code "{reason_code}"')
        return

    state_descriptor_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
    worker_id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
    topic = f'HDT/{worker_id}/state/{state_descriptor_id}'
    client.subscribe(topic, qos=1)

def on_message(client, userdata, msg):
    """Print the decoded payload of a received message together with its topic."""
    decoded_payload = msg.payload.decode()
    origin_topic = msg.topic
    print(f'Received message: {decoded_payload} on topic {origin_topic}')

# NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables
mqtt_client = Client(client_id='unique-id', callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(os.getenv("MQTT_USER"), os.getenv("MQTT_PASSWORD"))
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message

mqtt_client.connect(os.getenv('MQTT_HOST'), int(os.getenv("MQTT_PORT")))
# Block here and keep processing network traffic so that on_message keeps being invoked.
# loop_start() returns immediately, so a standalone script would end before receiving
# anything; use loop_start() instead only when the application has its own main loop.
mqtt_client.loop_forever()
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import java.util.UUID;

/**
 * Subscribes to a state topic of the IIoT Middleware and prints every received message.
 *
 * The subscribed topic is {@code HDT/{workerId}/state/{stateDescriptorId}}.
 * Errors raised while connecting or subscribing are printed to standard error.
 */
private void readState() {
    // NOTE: you need to specify the MQTT_HOST, MQTT_PORT, MQTT_USER, and MQTT_PASSWORD environment variables
    String broker = "tcp://" + System.getenv("MQTT_HOST") + ":" + System.getenv("MQTT_PORT");
    String clientId = UUID.randomUUID().toString();

    // NOTE: you need to specify the stateDescriptorId and workerId UUIDs
    String stateDescriptorId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    String workerId = "3fa85f64-5717-4562-b3fc-2c963f66afa6";
    String topic = "HDT/" + workerId + "/state/" + stateDescriptorId;
    int qos = 1;

    try {
        MqttClient mqttClient = new MqttClient(broker, clientId);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(System.getenv("MQTT_USER"));
        connOpts.setPassword(System.getenv("MQTT_PASSWORD").toCharArray());

        // Set callback for connection and message reception
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("Connected to " + serverURI);
            }

            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("Connection lost: " + cause.getMessage());
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // Decode explicitly as UTF-8 so the result does not depend on the platform default charset
                String receivedMessage = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Message received on topic " + topic + ": " + receivedMessage);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // Not used in this case, since we are subscribing only
            }
        });

        mqttClient.connect(connOpts);
        mqttClient.subscribe(topic, qos);

    } catch (MqttException e) {
        e.printStackTrace();
    }
}