Logo

dev-resources.site

for different kinds of informations.

Subscribing new records with Reduct C++ SDK

Published at
6/4/2023
Categories
tutorial
cpp
database
reductstore
Author
atimin
Categories
4 categories in total
tutorial
open
cpp
open
database
open
reductstore
open
Author
6 person written this
atimin
open
Subscribing new records with Reduct C++ SDK

This article provides an introduction to ReductStore and explains how to use the Reduct C++ SDK to subscribe to data from the database.

Prerequisites

To subscribe to new records, we should use a continuous query, which has been supported by ReductStore since version v1.4. We can use the following Docker command to run it:

docker pull reduct/store:latest
docker run -p 8383:8383 reduct/store:latest 

Enter fullscreen mode Exit fullscreen mode

Now, we need to install the Reduct Client SDK for C++. Please refer to these instructions.

Full Example

Now, take a look at the code of the example.

#include <reduct/client.h>

#include <iostream>
#include <thread>

using reduct::IBucket;
using reduct::IClient;

int main() {
  auto writer = std::thread([]() {
    auto client = IClient::Build("http://127.0.0.1:8383");

    auto [bucket, err] = client->GetOrCreateBucket("bucket");
    if (err) {
      std::cerr << "Error: " << err;
      return;
    }

    for (int i = 0; i < 10; ++i) {
      const IBucket::WriteOptions opts{
          .timestamp = IBucket::Time::clock::now(),
          .labels = {{"good", i % 2 == 0 ? "true" : "false"}},
      };

      const auto msg = "Hey " + std::to_string(i);
      [[maybe_unused]] auto write_err = bucket->Write("entry-1", opts, [msg](auto rec) { rec->WriteAll(msg); });
      std::cout << "Write: " << msg << std::endl;
      std::this_thread::sleep_for(std::chrono::seconds(1));
    }
  });

  // Subscribe to good messages
  int good_count = 0;
  auto client = IClient::Build("http://127.0.0.1:8383");
  auto [bucket, err] = client->GetOrCreateBucket("bucket");
  if (err) {
    std::cerr << "Error: " << err;
    return -1;
  }

  const auto opts = IBucket::QueryOptions{
      .include = {{"good", "true"}},
      .continuous = true,
      .poll_interval = std::chrono::milliseconds{100},
  };

  // Continuously read messages until we get 3 good ones
  auto query_err =
      bucket->Query("entry-1", IBucket::Time::clock::now(), std::nullopt, opts, [&good_count](auto &&record) {
        auto [msg, read_err] = record.ReadAll();
        if (read_err) {
          std::cerr << "Error: " << read_err;
          return false;
        }
        std::cout << "Read: " << msg << std::endl;
        return ++good_count != 3;
      });

  writer.join();

  if (query_err) {
    std::cerr << "Query error:" << query_err;
    return -1;
  }
}
Enter fullscreen mode Exit fullscreen mode

To build use this CMakeLists.txt:

cmake_minimum_required(VERSION 3.18)

project(ReductCppExamples)
set(CMAKE_CXX_STANDARD 20)

find_package(ZLIB)
find_package(OpenSSL)

find_package(ReductCpp 1.4.0)

add_executable(subscription subscription.cc)
target_link_libraries(subscription ${REDUCT_CPP_LIBRARIES} ${ZLIB_LIBRARIES} OpenSSL::SSL OpenSSL::Crypto)
Enter fullscreen mode Exit fullscreen mode

The example code demonstrates how to use the C++ Reduct SDK to subscribe to new records from a bucket. The program writes 10 records to a bucket, reads records with the label "good" set to "true" or "false", and continuously reads records until it has read 3 records with this label set to "true".

Let's consider the example in detail.

Write New Records with Labels

To communicate with a ReductStore instance, first create a client.

auto client = IClient::Build("http://127.0.0.1:8383");

Enter fullscreen mode Exit fullscreen mode

In this example, we run the database locally with default settings, but we may need to have an API token for authorization.

Like many other blob storages, ReductStore keeps data in buckets for granular access control and quotas. For read/write operations, we have to get a bucket or create one:

 auto [bucket, err] = client->GetOrCreateBucket("bucket");
  if (err) {
    std::cerr << "Error: " << err;
    return;
  }
Enter fullscreen mode Exit fullscreen mode

When creating a bucket, you have the option to provide additional settings. One particularly useful setting is the FIFO quota, which automatically removes old data when the bucket size reaches a certain limit. This feature is especially beneficial for edge devices, as it helps prevent the device from running out of disk space.

Buckets contain entries, you can understand them as topics or folders. ReductStore doesn’t provide tree of entries and they must have unique names. Let’s write a record to entry-1:

const IBucket::WriteOptions opts{
    .timestamp = IBucket::Time::clock::now(),
    .labels = {{"good", i % 2 == 0 ? "true" : "false"}},
};

const auto msg = "Hey " + std::to_string(i);
auto write_err = bucket->Write("entry-1", opts, [msg](auto rec) { rec->WriteAll(msg); });

Enter fullscreen mode Exit fullscreen mode

ReductStore is a time-series database that stores data as a blob. Each blob is a record that must have a timestamp. However, you can attach additional information to the record, such as labels, and use them for annotation or data filtering. In this case, we assign the label goog with the values true or false.

In this example, we send short text messages for demonstration purposes only. ReductStore is better suited for handling larger data blobs, such as images, sound, or binary data like Protobuf messages.

Continuous Query

After we ran writing labels in a separated thread we can query only good data and wait for first 3:

const auto opts = IBucket::QueryOptions{
    .include = {{"good", "true"}},
    .continuous = true,
    .poll_interval = std::chrono::milliseconds{100},
};

auto query_err =
    bucket->Query("entry-1", IBucket::Time::clock::now(), std::nullopt, opts, [&good_count](auto &&record) {
      auto [msg, read_err] = record.ReadAll();
      if (read_err) {
        std::cerr << "Error: " << read_err;
        return false;
      }
      std::cout << "Read: " << msg << std::endl;
      return ++good_count != 3;
    });
Enter fullscreen mode Exit fullscreen mode

This is a simple example. We use the flag continuous to indicate that we will wait for new records and poll them every 100 ms.

In ReductStore, queries work as iterators. It doesn't matter how many records are stored; we only ask for the next one. When we use a continuous query, we are always asking for a new record, even if we didn't receive a previous one. We use the pool_interval option to specify how often we ask for a new record.

How could it be useful?

ReductStore is an open source database focused on edge computing and AI/ML applications. Its continuous queries work as a publish-subscription communication model, similar to MQTT. You can use the database as a message broker or easily integrate it with your warehouse by creating a program that subscribes to new records and writes them or only labels them to another database.

You can use this feature in a Python or JavaScript application by using the Python and JavaScript client SDKs.

I hope you find this release useful. If you have any questions or feedback, don’t hesitate to reach out in Discord or by opening a discussion on GitHub.

Thanks for using ReductStore!

reductstore Article's
30 articles in total
Favicon
ReductStore v1.13.0 Released With New Conditional Query API
Favicon
Keeping MQTT Data History with Node.js
Favicon
ReductStore v1.12.0 released: record deletion API and storage engine optimization
Favicon
3 Ways to Store Computer Vision Data
Favicon
ReductStore v1.11.0: Changing labels and storage engine optimization
Favicon
Getting Started with ReductStore in C++
Favicon
How to Keep a History of MQTT Data With Rust
Favicon
Deploy ReductStore as Azure Virtual Machine
Favicon
Getting Started with ReductStore in Node.js
Favicon
Getting Started with ReductStore in Python
Favicon
ReductStore v1.10.0: downsampling and optimization
Favicon
ReductStore CLI Client now in Rust
Favicon
Time Series Blob Data: ReductStore vs. MongoDB
Favicon
Time Series Blob Data: ReductStore vs. TimescaleDB
Favicon
ReductStore v1.9.0 Released
Favicon
How to Keep a History of MQTT Data With Python
Favicon
Performance comparison: ReductStore Vs. Minio
Favicon
ReductStore v1.8.0 Has Been Released with Data Replication
Favicon
ReductStore v1.7.0 has been released with provisioning and batch writing
Favicon
ReductStore 1.6.0 has been released with new license and client SDK for Rust
Favicon
ReductStore v1.5.0 has been released
Favicon
ReductStore v1.4.0 in Rust has been released
Favicon
Subscribing new records with Reduct C++ SDK
Favicon
6 weeks with Rust
Favicon
Data Reduction and Why It Is Important For Edge Computing
Favicon
We Are Moving to Rust
Favicon
CLI Client for ReductStore v0.8.0 has been released
Favicon
How to Use "Cats" dataset with Python ReductStore SDK
Favicon
Streamline your edge computing workflows with ReductStore, now available on Snap
Favicon
ReductStore Client SDK for C++ v1.3.0 with Labels Support

Featured ones: