dev-resources.site

Ordering Event Bus Events with RxJS and concatMap

Published at
9/20/2022
Categories
rxjs
async
operators
eventbus
Author
deanius

An Event Bus can help our app by providing a single source of truth for the relative timing of events. If trigger('B') follows trigger('A'), then we expect listeners to receive the events in A, B order. Any other order would certainly lead to bugs.

Strangely enough, with a simple implementation of an Event Bus in RxJS (a Subject, and some Observers that tap in a function), one can accidentally introduce a bug in which listeners don't hear events in the same order! Both the buses Luis Aviles made and the one I built in an earlier post suffer from this bug. In this post we will use concatMap to fix that, learning the "Return The Work" principle of Observables.

Your Thoughts?

The library that implements this bus fully today is called omnibus-rxjs. I'm thinking of putting it up as @rxfx/bus, and extracting out a family of libraries under that namespace. Which name do you like? Leave a comment below.


Trigger and Listen

In part 1 we constructed an Event Bus that let us:

  1. Call bus.trigger to trigger an event to the bus
  2. Use bus.listen to register a function to be run on matching events.

On this bus, what happens if a handler function run by bus.listen contains a call to bus.trigger? Could this confuse the event ordering?

bus.listen(
  (item) => item === "hello-dave",
  () => { bus.trigger("hello-HAL") }
);

On the surface there's nothing suspicious here. And to review our bus code at this point, it's nothing more than this:

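import { Subject, Subscription } from "rxjs";
import { filter, tap } from "rxjs/operators";

// A matcher decides which events a listener cares about.
type Predicate<T> = (item: T) => boolean;

class Bus<T> {
  private events: Subject<T>;
  constructor() {
    this.events = new Subject();
  }
  // Run handler for every event that satisfies matcher.
  listen(matcher: Predicate<T>, handler: (item: T) => void): Subscription {
    return this.events
      .asObservable()
      .pipe(filter(matcher), tap(handler))
      .subscribe();
  }
  // Notify all listeners immediately, on the current call stack.
  trigger(item: T) {
    this.events.next(item);
  }
}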

So, assuming 3 events are triggered, how could 2 listeners disagree on what order the events arrived?

// Listener 1 heard: ["hello-world", "hello-dave", "hello-HAL"]
// Listener 2 heard: ["hello-world", "hello-HAL", "hello-dave"]

The Stack is The Culprit

JavaScript is a stack-based language, where each synchronous function call begins and ends inside its parent's. How is this relevant? Allow me to illustrate.

Suppose there are 3 listeners on a bus carrying strings. First we attach a logger called L1, then our "Dave listener", which will re-trigger, and finally another logger called L2. If we fire an event that re-triggers, like "hello-dave", the following sequence of calls to the listeners produces the problem:

 trigger('hello-dave')
  - L1 listener
  - Dave listener
     - trigger('hello-HAL')
        - L1 listener
        - L2 listener
  - L2 listener

At runtime, the trigger('hello-HAL') call from inside the "Dave" listener immediately starts firing each matching listener in sequence. But L2 hasn't yet processed hello-dave, so L2 sees hello-HAL before it sees hello-dave. This is what we want to prevent. Our answer will be to not begin the re-triggering immediately, but to "Return the Work".
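The nested call sequence described above can be reproduced with a short script. This is a minimal sketch, assuming the rxjs package is installed; the names NaiveBus, heardByL1 and heardByL2 are made up for this illustration, and the event strings are lowercased to match the matcher in the listener shown earlier.

```typescript
import { Subject, Subscription } from "rxjs";
import { filter, tap } from "rxjs/operators";

type Predicate<T> = (item: T) => boolean;

// The simple bus from above: trigger() notifies listeners immediately.
class NaiveBus<T> {
  private events = new Subject<T>();

  listen(matcher: Predicate<T>, handler: (item: T) => void): Subscription {
    return this.events.pipe(filter(matcher), tap(handler)).subscribe();
  }

  trigger(item: T): void {
    this.events.next(item);
  }
}

const naiveBus = new NaiveBus<string>();
const heardByL1: string[] = [];
const heardByL2: string[] = [];

// Registration order matters: L1, then the re-triggering listener, then L2.
naiveBus.listen(() => true, (item) => { heardByL1.push(item); });
naiveBus.listen(
  (item) => item === "hello-dave",
  () => { naiveBus.trigger("hello-HAL"); }
);
naiveBus.listen(() => true, (item) => { heardByL2.push(item); });

naiveBus.trigger("hello-world");
naiveBus.trigger("hello-dave");

console.log("Listener 1 heard:", heardByL1);
// Listener 1 heard: ["hello-world", "hello-dave", "hello-HAL"]
console.log("Listener 2 heard:", heardByL2);
// Listener 2 heard: ["hello-world", "hello-HAL", "hello-dave"]
```

L2 is registered after the re-triggering listener, so the nested trigger reaches it before the outer notification loop does.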

Observables - Units of Work

Of all the definitions of Observables out there, the one I like is that an Observable represents the potential for some work to be done, or a resource to be used, just as the command-line string ls -l encodes the potential of a memory-occupying, data-emitting process. So wherever you did work immediately before, you can now do it in an Observable. What the React VDOM is to DOM elements, Observables are to effects.
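To make "potential for work" concrete, here is a small sketch, assuming the rxjs package is installed. The names workLog and work are hypothetical. It shows that constructing an Observable runs nothing, and that the work only happens when something subscribes.

```typescript
import { Observable } from "rxjs";

const workLog: string[] = [];

// Constructing the Observable only describes the work; nothing runs yet.
const work = new Observable<void>((notify) => {
  workLog.push("work ran");
  notify.complete();
});
workLog.push("work created");

// Subscribing is what executes the work, synchronously in this case.
work.subscribe();
workLog.push("after subscribe");

console.log(workLog);
// ["work created", "work ran", "after subscribe"]
```

Because the work is only a description until subscribed, whoever holds the Observable gets to decide when it runs.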

Now, if we have a stream of these work Observables, we can strictly serialize them with the RxJS operator concatMap.
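As a rough illustration of that serialization, the sketch below (assuming the rxjs package is installed) pushes a unit of work that enqueues a second unit while it is still running. The names steps, queue and task are invented for this example. With concatMap, the second unit waits until the first completes.

```typescript
import { Observable, Subject } from "rxjs";
import { concatMap } from "rxjs/operators";

const steps: string[] = [];
const queue = new Subject<Observable<void>>();

// concatMap subscribes to one inner Observable at a time and
// buffers the rest until the active one completes.
queue.pipe(concatMap((work) => work)).subscribe();

// Hypothetical helper: a named unit of work that may enqueue more work while it runs.
function task(name: string, during?: () => void): Observable<void> {
  return new Observable<void>((notify) => {
    steps.push(`${name} start`);
    if (during) during();
    steps.push(`${name} end`);
    notify.complete();
  });
}

// Task A enqueues task B in the middle of its own execution.
queue.next(task("A", () => queue.next(task("B"))));

console.log(steps);
// ["A start", "A end", "B start", "B end"]
```

Had B been run immediately instead of being handed to the queue, it would have started and ended inside A.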

The Fix

If you suspected some kind of queueing was the solution, you're right. If you didn't know about concatMap, you may have imagined building some data structure to hold the queue, but the concatMap operator already does this internally. Let's give our bus an RxJS Subject for the triggerings we want to serialize.

class Bus<T> {
  private events: Subject<T>;
+  private triggerings: Subject<Observable<void>>;

  constructor() {
    this.events = new Subject();
+    this.triggerings = new Subject();
+    this.triggerings.pipe(
+      concatMap((t) => t)
+    ).subscribe();
  }
}

When the constructor is run, the triggerings Subject begins listening for work items (Observables), which it passes to concatMap so that they execute strictly one at a time. Now, we change the implementation of trigger to push one of these work items to that Subject:

  trigger(item: T) {
-    this.events.next(item);  
+    this.triggerings.next(
+      new Observable((notify) => {
+        this.events.next(item);
+        notify.complete();
+      })
+    );
  }

The Observable's work is to run each listener for the item completely and synchronously with events.next(item), and only then call notify.complete().

And voila! concatMap can serialize the calls now! We no longer violate causality, and our logs show each listener agrees on the same event ordering.

// Listener 1 heard: ["hello-world", "hello-dave", "hello-HAL"]
// Listener 2 heard: ["hello-world", "hello-dave", "hello-HAL"]

We can see why: each triggering, that is, the execution of one of those Observables, always finishes notifying listeners for one event before the next one is processed.

 trigger('hello-dave')
  - L1 listener
  - Dave listener (queues hello-HAL)
  - L2 listener
 trigger('hello-HAL')
  - L1 listener
  - L2 listener
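Putting the two diffs together with the earlier listen method gives a complete bus that can be run end to end. This is a sketch assembled from the snippets in this post, not the omnibus-rxjs source, and it assumes the rxjs package is installed; the names fixedBus, fixedL1 and fixedL2 are made up for the demonstration.

```typescript
import { Observable, Subject, Subscription } from "rxjs";
import { concatMap, filter, tap } from "rxjs/operators";

type Predicate<T> = (item: T) => boolean;

class Bus<T> {
  private events = new Subject<T>();
  private triggerings = new Subject<Observable<void>>();

  constructor() {
    // Execute queued triggerings strictly one at a time.
    this.triggerings.pipe(concatMap((t) => t)).subscribe();
  }

  listen(matcher: Predicate<T>, handler: (item: T) => void): Subscription {
    return this.events.pipe(filter(matcher), tap(handler)).subscribe();
  }

  trigger(item: T): void {
    // Return the work: describe the notification, let concatMap decide when it runs.
    this.triggerings.next(
      new Observable<void>((notify) => {
        this.events.next(item);
        notify.complete();
      })
    );
  }
}

const fixedBus = new Bus<string>();
const fixedL1: string[] = [];
const fixedL2: string[] = [];

fixedBus.listen(() => true, (item) => { fixedL1.push(item); });
fixedBus.listen(
  (item) => item === "hello-dave",
  () => { fixedBus.trigger("hello-HAL"); }
);
fixedBus.listen(() => true, (item) => { fixedL2.push(item); });

fixedBus.trigger("hello-world");
fixedBus.trigger("hello-dave");

console.log("Listener 1 heard:", fixedL1);
// Listener 1 heard: ["hello-world", "hello-dave", "hello-HAL"]
console.log("Listener 2 heard:", fixedL2);
// Listener 2 heard: ["hello-world", "hello-dave", "hello-HAL"]
```

Everything here is still synchronous. The only change is that a trigger made while another is in progress waits in concatMap's internal buffer until the current one completes.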

Decoupling For The Win

Where we're going with this bus is to make it usable for a Pub-Sub implementation. This bug we just fixed was a kind of coupling, where "Dave listener" was interfering with L2's event order. This is normal when making sync calls in a stack-based language, but we can bring order back to it again with RxJS.

Now that we've decoupled this runtime behavior of listeners from each other, it's time to look at another important way listeners should be decoupled: errors. And that will be the topic of the next post.
