dev-resources.site

Multiple Regions, Single Pane of Glass

Published at
6/24/2024
Categories
streaming
apachekafka
datastreaming
dataengineering
Author
warpstream

by Emmanuel Pot

A common problem when building infrastructure-as-a-service products is the need to provide highly available and isolated resources in many different regions while also having the overall product present as a “single pane of glass” to end-users. Unfortunately, these two requirements stand in direct opposition to each other. Ideally, regional infrastructure is, well, regional, with zero inter-regional dependencies. On the other hand, users really don’t want to have to sign into multiple accounts/websites to manage infrastructure spread across many different regions.

When we first designed how we would expand WarpStream’s cloud control planes from a single region to many, we searched around for good content on the topic and didn’t find much. Many different infrastructure companies have solved this problem, but very few have blogged about it, so we decided to write about our approach and, perhaps more importantly, some of the approaches we didn’t take.

Let’s start by briefly reviewing WarpStream’s architecture by tracing the flow of a single request through the system. An operation usually begins with a customer’s Kafka client issuing a Kafka protocol message to the Agents, say a Metadata request. Since Kafka Metadata requests don’t interact with raw topic data like Produce and Fetch do, they can be handled solely by the WarpStream control plane. So when the WarpStream Agents receive a Kafka Metadata request, they just proxy it directly to the control plane.

WarpStream Agents deployed in a customer cloud account, sending metadata requests to WarpStream’s Metadata Store.

The request will hit a load balancer and then one of WarpStream’s “Gateway” nodes. The Gateway node’s job is to perform light authentication and authorization (basically, verify the request’s API key and map it to the correct customer / virtual cluster), and then forward the request to the Metadata Store for this customer’s cluster.
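
The Gateway’s role can be sketched roughly as follows. This is a minimal illustration, not WarpStream’s actual code: the `ApiKeyRecord` type, the in-memory `api_keys` table, and the `forward` callback are all hypothetical names introduced for the example.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass(frozen=True)
class ApiKeyRecord:
    # Hypothetical shape of what an API key resolves to.
    tenant_id: str
    cluster_id: str


class Unauthorized(Exception):
    """Raised when the request's API key is not known to the Gateway."""


def handle_request(
    api_key: str,
    payload: bytes,
    api_keys: Dict[str, ApiKeyRecord],
    forward: Callable[[str, bytes], bytes],
) -> bytes:
    # Step 1: light authentication, i.e. is this API key known at all?
    record = api_keys.get(api_key)
    if record is None:
        raise Unauthorized("unknown API key")
    # Step 2: map the key to its virtual cluster and forward the request
    # to the Metadata Store responsible for that cluster.
    return forward(record.cluster_id, payload)
```

In this sketch the Gateway holds no cluster state of its own; it only needs the mapping from API key to tenant and cluster, which is exactly the kind of slowly changing platform data discussed below.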

Based on this, it’s already clear that WarpStream’s control plane has to deal with two very different types of data:

  1. Platform data : everything that users can control from our web console and APIs: users, clusters, API keys, SASL credentials, etc. This data is persisted in a primary Aurora database that runs in us-east-1 and changes very infrequently.
  2. Cluster metadata : all the metadata that enables WarpStream to present the abstraction of Kafka on top of a low-level primitive like commodity object storage. For example, the Metadata Store keeps track of all the topic-partitions (and offsets) that are contained within every file stored in the user’s object storage bucket.

These two types of data have very different requirements. The cluster metadata is in the critical path of every Kafka operation (both writes and reads), and therefore must be strongly consistent, extremely durable, highly available, and have low latency. As a result, we run every instance of the Metadata Store in a single region, whichever region is closest to the user’s WarpStream Agents. We also run each instance of the Metadata Store triply replicated across three availability zones, and we never replicate this metadata across multiple regions (for now).

The requirements for the platform data, on the other hand, look completely different. This data changes infrequently, and the data being slightly stale is of no consequence (eventual consistency is ok). Platform data like API keys is technically required in the critical path, but because it is trivially cacheable for arbitrarily long periods of time, it is not really in the critical path in practice. Also, unlike the cluster metadata, some of the platform data needs to be available in multiple regions for the service to function as a single pane of glass.

When we were evaluating how to add support for additional regions to WarpStream, there wasn’t much to think about for the virtual cluster Metadata Stores. We would just run dedicated instances of the Metadata Store in more regions, and users would connect their Agents to whichever region was closest, since most (but not all) WarpStream clusters run in a single region anyway.

The platform data (like API keys) is a different story. We could have used the same approach we did with the Metadata Store for the platform data by running a dedicated (and fully isolated) Aurora instance in every region, but that would have resulted in a poor user experience. Every region would have presented to users as a fully independent “website,” and users who wanted to run clusters in multiple regions would have had to maintain different WarpStream accounts, re-invite their teams, configure billing multiple times, etc., which is not what we wanted.

Hub and Spoke

When we looked at these requirements, the architecture that seemed like the best candidate was a “hub and spoke” model. The us-east-1 region that hosts our Aurora cluster would be the primary “hub” region that hosts the WarpStream UI and all of our “infrastructure as code” APIs for creating/destroying virtual clusters. All the other regions would be “spokes” that run fully independent and isolated versions of WarpStream’s Metadata Store, but not the Aurora database that stores the “platform data”.

Three spoke regions running fully isolated Metadata Stores powered by platform data replicated from the hub region.

CRUD operations to create and destroy virtual clusters would always be routed to the hub region, but actual customer WarpStream clusters and their Agents would only ever interact with a single “spoke” region and have no cross-regional dependencies.

This approach would give us the best of both worlds: a single pane of glass where WarpStream customers could manage clusters in any region while still keeping regions independent from each other such that a failure in one region (including the hub region) would never cause a failure in any other region. The one caveat with this approach is that any unavailability of Aurora in the primary hub region would prevent customers from creating new clusters in all regions, but existing clusters would continue working just fine. We felt like this was an acceptable trade-off.

However, this architecture did present a conundrum for us. In order for our product to present as a single pane of glass, some of the data in our primary region (like whether a virtual cluster exists, whether an API key is valid, etc.) had to be made available in all of our spoke regions.

The hub region can read the platform data from the primary Aurora database, but where do the spoke regions read it from?

But we also needed to avoid creating any critical path inter-regional dependencies. Whatever we ended up doing, we had to ensure that the failure of a single region could never impact clusters running in different regions.

Easier said than done!

Option 1: Multi-Region Aurora

The first option we considered was to leverage AWS Aurora’s native multi-region functionality. Specifically, AWS Aurora has support for spawning read replicas in other regions. There are limits on how many additional regions can contain read replicas, and this approach would only work with AWS so we’d need a different solution for multi-cloud, but we thought this solution could be a good enough stop-gap in the short term to scale from a single region to a handful without much engineering work. We also really liked the idea of offloading the tricky problem of replicating a subset of our platform data to the AWS Aurora team.

Multi-region AWS Aurora cluster with the primaries in the hub region and read replicas in the spoke regions.

Unfortunately, when we investigated further, we discovered that any unavailability of the primary Aurora region could result in the unavailability of the secondary region read replicas. If that ever happened to us, we’d end up in a terrible situation: all of our spoke regions and their associated clusters / Metadata Stores would still be running (thanks to the in-memory caches), but restarting or deploying the control plane nodes would cause an incident due to the in-memory caches being dropped and unable to be refilled.

It turns out that multi-region functionality in Aurora is designed for a completely different use case: failing over regions fast when the primary region fails. Useful for that situation, but we wanted a solution that would never require manual intervention, so we ruled it out.

We briefly considered migrating to a different database with better multi-region availability support like CockroachDB or Spanner, but we had no previous experience with these technologies, and migrating all of our platform data to a brand-new database technology felt like overkill.

Option 2: Smart (and durable) Caches

Luckily, the platform data (like which virtual clusters exist and which API keys are valid) changes infrequently. So, another approach we considered was to query the source of truth (our us-east-1 Aurora database) from all spoke regions and then cache that data aggressively. For example, the first time a Gateway node encountered an API key that it had not seen before, it would query Aurora in us-east-1 to determine if it was valid and then cache the result in memory.

ap-southeast-1 spoke region querying the AWS Aurora cluster in us-east-1 to fill its in-memory caches.

This approach was appealing because it would require only minimal code changes, and it took advantage of a strategy we were already employing within the primary region to be resilient against Aurora failures: in-memory caching. The Gateway nodes were already using a custom “smart” cache (internally referred to as the “loading cache”) that would fit the bill perfectly. This cache employs a number of tricks to make it suitable for critical use cases like this:

  1. It automatically deduplicates cache fills. This eliminates the thundering herd problem.
  2. It incorporates negative caching as a first-class concept, so if the Gateway nodes keep receiving requests for API keys that no longer exist, they don’t keep querying Aurora over and over again.
  3. It limits the concurrency of cache fills so that a flood of requests with new and unique API keys results in the cache being filled at a continuous (and manageable) rate instead of flooding Aurora with queries.
  4. It implements asynchronous background refreshes (again, with limited concurrency) so that changes in Aurora (like invalidating an API key) are eventually reflected back into the state of the in-memory caches. This ensures that in normal circumstances, when Aurora is available, invalidating an API key is reflected within seconds, but in rare circumstances where Aurora is unavailable, the Gateway nodes can keep running more or less indefinitely as long as they aren’t restarted.
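
The four properties above can be illustrated with a small in-process cache. This is a simplified sketch rather than WarpStream’s implementation: the class name `LoadingCache` borrows the internal nickname mentioned above, but the method names, the `loader` callback, and the default concurrency limit are assumptions. The refresh is shown as a plain method that a background thread or timer would call.

```python
import threading
from concurrent.futures import Future
from typing import Callable, Dict, Hashable, Optional


class LoadingCache:
    """Toy loading cache: deduplicated fills, negative caching, bounded fills."""

    def __init__(self, loader: Callable[[Hashable], Optional[object]],
                 max_concurrent_fills: int = 4):
        self._loader = loader  # e.g. a query against the source of truth
        self._fill_slots = threading.Semaphore(max_concurrent_fills)
        self._lock = threading.Lock()
        # A stored None is a negative entry: "known not to exist".
        self._values: Dict[Hashable, Optional[object]] = {}
        self._inflight: Dict[Hashable, Future] = {}

    def get(self, key):
        with self._lock:
            if key in self._values:          # hit, including negative entries
                return self._values[key]
            fut = self._inflight.get(key)
            leader = fut is None
            if leader:                       # first caller performs the fill
                fut = Future()
                self._inflight[key] = fut
        if not leader:
            return fut.result()              # others wait on the same fill
        try:
            with self._fill_slots:           # cap concurrent backend queries
                value = self._loader(key)
            with self._lock:
                self._values[key] = value
            fut.set_result(value)
            return value
        except Exception as exc:
            fut.set_exception(exc)           # failures are not cached
            raise
        finally:
            with self._lock:
                self._inflight.pop(key, None)

    def refresh_all(self):
        """Re-load every cached key; keep the old value if the backend fails."""
        with self._lock:
            keys = list(self._values)
        for key in keys:
            try:
                with self._fill_slots:
                    value = self._loader(key)
            except Exception:
                continue                     # stale data beats no data
            with self._lock:
                self._values[key] = value
```

Note that everything lives in process memory, which is precisely the limitation discussed next: the cache survives a backend outage only for as long as the process itself keeps running.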

This smart caching strategy had served us well within our primary region, but ultimately we decided that it wasn’t an acceptable solution to our multi-region data replication problem. A failure of the primary Aurora database in us-east-1 wouldn’t immediately impair the other regions, but it would leave us unable to deploy or restart any of the control planes in our other regions until the availability of the Aurora database was restored. In other words, this approach suffered from the same problem as the Aurora read replicas approach.

Briefly, we considered extending our existing loading cache implementation to be durable so that we could restart control plane nodes, even when the primary Aurora database was down, without losing the data that had already been cached.

ap-southeast-1 spoke region querying the AWS Aurora cluster in us-east-1 to fill its in-memory caches, but then persisting the cached data to a local DynamoDB instance so that the Gateway nodes can still be restarted safely even if the primary Aurora cluster is unavailable.

However, we also decided against that approach because it didn’t feel very stable. The system would function completely differently when the primary Aurora database was available than when it was unavailable, and we didn’t like the idea of relying heavily on an infrequently exercised code path for such critical functionality.

Ultimately, we decided that while the loading cache was great for caching data within a region, it was not an acceptable solution for replicating data across regions.

Option 3: Push-Based Replication (we chose this one)

Both of the models we considered previously were “pull-based” models. Instead, we decided to pursue a “push-based” approach using a technique we’d learned at previous jobs called “contexts”. A Context is a bundle of metadata with the following characteristics:

  1. Its values change slowly (if at all).
  2. The metadata is associated with specific clusters or tenants.
  3. The metadata needs to be made available on a large number of machines in a highly available manner.
  4. Availability is always favored over consistency, i.e., we’d rather use values that are several hours old than have the system fail entirely.

For example, one of the contexts we created is called the “cluster context” and it contains:

  1. The cluster’s ID
  2. The cluster’s name
  3. The ID of the tenant (customer) the cluster belongs to
  4. A few additional internal fields required for the Metadata Store to begin processing requests
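
As a rough illustration, a cluster context could be modeled as a small serializable record. The field names, the `internal` dictionary standing in for the undisclosed internal fields, the JSON encoding, and the `cluster_context/` key prefix are all assumptions made for this sketch.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Dict


@dataclass
class ClusterContext:
    cluster_id: str
    cluster_name: str
    tenant_id: str
    # Stand-in for the additional internal fields; contents are hypothetical.
    internal: Dict[str, str] = field(default_factory=dict)

    def kv_key(self) -> str:
        # One key per cluster, so each context can be updated independently.
        return f"cluster_context/{self.cluster_id}"

    def to_bytes(self) -> bytes:
        return json.dumps(asdict(self), sort_keys=True).encode("utf-8")

    @staticmethod
    def from_bytes(raw: bytes) -> "ClusterContext":
        return ClusterContext(**json.loads(raw.decode("utf-8")))
```

Keeping each context small and self-describing means a reader only ever needs a single key lookup to get everything required to start serving a cluster.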

Building the contexts was straightforward. We wrote a job that scans the Aurora database every 10 minutes, builds the contexts, and then writes them as individual key-value pairs to a durable KV store in the relevant spoke regions.
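
One pass of such a job might look like the sketch below. It is only an approximation of the idea: the row shape, the `region` column, the key prefix, and the use of plain dictionaries in place of real regional KV stores are assumptions. A scheduler would call `run_once` on every scan interval.

```python
from typing import Callable, Dict, Iterable, List, Mapping, MutableMapping


def build_contexts(rows: Iterable[Mapping[str, str]]) -> List[dict]:
    """Turn database rows into one context per cluster."""
    return [
        {
            "cluster_id": row["id"],
            "cluster_name": row["name"],
            "tenant_id": row["tenant_id"],
            "region": row["region"],
        }
        for row in rows
    ]


def publish(contexts: Iterable[dict],
            regional_stores: Dict[str, MutableMapping[str, dict]]) -> int:
    """Write each context to the KV store of the region its cluster lives in."""
    written = 0
    for ctx in contexts:
        store = regional_stores.get(ctx["region"])
        if store is None:
            continue  # no store configured for this region in the sketch
        store[f"cluster_context/{ctx['cluster_id']}"] = ctx
        written += 1
    return written


def run_once(scan_rows: Callable[[], Iterable[Mapping[str, str]]],
             regional_stores: Dict[str, MutableMapping[str, dict]]) -> int:
    """One full pass: scan the source of truth, rebuild, and push."""
    return publish(build_contexts(scan_rows()), regional_stores)
```

Because every pass rewrites the full set of key-value pairs, the job is idempotent: running it twice in a row leaves the spoke stores in the same state.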

Context publisher replicates contexts from the hub region to the spoke regions.

Of course, we pride ourselves on the fact that a new WarpStream cluster can be created in under a second, so forcing users to wait 10 minutes before their clusters were usable after creation wasn’t acceptable. Solving for this was easy though: when a new cluster is created (or any operation is performed that could result in a context being created or an existing one mutated), we submit an asynchronous request to the same job service that will trigger an update for that specific context immediately.

This gives us the best of both worlds. Changes to the contexts (like a new cluster being created or an API key being revoked) are reflected in their associated spoke regions almost instantaneously, but in the worst-case scenario where we forget to issue the async update request in some code path (or it fails for some reason), the issue will automatically resolve itself at the next periodic scan, at most about 10 minutes later. In other words, this approach is fast in the happy path, and self-healing in the non-happy path. Simple and easy to reason about.
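
The interplay between the immediate update and the periodic scan can be sketched as follows. The `ContextPublisher` class, its method names, and the dictionary standing in for a spoke region’s KV store are hypothetical; the point is only that the targeted update and the full scan share the same write path, so the scan repairs anything the fast path missed.

```python
import queue
from typing import Callable, Dict, List, Optional


class ContextPublisher:
    def __init__(self, load_row: Callable[[str], Optional[dict]],
                 list_ids: Callable[[], List[str]],
                 store: Dict[str, dict]):
        self._load_row = load_row    # reads one cluster from the source of truth
        self._list_ids = list_ids    # lists every live cluster ID
        self._store = store          # stand-in for a spoke region's KV store
        self._pending: "queue.Queue[str]" = queue.Queue()

    def request_update(self, cluster_id: str) -> None:
        """Fast path: called by CRUD handlers, never blocks on replication."""
        self._pending.put(cluster_id)

    def drain_pending(self) -> int:
        """Run by the job service as soon as requests arrive."""
        handled = 0
        while True:
            try:
                cluster_id = self._pending.get_nowait()
            except queue.Empty:
                return handled
            self._publish_one(cluster_id)
            handled += 1

    def full_scan(self) -> None:
        """Slow path: periodic pass that repairs anything the fast path missed."""
        live = set(self._list_ids())
        for cluster_id in live:
            self._publish_one(cluster_id)
        stale = [k for k in self._store if k.split("/", 1)[1] not in live]
        for key in stale:
            del self._store[key]

    def _publish_one(self, cluster_id: str) -> None:
        key = f"cluster_context/{cluster_id}"
        row = self._load_row(cluster_id)
        if row is None:
            self._store.pop(key, None)   # cluster was deleted
        else:
            self._store[key] = dict(row)
```

If a code path forgets to call `request_update`, nothing breaks permanently: the next `full_scan` converges the store to the state of the source of truth.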

The primary downside of this approach is that it was a lot more work to implement. But we think it was worth it for two reasons:

  1. We truly have zero inter-regional dependencies in the critical path. The primary region pushes updates to the spoke regions proactively, and the spoke regions never query the primary region or create any external connections. In fact, the spoke regions aren’t even aware of the hub region in any meaningful way. This makes reasoning about availability, reliability, and failure modes easy: because no region depends on another region, the failure of one region cannot impact the others by definition.
  2. The context framework we created is broadly useful. For example, in the future we’ll use it to build out support for our own feature flagging system without taking on any additional external dependencies.
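
To make the first point concrete, here is a sketch of what a spoke-side read path could look like. The `SpokeContextReader` class and the `read_local` callback are hypothetical. What matters is that there is no code path that reaches back to the hub, and that a failure of the local store falls back to the last known value, in line with favoring availability over consistency.

```python
from typing import Callable, Dict, Optional


class SpokeContextReader:
    def __init__(self, read_local: Callable[[str], Optional[dict]]):
        # read_local talks to the KV store in this region only.
        self._read_local = read_local
        self._last_known: Dict[str, dict] = {}

    def get(self, cluster_id: str) -> Optional[dict]:
        key = f"cluster_context/{cluster_id}"
        try:
            ctx = self._read_local(key)
        except Exception:
            # Local store unavailable: serve the last value we saw, if any.
            return self._last_known.get(key)
        if ctx is None:
            self._last_known.pop(key, None)  # context was removed
            return None
        self._last_known[key] = ctx
        return ctx
```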

With this setup, we have been able to deploy our control plane in three additional regions around the world, and we are ready to add more depending on customers’ needs.
