Keeping MQTT Data History with Node.js
The MQTT protocol is widely used in IoT applications because of its simplicity and its ability to connect different data sources to applications using a publish/subscribe model. While many MQTT brokers support persistent sessions and can store message history while an MQTT client is unavailable, there may be cases where data needs to be stored for a longer period of time. In such cases, a time series database is a good fit. There are many options available, but if you need to store unstructured data such as images, sensor data or Protobuf messages, you should consider using ReductStore as an MQTT database. It is a time series database specifically designed to store large amounts of unstructured data, optimised for IoT and edge computing.
ReductStore provides client SDKs for many programming languages to integrate it into your infrastructure. For this example, we will use the JavaScript client SDK.
Let's build a simple application to understand how to keep a history of MQTT messages using ReductStore and Node.js.
Prerequisites
For this example we have the following requirements:
- Linux AMD64/ARM64/ARMv7
- Docker and Docker Compose
- Node.js >= 18
If you are an Ubuntu user, you can install the dependencies by running the following command in your terminal:
$ sudo apt-get update
$ sudo apt-get install docker-compose nodejs
Run MQTT Broker and ReductStore with Docker Compose
The easiest way to run the broker and database is to use Docker Compose with the following docker-compose.yml file:
services:
  reduct-storage:
    image: reduct/store:latest
    volumes:
      - ./data:/data
    ports:
      - "8383:8383"
  mqtt-broker:
    image: eclipse-mosquitto:1.6
    ports:
      - "1883:1883"
Then run the configuration:
docker-compose up
Docker Compose will download the images if they are not available on your device and launch the containers.
Now we have published ports 1883 for the MQTT protocol and 8383 for the ReductStore HTTP API.
Writing Node.js MQTT Subscriber
Now it's time to dive into the code. First, let's initialize the NPM package and install the necessary dependencies. We'll need the async-mqtt package and the JavaScript Client SDK.
$ npm init
$ npm install --save reduct-js async-mqtt
Once we have all the dependencies installed, we can write the script:
const MQTT = require("async-mqtt");
const { Client } = require("reduct-js");
MQTT.connectAsync("tcp://localhost:1883")
  .then(async (mqttClient) => {
    await mqttClient.subscribe("#");
    const reductClient = new Client("http://localhost:8383");
    const bucket = await reductClient.getOrCreateBucket("mqtt");
    mqttClient.on("message", async (topic, msg) => {
      const record = await bucket.beginWrite(topic);
      await record.write(msg);
      console.log(
        'Received message "%s" from topic "%s" is written',
        msg,
        topic,
      );
    });
  })
  .catch((error) => console.error(error));
Let's examine the code in detail. First we need to connect to the MQTT broker and subscribe to all topics using the # wildcard.
MQTT.connectAsync("tcp://localhost:1883")
  .then(async (mqttClient) => {
    await mqttClient.subscribe("#");
    // rest of code
  })
  .catch((error) => console.error(error));
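To make the wildcard semantics concrete, here is a minimal sketch of how a topic filter is matched against a topic name under the MQTT rules: + matches exactly one level and # matches all remaining levels. The topicMatches helper is hypothetical and only for illustration; in practice the broker does this matching for you.

```javascript
// Hypothetical helper: returns true if `topic` is matched by the MQTT `filter`.
function topicMatches(filter, topic) {
  const f = filter.split("/");
  const t = topic.split("/");
  // Filters that start with a wildcard do not match topics beginning with "$".
  if (topic.startsWith("$") && (f[0] === "#" || f[0] === "+")) return false;
  for (let i = 0; i < f.length; i++) {
    if (f[i] === "#") return true; // multi-level wildcard: matches everything from here on
    if (i >= t.length) return false; // the filter has more levels than the topic
    if (f[i] !== "+" && f[i] !== t[i]) return false; // a literal level must be equal
  }
  return f.length === t.length;
}

console.log(topicMatches("#", "topic-1")); // true
console.log(topicMatches("sensors/+/temp", "sensors/room1/temp")); // true
console.log(topicMatches("#", "$SYS/broker/uptime")); // false
```

So the # subscription in our script receives every application topic, but not the broker's internal $SYS topics.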
If the MQTT connection is successful, we can start working with ReductStore. To start writing data to it, we need a bucket. We create a bucket named mqtt or get an existing one:
const reductClient = new Client("http://localhost:8383");
const bucket = await reductClient.getOrCreateBucket("mqtt");
The last step is to write the received message to the database. To do this, we need to use a callback on the message event to capture the message. Then we can write the message to the record corresponding to the topic name.
mqttClient.on("message", async (topic, msg) => {
  const record = await bucket.beginWrite(topic);
  await record.write(msg);
  console.log('Received message "%s" from topic "%s" is written', msg, topic);
});
When we call bucket.beginWrite, we create an entry in the bucket if it doesn't already exist. Then we write data to the entry with the current timestamp. Now our MQTT data is safe and sound in the storage and we can access it using the same SDK.
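If you would rather control the timestamp yourself, for example to record when the message arrived rather than when the write started, you can pass it explicitly. The sketch below assumes a reduct-js version whose beginWrite accepts an options object with a ts field in microseconds since the Unix epoch (as a BigInt). Older releases may take the timestamp as a plain BigInt second argument, so check the SDK reference for your version. The toMicros and storeMessage helpers are hypothetical names.

```javascript
// Convert a JavaScript Date to microseconds since the Unix epoch as a BigInt.
function toMicros(date) {
  return BigInt(date.getTime()) * 1000n;
}

// Hypothetical helper: write one MQTT payload with an explicit timestamp.
// `bucket` is assumed to be the object returned by getOrCreateBucket("mqtt").
async function storeMessage(bucket, topic, msg, receivedAt = new Date()) {
  // Assumption: beginWrite(entry, { ts }) is supported by the installed SDK version.
  const record = await bucket.beginWrite(topic, { ts: toMicros(receivedAt) });
  await record.write(msg);
}

// Wiring it into the subscriber script could look like this:
// mqttClient.on("message", (topic, msg) =>
//   storeMessage(bucket, topic, msg).catch((error) => console.error(error)),
// );
```

Catching the error inside the handler also keeps a failed write from turning into an unhandled promise rejection.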
Publishing Data to MQTT Topics
When you run the script, it prints nothing at first because no MQTT messages have been published yet. You need to publish something. I prefer to use mosquitto_pub. For Ubuntu users it is part of the mosquitto-clients package:
$ sudo apt-get install mosquitto-clients
$ mosquitto_pub -t topic-1 -m "Hello, topic-1!"
$ mosquitto_pub -t topic-2 -m "Hello, topic-2!"
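If you don't want to install extra tools, the same messages can be published from Node.js with the async-mqtt package we already have. This is a minimal sketch: the file name publish.js, the --publish flag and the buildSamples helper are made up for the example.

```javascript
// Build the same greetings that the mosquitto_pub commands above send.
function buildSamples(topics) {
  return topics.map((topic) => ({ topic, payload: `Hello, ${topic}!` }));
}

async function main() {
  // async-mqtt is the package installed earlier for the subscriber.
  const MQTT = require("async-mqtt");
  const client = await MQTT.connectAsync("tcp://localhost:1883");
  for (const { topic, payload } of buildSamples(["topic-1", "topic-2"])) {
    await client.publish(topic, payload);
  }
  await client.end();
}

// Publish only when explicitly requested: node publish.js --publish
if (process.argv.includes("--publish")) {
  main().catch((error) => console.error(error));
}
```

Run it in a second terminal while the subscriber is running, and the subscriber should log both messages.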
Getting MQTT Data From ReductStore
Now you know how to get data from MQTT and write it to ReductStore, but we need a little Node.js script to read the data from the storage:
const { Client } = require("reduct-js");
const client = new Client("http://localhost:8383");
client
  .getBucket("mqtt")
  .then(async (bucket) => {
    for (const entry of await bucket.getEntryList()) {
      for await (const record of bucket.query(entry.name)) {
        const data = await record.read();
        console.log('Found record "%s" with timestamp "%d"', data, record.time);
      }
    }
  })
  .catch((error) => console.error(error));
Here we browse all the entries in the mqtt bucket and query all the records in each entry. Reading only the most recent record in an entry is even simpler:
const record = await bucket.beginRead(entry.name);
const data = await record.readAsString();
To retrieve all records or records within a given time interval, we can use the query method. This method returns an asynchronous iterator, so we can use a for await loop to iterate over the records.
for await (const record of bucket.query(entry.name)) {
  const data = await record.read();
  console.log('Found record "%s" with timestamp "%d"', data, record.time);
}
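For the time-interval case, here is a minimal sketch. It assumes that query accepts optional start and stop timestamps as its second and third arguments, given in microseconds since the Unix epoch as BigInt values. The lastMinutes and readRange helpers are hypothetical names, and bucket is the object returned by getBucket("mqtt").

```javascript
// Build an interval covering the last `minutes` whole minutes,
// expressed in microseconds since the Unix epoch as BigInt values.
function lastMinutes(minutes, nowMs = Date.now()) {
  const stop = BigInt(nowMs) * 1000n;
  const start = stop - BigInt(minutes) * 60n * 1_000_000n;
  return { start, stop };
}

// Hypothetical helper: collect the payloads of one entry within the interval as strings.
async function readRange(bucket, entryName, { start, stop }) {
  const payloads = [];
  // Assumption: query(entry, start, stop) limits the iteration to the given interval.
  for await (const record of bucket.query(entryName, start, stop)) {
    payloads.push(await record.readAsString());
  }
  return payloads;
}

// Example: everything written to "topic-1" in the last 10 minutes.
// const payloads = await readRange(bucket, "topic-1", lastMinutes(10));
```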
Best Practices
The example we've given you is pretty simple and it might not cover all the complexities you'll face in a real-world application. Here are some tips to help you build a strong and efficient IoT application using ReductStore and MQTT:
- Create a ReductStore bucket with a FIFO quota so that old data is removed before the disk fills up.
- Use token authentication to protect your data. You can generate an access token using either the Web Console or the CLI Client.
- Map MQTT5 properties to ReductStore labels. This will make it easier to filter data when querying or replicating.
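The three tips above can be sketched roughly as follows. Treat this as an illustration under assumptions rather than a reference implementation: the quota size, the way the token is passed in, and the label names are all made up for the example, and the helper names openMqttBucket, labelsFromPacket and storeWithLabels are hypothetical. It also assumes that the SDK exports QuotaType, that the client accepts an apiToken option, and that beginWrite accepts a labels option.

```javascript
// Open the bucket with token authentication and a FIFO quota (settings apply on creation).
async function openMqttBucket(url, apiToken) {
  // Loaded inside the function so the label helper below works without the SDK installed.
  const { Client, QuotaType } = require("reduct-js");
  const client = new Client(url, { apiToken });
  return client.getOrCreateBucket("mqtt", {
    quotaType: QuotaType.FIFO,
    quotaSize: 10_000_000_000n, // illustrative value, about 10 GB
  });
}

// Flatten the MQTT 5 properties of a received packet into string labels.
function labelsFromPacket(packet) {
  const props = (packet && packet.properties) || {};
  const labels = {};
  if (props.contentType) labels.content_type = props.contentType;
  for (const [key, value] of Object.entries(props.userProperties || {})) {
    // Repeated user properties arrive as arrays; join them into one string.
    labels[key] = Array.isArray(value) ? value.join(",") : String(value);
  }
  return labels;
}

// Write a message together with the labels derived from its packet.
async function storeWithLabels(bucket, topic, msg, packet) {
  const record = await bucket.beginWrite(topic, { labels: labelsFromPacket(packet) });
  await record.write(msg);
}
```

To receive MQTT 5 properties at all, the client has to connect with protocol version 5, for example MQTT.connectAsync("tcp://localhost:1883", { protocolVersion: 5 }). The message callback then gets the packet as its third argument, which you can pass straight to storeWithLabels.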
Conclusion
As you can see, the MQTT protocol and ReductStore are both incredibly user-friendly technologies that can be easily integrated with each other in Node.js. This combination provides a powerful solution for various applications. Whether you're working on a small project or a large-scale system, the MQTT protocol and ReductStore provide a reliable and efficient way to handle data communication and storage.
To help you better understand how to use these technologies, we've prepared an example that demonstrates their seamless integration. You can access the source code of the example on GitHub. This example shows how easy and effective it is to use MQTT and ReductStore together.
I hope this tutorial has been helpful. If you have any questions or feedback, don't hesitate to use the ReductStore Community forum.