Logo

dev-resources.site

for different kinds of informations.

How to Keep a History of MQTT Data With Rust

Published at
8/9/2024
Categories
rust
tutorial
mqtt
reductstore
Author
atimin
Categories
4 categories in total
rust
open
tutorial
open
mqtt
open
reductstore
open
Author
6 person written this
atimin
open
How to Keep a History of MQTT Data With Rust

The MQTT protocol is an easy way to connect different data sources to applications. This makes it very popular for IoT (Internet of Things) applications. Some MQTT brokers can store messages for a while, even when the MQTT client is offline. However, sometimes you need to keep this data for a longer period of time. In these cases it's a good idea to use a time series database.

There are many time series databases available, but if you need to store a history of images,Ā vibration sensor dataĀ or protobuf messages, you might want to use ReductStore. This database is designed to store a lot of blob data and works well with IoT and edge computing.

ReductStoreĀ has client SDKs (software development kits) for many programming languages. This means you can easily use it in your existing system. For this example, we'll use theĀ Rust SDKĀ from ReductStore.

Let's build a simple MQTT application to see how it all works.

Prerequisites

For this usage example, we have the following requirements:

  • Linux AMD64
  • Docker and Docker Compose
  • Rust >= 1.75

If you're using Ubuntu, you can set up the required dependencies by executing the command below in your terminal:

$ sudo apt-get update
$ sudo apt-get install docker-compose
$ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
Enter fullscreen mode Exit fullscreen mode

If you are using a different operating system, you can find the installation instructions for Docker Compose onĀ the official website.

Running the MQTT broker and ReductStore using Docker Compose

The easiest way to set up the broker and database is to use Docker Compose with the following 'docker-compose.yml' file:

services:
  reduct-storage:
    image: reduct/store:latest
    volumes:
      - ./data:/data
    ports:
      - "8383:8383"

  mqtt-broker:
    image: eclipse-mosquitto:1.6
    ports:
      - "1883:1883"
Enter fullscreen mode Exit fullscreen mode

Then run the configuration:

docker-compose up
Enter fullscreen mode Exit fullscreen mode

Docker Compose will download the images if they are not available on your device and start the containers. Note that we have released ports 1883 for the MQTT protocol and 8383 forĀ ReductStore HTTP API.

Writing Rust Program

Now let's start coding and get into the details. First, install the necessary dependencies. We'll need a Cargo.toml file with the following dependencies:

[dependencies]
reduct-rs = "1.10"
rumqttc="0.24"
tokio = { version = "1", features = ["rt-multi-thread"] }
futures-util = "0.3"
Enter fullscreen mode Exit fullscreen mode

After installing the required dependencies, we can start programming.

use reduct_rs::ReductClient;
use rumqttc::Event::Incoming;
use rumqttc::Packet::Publish;
use rumqttc::{AsyncClient, MqttOptions, QoS};

#[tokio::main]
async fn main() {
// Connect to ReductStore instance at 8383 port
    let client = ReductClient::builder().url("http://127.0.0.1:8383").build();

// Get ot create a bucket named 'mqtt'
    let bucket = client
        .create_bucket("mqtt")
        .exist_ok(true)
        .send()
        .await
        .unwrap();

// Connect to the mqtt broker
    let mqtt_options = MqttOptions::new("reduct", "127.0.0.1", 1883);
    let (mqtt, mut mqtt_loop) = AsyncClient::new(mqtt_options, 10);
// Subscribe to all topics
    mqtt.subscribe("#", QoS::AtMostOnce).await.unwrap();

// Write received message to the bucket
// by using topic as an entry name
    while let Ok(notification) = mqtt_loop.poll().await {
        if let Incoming(Publish(packet)) = notification {
            let topic = packet.topic;
            let payload = packet.payload;

            bucket
                .write_record(&topic)
                .data(payload.clone())
                .send()
                .await
                .unwrap();
            println!(
                "Received message {} from {} is written to the bucket",
                String::from_utf8_lossy(&payload),
                topic,
            );
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Let's look at the code in detail. First, we create a client to communicate with the ReductStore instance and create a bucket for the MQTT data:

// Connect to ReductStore instance at 8383 port
    let client = ReductClient::builder().url("http://127.0.0.1:8383").build();

// Get ot create a bucket named 'mqtt'
    let bucket = client
        .create_bucket("mqtt")
        .exist_ok(true)
        .send()
        .await
        .unwrap();
Enter fullscreen mode Exit fullscreen mode

The bucket, called "mqtt", is a storage unit in ReductStore. It contains rules for storing and accessing data. We'll need this name when reading data.

Note theĀ exit_okĀ flag; this allows us to create a bucket the first time the script is run, which can then be used for subsequent runs.

Now we need to fill the bucket with the MQTT data:

// Connect to the mqtt broker
    let mqtt_options = MqttOptions::new("reduct", "127.0.0.1", 1883);
    let (mqtt, mut mqtt_loop) = AsyncClient::new(mqtt_options, 10);
// Subscribe to all topics
    mqtt.subscribe("#", QoS::AtMostOnce).await.unwrap();

// Write received message to the bucket
// by using topic as an entry name
    while let Ok(notification) = mqtt_loop.poll().await {
        if let Incoming(Publish(packet)) = notification {
            ...
        }
    }
Enter fullscreen mode Exit fullscreen mode

This code snippet connects to the MQTT broker and subscribes to all topics using the "#" wildcard. MQTT topics are a way of organising your data streams. A publisher needs to specify a topic name to send data, and a subscriber needs to specify either a specific name or a wildcard to receive the data. Later you'll see that the ReductStore has entries in the bucket, which is also used to organise the data.

Let's see how we store the data in the database:

    let topic = packet.topic;
    let payload = packet.payload;

    bucket
        .write_record(&topic)
        .data(payload.clone())
        .send()
        .await
        .unwrap();
Enter fullscreen mode Exit fullscreen mode

This was quite straightforward. We used the topic name as the entry name and wrote the MQTT payload there.

Publishing Data to MQTT Topic

When the script is started, it remains idle because there's no data coming in from MQTT. To make it work, you need to publish some data. My favourite tool for this isĀ mosquitto_pub. For those using Ubuntu, this tool comes with theĀ mosquitto-clientsĀ package.

$ sudo apt-get install mosquitto-clients
$ mosuitto_pub -t topic-1 -m "Hello, topic-1!"
$ mosuitto_pub -t topic-2 -m "Hello, topic-2!"
Enter fullscreen mode Exit fullscreen mode

Getting Data From ReductStore

You've learned how to retrieve data from MQTT and store it in ReductStore. Now we need a simple Rust script to read this data from the store:

use futures_util::StreamExt;
use reduct_rs::ReductClient;
use std::pin::pin;
#[tokio::main]
async fn main() {
// Connect to ReductStore instance at 8383 port
    let client = ReductClient::builder().url("http://127.0.0.1:8383").build();

// Get bucket named 'mqtt'
    let bucket = client.get_bucket("mqtt").await.unwrap();

// Read all entries from the bucket
    let entries = bucket.entries().await.unwrap();

    for entry in entries {
// Query all records from the entry
        let mut record_stream = bucket.query(&entry.name).send().await.unwrap();

// Retrieve records as a stream
        let mut record_stream = pin!(record_stream);
        while let Some(record) = record_stream.next().await {
            let record = record.unwrap();
            println!(
                "MQTT topic: {}, Record: ts={}, data={}",
                entry.name,
                record.timestamp_us(),
                String::from_utf8_lossy(&record.bytes().await.unwrap()),
            );
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

We have already covered how to create a client and acquire a bucket. Now let's move on to reading the data:

    let entries = bucket.entries().await.unwrap();

    for entry in entries {
// Query all records from the entry
        let mut record_stream = bucket.query(&entry.name).send().await.unwrap();

// Retrieve records as a stream
        let mut record_stream = pin!(record_stream);
        while let Some(record) = record_stream.next().await {
            let record = record.unwrap();
            println!(
                "MQTT topic: {}, Record: ts={}, data={}",
                entry.name,
                record.timestamp_us(),
                String::from_utf8_lossy(&record.bytes().await.unwrap()),
            );
        }
    }
Enter fullscreen mode Exit fullscreen mode

As you can see, this is also very simple. We browse all the entries in theĀ mqttĀ bucket, retrieve all the records from each entry, and then print their timestamps and contents. That's it.

Best Practices

The example is a simple one and may not cover all of the complexities that you may be faced with in a real-world application. Here are some tips to help you build a strong and efficient IoT application using ReductStore and MQTT:

  • Create a ReductStore bucket with aĀ FIFO quotaĀ to prevent disk overwrites.
  • Use token authentication to protect your data. You can generate an access token using either theĀ Web ConsoleĀ or theĀ CLI Client.
  • MapĀ MQTT5Ā properties to ReductStore labels. This will make it easier to filter data whenĀ queryingĀ orĀ replicating.
  • UseĀ Reduct CLIĀ for data replication or backup purposes.

Conclusion

The MQTT protocol and ReductStore are easy to use tools that work well together in Rust. They provide a powerful solution for many applications. Regardless of the size of your project, these tools handle data communication and storage effectively.

To help you understand how to use these tools, we've created an example that shows how they work together. You can seeĀ the source code of this exampleĀ on GitHub. This example shows how easy and useful it is to use MQTT and ReductStore together.


I hope this tutorial was helpful. If you have any questions or feedback, don't hesitate to use theĀ ReductStore CommunityĀ forum.

reductstore Article's
30 articles in total
Favicon
ReductStore v1.13.0 Released With New Conditional Query API
Favicon
Keeping MQTT Data History with Node.js
Favicon
ReductStore v1.12.0 released: record deletion API and storage engine optimization
Favicon
3 Ways to Store Computer Vision Data
Favicon
ReductStore v1.11.0: Changing labels and storage engine optimization
Favicon
Getting Started with ReductStore in C++
Favicon
How to Keep a History of MQTT Data With Rust
Favicon
Deploy ReductStore as Azure Virtual Machine
Favicon
Getting Started with ReductStore in Node.js
Favicon
Getting Started with ReductStore in Python
Favicon
ReductStore v1.10.0: downsampling and optimization
Favicon
ReductStore CLI Client now in Rust
Favicon
Time Series Blob Data: ReductStore vs. MongoDB
Favicon
Time Series Blob Data: ReductStore vs. TimescaleDB
Favicon
ReductStore v1.9.0 Released
Favicon
How to Keep a History of MQTT Data With Python
Favicon
Performance comparison: ReductStore Vs. Minio
Favicon
ReductStore v1.8.0 Has Been Released with Data Replication
Favicon
ReductStore v1.7.0 has been released with provisioning and batch writing
Favicon
ReductStore 1.6.0 has been released with new license and client SDK for Rust
Favicon
ReductStore v1.5.0 has been released
Favicon
ReductStore v1.4.0 in Rust has been released
Favicon
Subscribing new records with Reduct C++ SDK
Favicon
6 weeks with Rust
Favicon
Data Reduction and Why It Is Important For Edge Computing
Favicon
We Are Moving to Rust
Favicon
CLI Client for ReductStore v0.8.0 has been released
Favicon
How to Use "Cats" dataset with Python ReductStore SDK
Favicon
Streamline your edge computing workflows with ReductStore, now available on Snap
Favicon
ReductStore Client SDK for C++ v1.3.0 with Labels Support

Featured ones: