Dealing with rejection (in distributed systems)
by Richard Artoul
Distributed systems: theory vs. practice
There are two ways to learn about distributed systems:
- Reading academic and industry papers.
- Operating them in production.
The best distributed systems practitioners have done both extensively because they teach you different things.
Traditionally, when people want to learn about distributed systems, they start (as they should) with the literature, where they'll learn about:
- Algorithms.
- Data structures.
- Replication strategies.
- Consensus.
- Trade-offs between consistency and availability in the face of partition failures.
This helps people build a great foundation, but there are some topics that simply aren't well covered by the literature. These topics include things like:
- Instrumenting the system to make it observable and debuggable.
- Maintaining quality of service in the face of multi-tenancy.
- Backpressure.
- Designing the system to be operable by humans.
I've spent the last 10 years of my life building and operating distributed systems in production. I've been on-call for (almost) every major open-source database on the market.
In addition, I've built from scratch (along with my colleagues) and operated several distributed databases in production: M3DB, Husky, and most recently, WarpStream. During this time, I learned a lot of practical knowledge about what it takes to convert a design that works on paper into an implementation that works in production at massive scale.
For example, Husky's design was heavily inspired by industry-leading systems like Snowflake, Procella, and the wealth of available academic knowledge about leveraging columnar storage and vectorized query processing to analyze huge volumes of data in a short period of time.
But there is so much more that went into making Husky a scalable, cost-efficient, and reliable system than just what can be found in the academic literature. For example, while there are hundreds of excellent academic and industry papers about how to make a highly efficient vectorized query engine, there are shockingly few papers (rooted in actual experience with sufficient detail to replicate) about how to make that query engine multi-tenant, scale to thousands of concurrent queries, with more than 10 orders of magnitude difference in the cost of individual queries, while still maintaining good quality of service and letting your engineers sleep through the night.
That is an incredibly difficult problem to solve, and many teams have solved it (including us when we were at Datadog), but almost no one has documented how they solved it.
Unfortunately for readers stuck deep in the mud of building vectorized query engines, I will not be discussing how we solved that problem at Datadog in this post. Instead, I want to focus on a related problem that we confronted when building WarpStream: backpressure.
Backpressure
Backpressure is one of the most important practical details that every good distributed system has to get right if it's going to stand a chance at survival in production. Without a good backpressuring system, a small increase in load or an errant client can easily knock over the entire system and leave it stuck in a death spiral from which it will never recover without manual intervention, usually by shutting off all the clients.
At its core, backpressure is a really simple concept. When the system is nearing overload, it should start "saying no" by slowing down or rejecting requests. This applies pressure back toward the client (hence the term) and prevents the system from tipping over into catastrophic failure. This works because, in most real systems, the cost of rejecting a request is several orders of magnitude cheaper than actually processing it.
Backpressure should happen as early as possible in the request-processing lifecycle. The less work the system has to do before rejecting a request, the more resilient it will be.
Of course, the big question is: how do we know when we should reject a request? We could trivially create a system that is incredibly difficult to knock over by only allowing it to process one request at a time and rejecting all others, but that system wouldn't be of much use to anyone. However, if we start backpressuring too late, the window to prevent the system from self-destructing may have already passed.
Unfortunately, this is one of those scenarios where we need to strive for a difficult-to-quantify Goldilocks zone where the backpressuring system kicks in at just the right time. That's the art of dealing with rejection (in distributed systems). Let's clarify by defining what failure and success might look like.
Failure of the backpressuring system is easy to define: it looks like catastrophic failure. For example, if an increase in load or traffic can cause the system to run out of memory, that's a pretty bad failure of the backpressuring system. In this case, backpressure is happening too late.
Similarly, if the number of requests processed by the system drops significantly below the peak throughput the system is capable of when it isn't overloaded, that can also be a form of catastrophic failure.
Ideal behavior regarding latency is more use-case dependent. For some latency-critical workloads, an ideal system will maintain consistently low latency for requests it chooses to accept and immediately reject all requests that it can't serve within a tight latency budget. These use cases are more of an exception than the norm, though, and in most scenarios, operators prefer higher latency (within reason) over requests being rejected.
As a concrete example, Amazon would almost certainly prefer an incident where the latency to add an item to a user's cart increases from 100ms to 2 seconds over one where 95% of add-to-cart operations are rejected immediately, but those that are accepted complete in under 100ms.
Of course, all of these examples are "within reason". If you recruit a thousand servers to do nothing but DDoS a single server, no amount of intelligent programming will save you if the victim server's network is completely saturated.
OK, enough abstract discussion; let's dive into a concrete use case now.
Backpressure in the WarpStream Agents
WarpStream is a drop-in replacement for Apache Kafka® that is built directly on top of object storage and deployed as a single, stateless binary. Nodes in a WarpStream cluster are called "Agents", and while the Agents perform many different tasks, primarily they're responsible for:
- Writes (Kafka Produce requests)
- Reads (Kafka Fetch requests)
- Background jobs (compactions)
Each of these functions needs a reasonable backpressure mechanism, and if a WarpStream Agent is handling multiple roles, those backpressure systems may need to interact with each other. For now, let's just focus on writes / Produce requests.
Processing every Produce request requires some memory (at least as much data as is being written by the client), a dedicated Goroutine (which consumes memory and has scheduling overhead), and a variety of other resources. If the Agents blindly accept every Produce request they receive, with no consideration for how fast other Produce requests are being completed or how many are currently in-flight, then a sufficiently high number of Produce requests will eventually overwhelm the Agents¹.
Now, queuing theory tends to deal in concepts like arrival rates and average request processing time. That could lead us to consider using a rate limiter to implement backpressure. For example, we could do some benchmarking and conclude that, on average, Agents can only handle X throughput/s/core, and thus configure a global rate limit for the entire process to X * NUM_CORES².
That would work, but it would be pretty annoying to tune. What is a reasonable rate-limit for write throughput per core?
- We could benchmark it, but the performance will vary heavily from one workload to another. Also, what if we improve the performance of the Agents in the future?
- We could measure it empirically at runtime, and then back that out into a dynamically adjusted rate-limit, but that's likely to be brittle and complex.
In general, rate limits make a lot of sense for limiting the amount of resources that individual tenants can consume in a multi-tenant environment (which is one aspect of backpressuring), but theyâre not a great solution for making sure that systems donât tip over into catastrophic failure.
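For concreteness, here is roughly what that core-scaled, throughput-based rate limit could look like in Go. This is just a sketch of the approach described above (not how the Agents actually work), and bytesPerSecPerCore is exactly the kind of made-up, benchmark-derived constant that is so painful to pick and keep up to date:

import (
    "runtime"
    "time"

    "golang.org/x/time/rate"
)

// Hypothetical benchmark-derived constant: how many bytes/sec a single
// core can supposedly handle. Choosing this number well (and keeping it
// current as the code gets faster) is the hard part.
const bytesPerSecPerCore = 50 << 20 // 50 MiB/s, purely illustrative.

func newProduceLimiter() *rate.Limiter {
    limit := rate.Limit(bytesPerSecPerCore * runtime.NumCPU())
    // Allow bursts of up to roughly one second's worth of throughput.
    return rate.NewLimiter(limit, int(limit))
}

// allowProduce rejects the request if its bytes don't fit within the
// configured rate for this process.
func allowProduce(l *rate.Limiter, requestBytes int) bool {
    return l.AllowN(time.Now(), requestBytes)
}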
Instead, I've always found the best results by tracking something that correlates with the system becoming overloaded and falling behind. Inevitably, if the system is overloaded, something will begin to pile up: memory usage, inflight requests, queue depth, etc.
These things are easy to track and they relieve us from the burden of thinking in terms of rates (mostly). The threshold for "this is too many things in-flight" is usually much easier to tune and reason about than a pure rate-limit and will automatically adapt to a much wider variety of workloads. As a bonus, if the system gets more efficient over time, the backpressure mechanism will automatically adjust because it will require more load to make things pile up, and so the backpressure system will kick in later. Anytime we can make something self-tuning like this, that's a huge win.
For Produce requests in the WarpStream Agents, the best criteria we found for triggering backpressure were two metrics:
- The number of in-flight bytes that had not yet been flushed to object storage.
- The number of in-flight files that had not yet been flushed to object storage.
Intuitively this makes sense: if the Agents are overloaded, then pretty quickly requests will begin piling up in-memory, and the value of those two metrics will spike. It's pretty easy to do the equivalent of the following in the WarpStream code:
func (h *Handler) HandleProduceRequest(
    ctx context.Context,
    req ProduceRequest,
) (*ProduceResponse, error) {
    // Reject immediately if too many bytes or files are already
    // waiting to be flushed to object storage.
    if h.numberOfOutstandingBytesOrFilesTooHigh() {
        return nil, ErrBackpressure
    }
    // Process the request.
}
And we're done, right?
Unfortunately, this is a woefully inadequate solution. The WarpStream Agents process every Kafka protocol request in a dedicated Goroutine, and there is a pretty big risk here that a flood of Produce requests will come in at the same time, all individually pass the h.numberOfOutstandingBytesOrFilesTooHigh() check, and then immediately throw the Agent way over the target limit.
We could fix that by making that method atomically check the metrics and increment them, but we actually have a bigger problem: by the time HandleProduceRequest() is called, we've already done a lot of work:
- Copied the data to be written off the network.
- Copied it into temporary buffers.
- Spawned (or reused) an existing goroutine.
- Emitted a bunch of metrics.
At this point, it's almost worth just accepting the request, because the incremental cost of actually processing it is not that much higher than the work we've already performed.
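As an aside, the atomic check-and-increment fix mentioned above might look roughly like the sketch below. The names and thresholds are hypothetical, not WarpStream's actual code; the idea is simply to reserve the request's bytes against a shared counter before doing any work, and to release the reservation once the data has been flushed:

import "sync/atomic"

type produceAdmitter struct {
    outstandingBytes atomic.Int64
    maxBytes         int64 // e.g. some fraction of the container's memory limit.
}

// tryAdmit atomically reserves the request's bytes. If the reservation
// would push the total over the limit, it is rolled back and the request
// is rejected before any further work happens.
func (a *produceAdmitter) tryAdmit(requestBytes int64) bool {
    if a.outstandingBytes.Add(requestBytes) > a.maxBytes {
        a.outstandingBytes.Add(-requestBytes)
        return false
    }
    return true
}

// release undoes the reservation once the bytes have been flushed to
// object storage (or the request has failed).
func (a *produceAdmitter) release(requestBytes int64) {
    a.outstandingBytes.Add(-requestBytes)
}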
It would be a lot better if we could have rejected this request earlier. Like, way earlier, before we allowed it to consume almost any memory in the first place. Thankfully, this is possible! We wrote the TCP server that powers the WarpStream Kafka protocol from scratch, so we have full control over it. At a very high level, the server code looks something like this:
// Accept loop: one Goroutine per client connection.
for {
    conn := listener.Accept()
    go func() {
        handleConnection(conn)
    }()
}

func handleConnection(conn net.Conn) {
    // One iteration per Kafka protocol request on this connection.
    for {
        header := ReadHeader(conn)
        message := ReadMessage(conn)
        go func() {
            response, err := handler.HandleRequest(message)
            sendOutcome(response, err)
        }()
    }
}
I'm glossing over a lot of details, but you get the gist. In a really busy WarpStream cluster, a single Agent might have thousands or tens of thousands of active connections, and each of those connections will have Goroutines that are reading bytes off the network, allocating messages, and spawning new Goroutines as fast as they can.
In that scenario, it doesn't matter that the HandleRequest() method will start returning backpressure errors. There's too much concurrency, and the Goroutines handling the Kafka client connections will eventually overwhelm the VM's resources and trigger an out-of-memory error.
Ideally, once the Agent detected that it was overloaded, all these connection handler Goroutines would stop processing messages for a while to allow the system to recover. This is the difference between load-shedding and backpressuring. The handler in the above code is shedding load (by rejecting requests), but itâs not applying pressure backward to the rest of the system.
So how do we fix this? Well, the first thing we can do is make a tiny modification to the handleConnection function:
// Accept loop: one Goroutine per client connection.
for {
    conn := listener.Accept()
    go func() {
        handleConnection(conn)
    }()
}

func handleConnection(conn net.Conn) {
    for {
        // If the Agent is overloaded, stop reading from the socket
        // entirely until it recovers.
        for {
            throttleDuration := handler.ShouldThrottle()
            if throttleDuration > 0 {
                time.Sleep(throttleDuration)
                continue
            }
            break
        }

        header := ReadHeader(conn)
        message := ReadMessage(conn)
        go func() {
            response, err := handler.HandleRequest(message)
            sendOutcome(response, err)
        }()
    }
}
Again, I'm oversimplifying, but this is already much better than the previous solution. Now it will be much harder for misbehaving clients to knock an Agent over, because if the Agent is overloaded, it will stop reading bytes from the network entirely. It's pretty hard to do less work than that.
Even better, TCP incorporates the concept of backpressure deeply into its design, so this simple trick will apply backpressure back into the networking stack and eventually all the way back to the client VMs.
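For illustration, a ShouldThrottle implementation in the spirit of the earlier metrics (outstanding bytes and files) might look something like this. It's a sketch with assumed thresholds and field names, not our actual implementation:

import (
    "sync/atomic"
    "time"
)

type throttler struct {
    outstandingBytes atomic.Int64
    outstandingFiles atomic.Int64
    maxBytes         int64
    maxFiles         int64
}

// ShouldThrottle returns how long the connection handler should pause
// before reading the next request off the wire. Zero means the Agent is
// healthy and the next request should be read immediately.
func (t *throttler) ShouldThrottle() time.Duration {
    if t.outstandingBytes.Load() > t.maxBytes ||
        t.outstandingFiles.Load() > t.maxFiles {
        // Pausing here (i.e. not reading from the socket) is what
        // propagates the pressure back into the TCP stack and,
        // eventually, to the clients.
        return 10 * time.Millisecond
    }
    return 0
}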
Finally, we can take this one step further and make the Agents refuse to even accept new connections when they're overloaded:
// Accept loop: one Goroutine per client connection.
for {
    // Stop accepting new connections if overloaded.
    sleepUntilHealthy()
    conn := listener.Accept()
    go func() {
        handleConnection(conn)
    }()
}

func handleConnection(conn net.Conn) {
    for {
        // Stop accepting new requests on existing connections
        // if overloaded.
        sleepUntilHealthy()
        header := ReadHeader(conn)
        message := ReadMessage(conn)
        go func() {
            response, err := handler.HandleRequest(message)
            sendOutcome(response, err)
        }()
    }
}

func sleepUntilHealthy() {
    for {
        throttleDuration := handler.ShouldThrottle()
        if throttleDuration > 0 {
            time.Sleep(throttleDuration)
            continue
        }
        break
    }
}
This is a bit heavy-handed, but that's OK. It will only kick in during very dire circumstances where the only alternative would be catastrophic failures and/or running out of memory³.
Sounds good on paper. But does it work? Let's find out!
At WarpStream we don't like to coddle our software. Production is a messy place where terrible things happen daily, so we try to simulate that as much as possible in all of our test environments.
One of our most aggressive environments is a test cluster that runs 24/7 with three WarpStream Agents. The benchmark workload is configured such that all three Agents are pegged at 80-100% CPU utilization all the time. The benchmark itself consists of eight different test workloads, using four different Kafka clients, and varying batch sizes, partition counts, throughput, number of client instances, etc.
In total, there are hundreds of producer and consumer instances, thousands of partitions, four different client compression algorithms, a mix of regular and compacted topics, and almost all the producers are configured to use the most difficult partitioning strategy where they round-robin records amongst all the partitions.
In addition, the benchmark workloads periodically delete topics and recreate them, rewind and begin reading all of the data from the beginning of retention for the compacted topics, manually trigger consumer group rebalances, and much more. It's just absolute chaos. Great for testing!
When we iterate on WarpStream's backpressure system, we use a very simple test: we aggressively scale the cluster down from three Agents to one. This triples the load on the sole remaining Agent that was already running at almost 100% CPU utilization.
Before our most recent improvements, this is what would happen:
Not fun.
But with the new build and all of our latest tricks?
Not perfect, but it is pretty decent considering the Agent is running at 100% CPU utilization:
Importantly, the struggling Agent immediately recovers as soon as we provide additional capacity by adding a node. That is exactly the behavior you want out of a distributed system like this: the system should feel "springy" such that it immediately "bounces back" as soon as additional resources are provided or load is removed.
Another counter-intuitive outcome here is that the Agent continues to function reasonably even while pegged at 100% CPU utilization for a sustained period of time. This is very difficult to accomplish in practice, but it represents the best case scenario for backpressuring: the Agent is able to utilize 100% of the available resources on the machine without ever becoming unstable or unresponsive.
Any operator (or auto-scaler) can look at that graph and immediately determine the right course of action: scale up! Contrast that with a system that starts backpressuring while the underlying resources are under-utilized (say at 40% CPU utilization). That's going to be a lot more difficult to understand, debug, and most importantly, react to in an automated manner.
Of course, that's just how we manage backpressure for Produce requests. The Fetch code path is even more nuanced and required some novel tricks that we'd never employed in any system we've worked on before. But this post is already way too long, so that'll have to wait until next time!
If you like sleeping through the night and letting your infrastructure auto-scale and protect itself automatically, check out WarpStream.
¹ This is true "by definition" of the most basic principles of queuing theory.
² It's usually best to define limits as a function of the available resources. This way, the application automatically scales to different instance types without modifying the underlying configuration.
³ Note that all of this is independent of the quota / throttling system that is native to Apache Kafka. We'll discuss that more in a different post.