Throttle HTTP requests on paged resources with Vert.x
Hey Java developers, I know that you are interested in asynchronous programming, and I know you are struggling with this paradigm.
I know that because I’m one of you!
Let us see how to deal with rate limiting issues in the magical async world in Java with Vert.x.
For the unaware, a little recap:
What is Vert.x?
Vert.x is a polyglot event-driven and non-blocking application framework.
What is “Rate limiting”?
Rate limiting is a set of policies enforced by a web server that force clients to “throttle” their requests, so that the server does not get overloaded.
What are paged resources?
Paged resources are resources split into multiple pages, so that a single response never has to carry the whole collection.
We will work with a real implementation case:
We are music fanatics (as I am), we are users of Discogs.com, and we want to fetch its data in our non-blocking application.
Taking a look at the discogs.com API reference, we notice that, aside from the API specifications, there is some information about data format, headers, versioning and so on.
We are now interested in two sections: Rate Limiting and Pagination.
Those are the problems to deal with.
A sketch of our architecture
Every arrow from left to right is a method call, every arrow from right to left is a Future.
We will start from the bottom and go up the river.
Rate Limiting
Discogs permits, according to its documentation, 25 requests per minute, so 25 will be our rate limit and one minute is the size of the rate limit window.
The idea is to have an interface with a method called execute, which accepts a Request (a data object that represents the request) as its parameter and returns a Future (because we think async!) of Buffer, that is, the response’s body.
Trivial and neat.
interface Requests {
    Future<Buffer> execute(Request request);
}
We need to put a delay between requests, hence inside the Requests implementation we will use a queue to decouple the execute invocations from the throttled request execution:
class ThrottledRequests implements Requests {
    ...
    @Override
    public Future<Buffer> execute(Request request) {
        try {
            Future<Buffer> future = Future.future();
            queue.put(new RequestContext(request, future));
            return future;
        } catch (Exception e) {
            return Future.failedFuture(e);
        }
    }
    ...
}
RequestContext is a simple data object that allows us to bind a request to its future.
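To make the idea concrete without pulling in Vert.x, here is a minimal sketch of the same decoupling using only the JDK. Everything in it is an assumption made for illustration: PendingRequest plays the role of RequestContext, CompletableFuture stands in for the Vert.x Future, and the transport function is a hypothetical synchronous substitute for the real HTTP call.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical counterpart of RequestContext: a request plus the future
// that its caller is holding on to.
final class PendingRequest<Q, R> {
    final Q request;
    final CompletableFuture<R> future;

    PendingRequest(Q request, CompletableFuture<R> future) {
        this.request = request;
        this.future = future;
    }
}

// Callers enqueue and return immediately; drainOne() is what a timer would call.
class QueuedRequests<Q, R> {
    private final BlockingQueue<PendingRequest<Q, R>> queue = new LinkedBlockingQueue<>();
    // Assumption: a synchronous function replaces the real, asynchronous HTTP call.
    private final Function<Q, R> transport;

    QueuedRequests(Function<Q, R> transport) {
        this.transport = transport;
    }

    CompletableFuture<R> execute(Q request) {
        CompletableFuture<R> future = new CompletableFuture<>();
        // The queue is unbounded, so add() never blocks the caller.
        queue.add(new PendingRequest<>(request, future));
        return future;
    }

    // Takes at most one pending request and settles its future.
    // Returns false when there was nothing to do.
    boolean drainOne() {
        PendingRequest<Q, R> pending = queue.poll();
        if (pending == null) {
            return false;
        }
        try {
            pending.future.complete(transport.apply(pending.request));
        } catch (RuntimeException e) {
            pending.future.completeExceptionally(e);
        }
        return true;
    }
}
```

The point of the pairing is that execute can hand back a future right away, while the moment the request actually goes out is decided entirely by whoever calls drainOne.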
Then the request has to be consumed from the queue and executed. We do this with a periodic task that fires every N milliseconds, where N is not fixed: it is computed from the maximum number of requests we are allowed to make in the rate limit window.
We start with an optimistic initial delay (100 milliseconds in the code below), so here’s our ThrottledRequests constructor:
public class ThrottledRequests implements Requests {
    private static final int RATE_LIMIT_WINDOW_SIZE = 60000;
    private final Logger log = getLogger(getClass());
    private final BlockingQueue<RequestContext> queue = new LinkedBlockingQueue<>();
    private final AtomicLong executorId = new AtomicLong(0);
    private final AtomicLong actualDelay = new AtomicLong(100);
    private final HttpClient http;
    private final Vertx vertx;

    ThrottledRequests(Vertx vertx) {
        this.vertx = vertx;
        this.http = vertx.createHttpClient();
        long id = vertx.setPeriodic(actualDelay.get(), executor());
        this.executorId.set(id);
    }
    ...
}
Some explanations:
- vertx: you need the vertx instance to launch a periodic executor.
- actualDelay: an AtomicLong that holds the current delay between requests, in milliseconds; as we said before, it starts from an optimistic value (100 in the constructor above).
- executor(): the core method of the class, it instantiates a function that will perform the throttled requests.
- executorId: we need to track the id of the periodic task so that we can cancel it when actualDelay changes. It is an AtomicLong too.
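The bookkeeping in the list above (remember the timer id, cancel it, start a new one when the delay changes) is not specific to Vert.x. As a rough sketch, assuming the JDK scheduler as a stand-in for setPeriodic and cancelTimer, it could look like this. ReschedulableTimer and its method names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a periodic task whose period can be changed on the fly.
class ReschedulableTimer {
    // Daemon thread, so a forgotten timer does not keep the JVM alive.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "throttle-timer");
                thread.setDaemon(true);
                return thread;
            });
    private final Runnable task;
    private long actualDelay;          // plays the role of actualDelay
    private ScheduledFuture<?> handle; // plays the role of executorId

    ReschedulableTimer(long initialDelayMillis, Runnable task) {
        this.task = task;
        this.actualDelay = initialDelayMillis;
        this.handle = schedule(initialDelayMillis);
    }

    private ScheduledFuture<?> schedule(long delayMillis) {
        return scheduler.scheduleAtFixedRate(
                task, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Replaces the running timer only when the delay really changed.
    synchronized boolean updateDelay(long newDelayMillis) {
        if (newDelayMillis == actualDelay) {
            return false;
        }
        handle.cancel(false);
        handle = schedule(newDelayMillis);
        actualDelay = newDelayMillis;
        return true;
    }

    synchronized long currentDelay() {
        return actualDelay;
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

Comparing against the stored delay before touching the timer matters: most responses report the same limit, and cancelling and recreating the timer on every response would only add noise.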
Let us see how to handle async request executions:
private Handler<Long> executor() {
    return timerId -> {
        // Take at most one queued request per tick
        RequestContext context = queue.poll();
        if (context != null) {
            Request inputRequest = context.request;
            http.request(inputRequest.method(), inputRequest.options())
                .putHeader("User-Agent", "YourApp/0.1")
                .setFollowRedirects(true)
                .handler(response -> {
                    response.bodyHandler(context.future::complete);
                    checkAndUpdateRateLimit(response);
                })
                // Without this, a connection error would leave the future pending forever
                .exceptionHandler(context.future::tryFail)
                .end();
        }
    };
}
Easy, huh?
For every response we complete the future and then call checkAndUpdateRateLimit().
According to the API documentation, every response we get from Discogs carries a header called X-Discogs-Ratelimit, which tells us how many requests we can make in a rate limit window of one minute. Cool.
private void checkAndUpdateRateLimit(HttpClientResponse response) {
    Optional.ofNullable(response.getHeader("X-Discogs-Ratelimit"))
        .map(Long::parseLong)
        .map(rateLimit -> rateLimit - 1)
        // A reported limit of 1 (or less) would otherwise divide by zero
        .filter(reqPerMinute -> reqPerMinute > 0)
        .map(reqPerMinute -> RATE_LIMIT_WINDOW_SIZE / reqPerMinute)
        .ifPresent(throttleDelay -> {
            if (throttleDelay != actualDelay.getAndSet(throttleDelay)) {
                vertx.cancelTimer(executorId.get());
                long id = vertx.setPeriodic(throttleDelay, executor());
                executorId.set(id);
            }
        });
}
The calculation is pretty simple:
- Check the rate limit header
- Subtract one, just to avoid filling the window
- Calculate the delay to leave between requests so that we make fewer than rateLimit requests in the time window (one minute)
- If the delay differs from the current one, cancel the old executor’s timer and start another one with the new delay.
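To see the numbers, the arithmetic from the steps above can be isolated in a few lines of plain Java. This is only a sketch: ThrottleDelay and fromHeader are names made up here, and the choice to return an empty result for a missing, malformed or too-small header value is an assumption on my side, not something the Discogs documentation prescribes.

```java
import java.util.OptionalLong;

// Hypothetical helper that isolates the delay arithmetic.
class ThrottleDelay {
    static final long RATE_LIMIT_WINDOW_MILLIS = 60_000L;

    // Delay in milliseconds between two requests, or empty when the header
    // is missing, is not a number, or leaves no usable request after the
    // "subtract one" safety margin.
    static OptionalLong fromHeader(String headerValue) {
        if (headerValue == null) {
            return OptionalLong.empty();
        }
        long rateLimit;
        try {
            rateLimit = Long.parseLong(headerValue.trim());
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
        long usable = rateLimit - 1; // keep one request of headroom
        if (usable <= 0) {
            return OptionalLong.empty(); // also avoids a division by zero
        }
        return OptionalLong.of(RATE_LIMIT_WINDOW_MILLIS / usable);
    }
}
```

With a limit of 25 the usable budget is 24 requests, so the delay is 60000 / 24 = 2500 milliseconds. With a limit of 60 it is 60000 / 59, which integer division rounds down to 1016 milliseconds.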
Pagination
Handling pagination is a little trickier than rate limiting, since there is some work to do with futures: the total page count is known only after fetching the first page.
The idea is to divide responsibilities. We will implement three components:
(the example is on the Inventory resource, which retrieves Listing entities)
- GetListingPage: there is one implementation for every resource. This class uses Requests as a collaborator, knows the URL to invoke and knows how to deserialize a single fetched page. So its responsibility is the Page.
- Pages: uses GetResourcePage (here, GetListingPage) to fetch the first page and then the others, and joins them. Its responsibility is the whole set of pages.
- Client: obtains all the pages from Pages and extracts the entities from them.
Let’s start with GetListingPage: a class that implements an interface with a single method, apply.
@Override
public Future<ListingPage> apply(String userId, Integer pageNumber) {
    log.info("Request {} inventory page {}", userId, pageNumber);
    String path = String.format("/users/%s/inventory?page=%d", userId, pageNumber);
    Request request = Request.get(path);
    Future<ListingPage> future = Future.future();
    executor.execute(request).setHandler(async -> {
        if (async.succeeded()) {
            String json = async.result().toString();
            ListingPage page = Json.fromJson(json, ListingPage.class);
            future.complete(page);
        } else {
            future.fail(async.cause());
        }
    });
    return future;
}
Nothing special: execute the request and deserialize the JSON output into a ListingPage (a class that represents the structure of an inventory page).
Maybe the most interesting class is Pages. Its public method is getFor:
public Future<List<T>> getFor(String userId) {
    Future<List<T>> result = Future.future();
    getPage.apply(userId, 1).setHandler(async -> {
        if (async.succeeded()) {
            T firstPage = async.result();
            int totalPages = firstPage.pages();
            log.info("Total pages: {}", totalPages);
            List<Future> futures = IntStream
                .range(2, totalPages + 1)
                .mapToObj(page -> getPage.apply(userId, page))
                .collect(Collectors.toList());
            CompositeFuture.all(futures)
                .setHandler(joinPages(result, firstPage));
        } else {
            // Propagate a failed first page, otherwise result never completes
            result.fail(async.cause());
        }
    });
    return result;
}
We ask for the first page and then, knowing the total page count, we ask for every other page, combining all the Futures into a CompositeFuture.
Once every future has completed, we joinPages:
private Handler<AsyncResult<CompositeFuture>> joinPages(
    Future<List<T>> future, T firstPage
) {
    return async -> {
        if (async.succeeded()) {
            CompositeFuture remnants = async.result();
            Collection remnantPages = IntStream
                .range(0, remnants.size())
                .mapToObj(remnants::resultAt)
                .map(Page.class::cast)
                .collect(Collectors.toList());
            List<T> total = new ArrayList<>();
            total.add(firstPage);
            total.addAll(remnantPages);
            future.complete(total);
        } else {
            future.fail(async.cause());
        }
    };
}
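If the Vert.x types get in the way of seeing the pattern, here is the same "first page, then fan out, then join" flow as a minimal sketch on top of the JDK's CompletableFuture. It is not the code of this article: CountedPage, AllPages and fetchAll are hypothetical names, and the page getter is reduced to a function from page number to future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical minimal page contract: every page knows the total page count.
interface CountedPage {
    int pages();
}

class AllPages {
    // Fetches page 1, learns the total from it, then requests pages 2..total
    // and joins everything into one list, in page order.
    static <T extends CountedPage> CompletableFuture<List<T>> fetchAll(
            IntFunction<CompletableFuture<T>> getPage) {
        return getPage.apply(1).thenCompose(firstPage -> {
            List<CompletableFuture<T>> rest = IntStream
                    .rangeClosed(2, firstPage.pages())
                    .mapToObj(getPage)
                    .collect(Collectors.toList());
            CompletableFuture<Void> allDone =
                    CompletableFuture.allOf(rest.toArray(new CompletableFuture[0]));
            return allDone.thenApply(done -> {
                List<T> total = new ArrayList<>();
                total.add(firstPage);
                // Every future is already complete here, so join() does not block.
                rest.forEach(page -> total.add(page.join()));
                return total;
            });
        });
    }
}
```

In this variant a failure of the first page, or of any later page, completes the returned future exceptionally, so the caller always gets an answer. When the first page reports a single page, the range is empty and the result contains just that page.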
Now, at the higher level (Client) we can just get pages and extract entities:
listingPages.getFor(user).setHandler(async -> {
    if (async.succeeded()) {
        List<Listing> listings = async.result().stream()
            .map(ListingPage::listings)
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
        // ... do your logic ...
    }
});
Pretty cool, huh?
Source code: