
Dealing with rejection (in distributed systems)

Published at: 8/20/2024
Categories: datastreaming, apachekafka, dataengineering, kafka
Author: warpstream

by Richard Artoul

Distributed systems: theory vs. practice

There are two ways to learn about distributed systems:

  1. Reading academic and industry papers.
  2. Operating them in production.

The best distributed systems practitioners have done both extensively because they teach you different things.

Traditionally, when people want to learn about distributed systems, they start (as they should) with the literature, where they’ll learn about:

  1. Algorithms.
  2. Data structures.
  3. Replication strategies.
  4. Consensus.
  5. Trade-offs between consistency and availability in the face of partition failures.

This helps people build a great foundation, but there are some topics that simply aren’t well covered by the literature. These topics include things like:

  1. Instrumenting the system to make it observable and debuggable.
  2. Maintaining quality of service in the face of multi-tenancy.
  3. Backpressure.
  4. Designing the system to be operable by humans.

I’ve spent the last 10 years of my life building and operating distributed systems in production. I’ve been on-call for (almost) every major open-source database on the market.

In addition, I’ve built from scratch (along with my colleagues) and operated several distributed databases in production: M3DB, Husky, and most recently, WarpStream. During this time, I picked up a lot of practical knowledge about what it takes to convert a design that works on paper into an implementation that works in production at massive scale.

For example, Husky’s design was heavily inspired by industry-leading systems like Snowflake, Procella, and the wealth of available academic knowledge about leveraging columnar storage and vectorized query processing to analyze huge volumes of data in a short period of time.

But there is so much more that went into making Husky a scalable, cost-efficient, and reliable system than just what can be found in the academic literature. For example, while there are hundreds of excellent academic and industry papers about how to make a highly efficient vectorized query engine, there are shockingly few papers (rooted in actual experience with sufficient detail to replicate) about how to make that query engine multi-tenant, scale to thousands of concurrent queries, with more than 10 orders of magnitude difference in the cost of individual queries, while still maintaining good quality of service and letting your engineers sleep through the night.

That is an incredibly difficult problem to solve, and many teams have solved it (including us when we were at Datadog), but almost no one has documented how they solved it.

Unfortunately for readers stuck deep in the mud of building vectorized query engines, I will not be discussing how we solved that problem at Datadog in this post. Instead, I want to focus on a related problem that we confronted when building WarpStream: backpressure.

Backpressure

Backpressure is one of the most important practical details that every good distributed system has to get right if it’s going to stand a chance at survival in production. Without a good backpressuring system, a small increase in load or an errant client can easily knock over the entire system and leave it stuck in a death spiral from which it will never recover without manual intervention — usually by shutting off all the clients.

At its core, backpressure is a really simple concept. When the system is nearing overload, it should start “saying no” by slowing down or rejecting requests. This applies pressure back toward the client (hence the term) and prevents the system from tipping over into catastrophic failure. This works because, in most real systems, rejecting a request is several orders of magnitude cheaper than actually processing it.

Backpressure should happen as early as possible in the request-processing lifecycle. The less work the system has to do before rejecting a request, the more resilient it will be.

Of course, the big question is: How do we know when we should reject a request? We could trivially create a system that is incredibly difficult to knock over by only allowing it to process one request at a time and rejecting all other requests, but that system wouldn’t be of much use to anyone. However, if we start backpressuring too late, well, then our window may have already passed to prevent the system from self-destructing.

Unfortunately, this is one of those scenarios where we need to strive for a difficult-to-quantify Goldilocks zone where the backpressuring system kicks in at just the right time. That’s the art of dealing with rejection (in distributed systems). Let’s clarify by defining what failure and success might look like.

Failure of the backpressuring system is easy to define: it looks like catastrophic failure. For example, if an increase in load or traffic can cause the system to run out of memory, that’s a pretty bad failure of the backpressuring system. In this case, backpressure is happening too late.

Similarly, if the number of requests processed by the system drops significantly below the peak throughput the system is capable of when it isn’t overloaded, that can also be a form of catastrophic failure.

Ideal behavior regarding latency is more use-case dependent. For some latency-critical workloads, an ideal system will maintain consistently low latency for requests it chooses to accept and immediately reject all requests that it can’t serve within a tight latency budget. These use cases are more of an exception than the norm, though, and in most scenarios, operators prefer higher latency (within reason) over requests being rejected.

As a concrete example, Amazon would almost certainly prefer an incident where the latency to add an item to a user’s cart increases from 100ms to 2 seconds over one where 95% of add-to-cart operations are rejected immediately, but those that are accepted complete in under 100ms.

Something like this might be considered acceptable, for example:

Of course, all of these examples are “within reason”. If you recruit a thousand servers to do nothing but DDoS a single server, no amount of intelligent programming will save you if the victim server’s network is completely saturated.

OK, enough abstract discussion; let’s dive into a concrete use case now.

Backpressure in the WarpStream Agents

WarpStream is a drop-in replacement for Apache Kafka® that is built directly on top of object storage and deployed as a single, stateless binary. Nodes in a WarpStream cluster are called “Agents”, and while the Agents perform many different tasks, primarily they’re responsible for:

  1. Writes (Kafka Produce requests)
  2. Reads (Kafka Fetch requests)
  3. Background jobs (compactions)

Each of these functions needs a reasonable backpressure mechanism, and if a WarpStream Agent is handling multiple roles, those backpressure systems may need to interact with each other. For now, let’s just focus on writes / Produce requests.

Processing every Produce request requires some memory (at least as much data as is being written by the client), a dedicated Goroutine (which consumes memory and has scheduling overhead), and a variety of other resources. If the Agents blindly accept every Produce request they receive, with no consideration for how fast other Produce requests are being completed or how many are currently in-flight, then a sufficiently high number of Produce requests will eventually overwhelm the Agents¹.

Now, queuing theory tends to deal in concepts like arrival rates and average request processing time. That could lead us to consider using a rate limiter to implement backpressure. For example, we could do some benchmarking and conclude that, on average, Agents can only handle X throughput/s/core, and thus configure a global rate limit for the entire process to X * NUM_CORES².
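As a rough sketch of that idea, the global limit can be derived from the core count at startup instead of being hard-coded. The function name and the per-core figure below are hypothetical placeholders for illustration, not values from WarpStream.

```go
package main

import (
	"fmt"
	"runtime"
)

// globalRateLimit scales a benchmarked per-core throughput (bytes/s/core)
// by the number of cores, so the same configuration adapts to different
// instance types. A core count below 1 is clamped to 1.
func globalRateLimit(perCoreBytesPerSec int64, numCores int) int64 {
	if numCores < 1 {
		numCores = 1
	}
	return perCoreBytesPerSec * int64(numCores)
}

func main() {
	// 50 MiB/s/core is an assumed, purely illustrative benchmark result.
	const perCore = 50 << 20
	cores := runtime.NumCPU()
	fmt.Printf("global limit: %d bytes/s across %d cores\n",
		globalRateLimit(perCore, cores), cores)
}
```

The resulting number would then be fed into whatever rate limiter the process uses.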

That would work, but it would be pretty annoying to tune. What is a reasonable rate-limit for write throughput per core?

  • We could benchmark it, but the performance will vary heavily from one workload to another. Also, what if we improve the performance of the Agents in the future?
  • We could measure it empirically at runtime, and then back that out into a dynamically adjusted rate-limit, but that’s likely to be brittle and complex.

In general, rate limits make a lot of sense for limiting the amount of resources that individual tenants can consume in a multi-tenant environment (which is one aspect of backpressuring), but they’re not a great solution for making sure that systems don’t tip over into catastrophic failure.

Instead, I’ve always found the best results by tracking something that correlates with the system becoming overloaded and falling behind. Inevitably, if the system is overloaded, something will begin to pile up: memory usage, inflight requests, queue depth, etc.

These things are easy to track and they relieve us from the burden of thinking in terms of rates (mostly). The threshold for “this is too many things in-flight” is usually much easier to tune and reason about than a pure rate-limit and will automatically adapt to a much wider variety of workloads. As a bonus, if the system gets more efficient over time, the backpressure mechanism will automatically adjust because it will require more load to make things pile up, and so the backpressure system will kick in later. Anytime we can make something self-tuning like this, that’s a huge win.

For Produce requests in the WarpStream Agents, the best criteria we found for triggering backpressure were two metrics:

  1. The number of in-flight bytes that had not yet been flushed to object storage.
  2. The number of in-flight files that had not yet been flushed to object storage.

Intuitively this makes sense: if the Agents are overloaded, then pretty quickly requests will begin piling up in-memory, and the value of those two metrics will spike. It’s pretty easy to do the equivalent of the following in the WarpStream code:

func (h *Handler) HandleProduceRequest(
 ctx context.Context,
 req ProduceRequest,
) (ProduceResponse, error) {
 if h.numberOfOutstandingBytesOrFilesTooHigh() {
  return nil, ErrBackpressure
 }

 // Process request.
}

And we’re done, right?

Unfortunately, this is a woefully inadequate solution. The WarpStream Agents process every Kafka protocol request in a dedicated Goroutine, and there is a pretty big risk here that a flood of Produce requests will come in at the same time, all individually pass the h.numberOfOutstandingBytesOrFilesTooHigh() check at the same time, and then immediately throw the Agent way over the target limit.

We could fix that by making that method atomically check the metrics and increment them, but we actually have a bigger problem: by the time HandleProduceRequest() is called, we’ve already done a lot of work:

  1. Copied the data to be written off the network.
  2. Copied it into temporary buffers.
  3. Spawned a new goroutine (or reused an existing one).
  4. Emitted a bunch of metrics.

At this point, it’s almost worth just accepting the request because the incremental cost of actually processing it is not that much higher than the work we’ve already performed.
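For completeness, the atomic check-and-increment mentioned earlier could be sketched as follows. This is a minimal illustration, not WarpStream's code: the `inflightTracker` type, its fields, and the limits are all hypothetical, and a mutex is just one way to make the check and the increment a single step.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrBackpressure mirrors the error name used in the handler above.
var ErrBackpressure = errors.New("backpressure: too much unflushed data")

// inflightTracker counts bytes and files that have not been flushed yet.
// The type, its fields, and the limits are hypothetical.
type inflightTracker struct {
	mu                 sync.Mutex
	bytes, files       int64
	maxBytes, maxFiles int64
}

// tryReserve performs the limit check and the increment inside a single
// critical section, so concurrent callers cannot all pass the check
// before any of them has been counted.
func (t *inflightTracker) tryReserve(nBytes, nFiles int64) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.bytes+nBytes > t.maxBytes || t.files+nFiles > t.maxFiles {
		return ErrBackpressure
	}
	t.bytes += nBytes
	t.files += nFiles
	return nil
}

// release returns capacity once the data has been flushed.
func (t *inflightTracker) release(nBytes, nFiles int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.bytes -= nBytes
	t.files -= nFiles
}

func main() {
	t := &inflightTracker{maxBytes: 500, maxFiles: 1000}

	var wg sync.WaitGroup
	var mu sync.Mutex
	accepted := 0
	// 100 concurrent requests of 10 bytes each against a 500-byte limit.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if t.tryReserve(10, 0) == nil {
				mu.Lock()
				accepted++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	fmt.Println("accepted:", accepted) // never more than 50
}
```

This closes the race, but the check still runs after all of the work listed above has already been done.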

It would be a lot better if we could have rejected this request earlier. Like, way earlier, before we allowed it to consume almost any memory in the first place. Thankfully, this is possible! We wrote the TCP server that powers the WarpStream Kafka protocol from scratch, so we have full control over it. At a very high level, the server code looks something like this:


for {
 conn = listener.Accept()
 go func() {
  handleConnection(conn)
 }()
}

func handleConnection(conn net.Conn) {
 for {
  header = ReadHeader(conn)
  message = ReadMessage(conn)
  go func() {
   response, err = handler.HandleRequest(message)
   sendOutcome(response, err)
  }()
 }
}

I’m glossing over a lot of details, but you get the gist. In a really busy WarpStream cluster, a single Agent might have thousands or tens of thousands of active connections, and each of those connections will have Goroutines that are reading bytes off the network, allocating messages, and spawning new Goroutines as fast as they can.

In that scenario, it doesn’t matter that the HandleRequest() method will start returning backpressure errors. There’s too much concurrency and the Goroutines handling the Kafka client connections will eventually overwhelm the VM’s resources and trigger an out of memory error.

Ideally, once the Agent detected that it was overloaded, all these connection handler Goroutines would stop processing messages for a while to allow the system to recover. This is the difference between load-shedding and backpressuring. The handler in the above code is shedding load (by rejecting requests), but it’s not applying pressure backward to the rest of the system.
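To make the distinction concrete, here is a small sketch built on a counting semaphore. The `limiter` type and its method names are hypothetical: `shed` rejects immediately when no capacity is left, while `wait` blocks the caller, which is what actually slows the upstream side down.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var ErrOverloaded = errors.New("overloaded")

// limiter is a counting semaphore; its capacity is the number of slots.
type limiter chan struct{}

// shed implements load shedding: if no slot is free, reject immediately.
// The caller is free to retry right away, so upstream work continues.
func (l limiter) shed() error {
	select {
	case l <- struct{}{}:
		return nil
	default:
		return ErrOverloaded
	}
}

// wait implements backpressure: if no slot is free, the caller blocks
// until one is released, which slows the producer itself down.
func (l limiter) wait() { l <- struct{}{} }

// release frees one slot.
func (l limiter) release() { <-l }

func main() {
	l := make(limiter, 1)
	l.wait() // take the only slot
	fmt.Println("shed while full:", l.shed())

	start := time.Now()
	go func() {
		time.Sleep(50 * time.Millisecond)
		l.release()
	}()
	l.wait() // blocks until the goroutine above releases the slot
	fmt.Println("waited at least 50ms:", time.Since(start) >= 50*time.Millisecond)
}
```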

So how do we fix this? Well, the first thing we can do is make a tiny modification to the handleConnection function:


for {
 conn = listener.Accept()
 go func() {
  handleConnection(conn)
 }()
}

func handleConnection(conn net.Conn) {
 for {
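  // Don't read the next request off the network while the
  // Agent is overloaded; sleep and re-check instead.
  for {
   throttleDuration = handler.ShouldThrottle()
   if throttleDuration > 0 {
    time.Sleep(throttleDuration)
    continue
   }
   break
  }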

  header = ReadHeader(conn)
  message = ReadMessage(conn)
  go func() {
   response, err = handler.HandleRequest(message)
   sendOutcome(response, err)
  }()
 }
}

Again I’m oversimplifying, but this is already much better than the previous solution. Now, it will be much harder for misbehaving clients to knock an Agent over because if the Agent is overloaded, it will stop reading bytes from the network entirely. It’s pretty hard to do less work than that.

Even better, TCP incorporates the concept of backpressure deeply into its design, so this simple trick will apply backpressure back into the networking stack and eventually all the way back to the client VMs.

Finally, we can take this one step further and make the Agents refuse to even accept new connections when they’re overloaded:

for {
 // Stop accepting new connections if overloaded.
 sleepUntilHealthy()
 conn = listener.Accept()
 go func() {
  handleConnection(conn)
 }()
}

func handleConnection(conn net.Conn) {
 for {
  // Stop accepting new requests on existing connections
  // if overloaded.
  sleepUntilHealthy()
  header = ReadHeader(conn)
  message = ReadMessage(conn)
  go func() {
   response, err = handler.HandleRequest(message)
   sendOutcome(response, err)
  }()
 }
}

func sleepUntilHealthy() {
 for {
  throttleDuration = handler.ShouldThrottle()
  if throttleDuration > 0 {
   time.Sleep(throttleDuration)
   continue
  }
  break
 }
}

This is a bit heavy-handed, but that’s OK. It will only kick in during very dire circumstances where the only alternative would be catastrophic failures and/or running out of memory³.

Sounds good on paper. But does it work? Let’s find out!

At WarpStream we don’t like to coddle our software. Production is a messy place where terrible things happen daily, so we try to simulate that as much as possible in all of our test environments.

One of our most aggressive environments is a test cluster that runs 24/7 with three WarpStream Agents. The benchmark workload is configured such that all three Agents are pegged at 80–100% CPU utilization all the time. The benchmark itself consists of eight different test workloads, using four different Kafka clients, and varying batch sizes, partition counts, throughput, number of client instances, etc.

In total, there are hundreds of producer and consumer instances, thousands of partitions, four different client compression algorithms, a mix of regular and compacted topics, and almost all the producers are configured to use the most difficult partitioning strategy where they round-robin records amongst all the partitions.

In addition, the benchmark workloads periodically delete topics and recreate them, rewind and begin reading all of the data from the beginning of retention for the compacted topics, manually trigger consumer group rebalances, and much more. It’s just absolute chaos. Great for testing!

When we iterate on WarpStream’s backpressure system, we use a very simple test: we aggressively scale the cluster down from three Agents to one. This triples the load on the sole remaining Agent that was already running at almost 100% CPU utilization.

Before our most recent improvements, this is what would happen:

Not fun.

But with the new build and all of our latest tricks?

Not perfect, but it is pretty decent considering the Agent is running at 100% CPU utilization:

Importantly, the struggling Agent immediately recovers as soon as we provide additional capacity by adding a node. That is exactly the behavior you want out of a distributed system like this: the system should feel “springy” such that it immediately “bounces back” as soon as additional resources are provided or load is removed.

Another counter-intuitive outcome here is that the Agent continues to function reasonably even while pegged at 100% CPU utilization for a sustained period of time. This is very difficult to accomplish in practice, but it represents the best case scenario for backpressuring: the Agent is able to utilize 100% of the available resources on the machine without ever becoming unstable or unresponsive.

Any operator (or auto-scaler) can look at that graph and immediately determine the right course of action: scale up! Contrast that with a system that starts backpressuring while the underlying resources are under-utilized (say at 40% CPU utilization). That’s going to be a lot more difficult to understand, debug, and most importantly, react to in an automated manner.

Of course, that’s just how we manage backpressure for Produce requests. The Fetch code path is even more nuanced and required some novel tricks that we’d never employed in any previous system we worked on. But this post is already way too long, so that’ll have to wait until next time!

If you like sleeping through the night and letting your infrastructure auto-scale and protect itself automatically, check out WarpStream.

¹ This is true “by definition” of the most basic principles of queuing theory.

² It’s usually best to define limits as a function of the available resources. This way, the application automatically scales to different instance types without modifying the underlying configuration.

³ Note that all of this is independent of the quota / throttling system that is native to Apache Kafka. We’ll discuss that more in a different post.

