Logo

dev-resources.site

for different kinds of informations.

Yet another ode to Vert.x, or how to write a performance-wise expiring map in less than 100 lines of code.

Published at
9/22/2021
Categories
sip3
kotlin
vertx
performance
Author
agafox
Categories
4 categories in total
sip3
open
kotlin
open
vertx
open
performance
open
Author
6 person written this
agafox
open
Yet another ode to Vert.x, or how to write a performance-wise expiring map in less than 100 lines of code.

I've been working with the Vert.x framework for more than 4 years but I won't stop being excited how simple, lightweight and elegant it is (especially the event loop thread model). In this blog post I will tell you how we implemented PeriodicallyExpiringHashMap data structure in less than 100 lines of code. But first let me give you a bit of a context about why do we need it.

Problem

SIP3 is a very advanced VoIP monitoring and troubleshooting platform. To provide detailed information about calls quality we need to:

  1. Aggregate RTP packets into RTP streams in a real-time
  2. Periodically walk though all the RTP streams and terminate ones that haven't been updated for a certain period of time.

Let's stay away from telecom specific and take a look at a simplified code example:

class RtpStreamHandler : AbstractVerticle() {

    var expirationDelay: Long = 1000
    var aggregationTimeout: Long = 30000

    private val rtpStreams = mutableMapOf<String, RtpStream>()

    override fun start() {
        vertx.setPeriodic(expirationDelay) {
            val now = System.currentTimeMillis()

            rtpStreams.filterValues { rtpStream -> rtpStream.updatedAt + aggregationTimeout < now }
                .forEach { (rtpStreamId, rtpStream) ->
                    terminateRtpStream(rtpStream)
                    rtpStreams.remove(rtpStreamId)
                }
        }

        vertx.eventBus().localConsumer<RtpPacket>("on_rtp_packet") { event ->
            val rtpPacket = event.body()
            handleRtpPacket(rtpPacket)
        }
    }

    fun handleRtpPacket(rtpPacket: RtpPacket) {
        val rtpStream = rtpStreams.getOrPut(rtpPacket.rtpStreamId) { RtpStream() }
        rtpStream.addPacket(rtpPacket)
    }

    fun terminateRtpStream(rtpStream: RtpStream) {
        vertx.eventBus().localSend("on_rtp_stream", rtpStream)
    }
}
Enter fullscreen mode Exit fullscreen mode

Now let's imagine that we constantly have 30K of active RTP streams. Also every second we terminate approximately a thousand of old steams but get a thousand of new ones instead. In these circumstances our code doesn't look very efficient and we certainly need a better solution.

Solution

As you can see from the first code snippet once an RTP stream was updated it won't be terminated at least for the next aggregationTimeout. This means that we can simply do not bother about it for some time.

And this is the key idea behind the SIP3 PeriodicallyExpiringHashMap implementation:

class PeriodicallyExpiringHashMap<K, V> private constructor(
    vertx: Vertx,
    private val delay: Long,
    private val period: Int,
    private val expireAt: (K, V) -> Long,
    private val onExpire: (K, V) -> Unit
) {

    private val objects = mutableMapOf<K, V>()
    private val expiringSlots = (0 until period).map { mutableMapOf<K, V>() }.toList()
    private var expiringSlotIdx = 0

    init {
        vertx.setPeriodic(delay) {
            terminateExpiringSlot()
            expiringSlotIdx += 1
            if (expiringSlotIdx >= period) {
                expiringSlotIdx = 0
            }
        }
    }

    fun getOrPut(key: K, defaultValue: () -> V): V {
        return objects.getOrPut(key) {
            defaultValue.invoke().also { expiringSlots[expiringSlotIdx][key] = it }
        }
    }

    private fun terminateExpiringSlot() {
        val now = System.currentTimeMillis()

        expiringSlots[expiringSlotIdx].apply {
            forEach { (k, v) ->
                val expireAt = expireAt(k, v)

                when {
                    expireAt <= now -> {
                        objects.remove(k)?.let { onExpire(k, it) }
                    }
                    else -> {
                        var shift = ((expireAt - now) / delay).toInt() + 1
                        if (shift >= period) {
                            shift = period - 1
                        }
                        val nextExpiringSlotIdx = (expiringSlotIdx + shift) % period

                        expiringSlots[nextExpiringSlotIdx][k] = v
                    }
                }
            }
            clear()
        }
    }

    data class Builder<K, V>(
        var delay: Long = 1000,
        var period: Int = 60,
        var expireAt: (K, V) -> Long = { _: K, _: V -> Long.MAX_VALUE },
        var onExpire: (K, V) -> Unit = { _: K, _: V -> }
    ) {
        fun delay(delay: Long) = apply { this.delay = delay }
        fun period(period: Int) = apply { this.period = period }
        fun expireAt(expireAt: (K, V) -> Long) = apply { this.expireAt = expireAt }
        fun onExpire(onExpire: (K, V) -> Unit) = apply { this.onExpire = onExpire }

        fun build(vertx: Vertx) = PeriodicallyExpiringHashMap(vertx, delay, period, expireAt, onExpire)
    }
}
Enter fullscreen mode Exit fullscreen mode

Here are the benefits of this data structure:

  1. Now we just have a bunch of time slots. So, instead of walking through all the objects in our map every expirationDelay we can walk trough a single slot. So, instead of checking on 30K objects every second we will check on 1K only.
  2. We don't need to create a copy of original map every time we decide to walk though it. In the previous example it also was an issue, because rtpSteams.filtervalues creates a copy of the original map.
  3. The last and the most important. Our implementation will stay consistent within a particular verticle context. That means you can simply extend it and implement the rest of the methods (including tricky ones, like size()).

Conclusions

Finally let's see how our verticle will look like with the new PeriodicallyExpiringHashMap data structure:

class RtpStreamHandler : AbstractVerticle() {

    var expirationDelay: Long = 1000
    var aggregationTimeout: Long = 30000

    private lateinit var rtpStreams: PeriodicallyExpiringHashMap<String, RtpStream>

    override fun start() {
        rtpStreams = PeriodicallyExpiringHashMap.Builder<String, RtpStream>()
            .delay(expirationDelay)
            .period((aggregationTimeout / expirationDelay).toInt())
            .expireAt { _, rtpStream -> rtpStream.updatedAt + aggregationTimeout }
            .onExpire { _, rtpStream -> terminateRtpStream(rtpStream) }
            .build(vertx)

        vertx.eventBus().localConsumer<RtpPacket>("on_rtp_packet") { event ->
            val rtpPacket = event.body()
            handleRtpPacket(rtpPacket)
        }
    }

    fun handleRtpPacket(rtpPacket: RtpPacket) {
        val rtpStream = rtpStreams.getOrPut(rtpPacket.rtpStreamId) { RtpStream() }
        rtpStream.addPacket(rtpPacket)
    }

    fun terminateRtpStream(rtpStream: RtpStream) {
        vertx.eventBus().localSend("on_rtp_stream", rtpStream)
    }
}
Enter fullscreen mode Exit fullscreen mode

And here are the load tests results (purple - is our new implementation):

Load Tests

The tests look great and the code looks clean and simple thanks to the Vert.x event loop thread model.

๐Ÿ‘จโ€๐Ÿ’ป Happy coding,
Your SIP3 team.

vertx Article's
28 articles in total
Favicon
Error handlers and failure handlers in Vert.x
Favicon
Why we discarded Reactive systems architecture from our code?
Favicon
Build web application in Vert.x [Part 1/ โ™พ๏ธ]
Favicon
Yet another ode to Vert.x, or how to write a performance-wise expiring map in less than 100 lines of code.
Favicon
Surprising Qualities of Event Driven System
Favicon
Idiomatic Kotlin Abstractions for the Vert.x EventBus
Favicon
Vert.x Circuit Breaker
Favicon
Writing Async Tests for Vert.x using Kotlin
Favicon
Reducing Boilerplate in Vert.x Tests written in Kotlin
Favicon
Writing Vert.x Integration Tests with Kotlin & Testcontainers
Favicon
Quarkus: Entendendo a relaรงรฃo entre o Mutiny e o Vert.x
Favicon
HTTPS Client Certificate Authentication With Java
Favicon
Throttle HTTP requests on paged resources with Vert.x
Favicon
Supercharge Your Kotlin Vert.x Application with EventBus Extensions
Favicon
Handle backpressure between Kafka and a database with Vert.x
Favicon
Handling unknown JSON structures
Favicon
Introduction to Vert.x
Favicon
Future Composition in Vert.x
Favicon
How to extend Vert.x EventBus API to save on serialization.
Favicon
How to write beautiful unit tests in Vert.x
Favicon
Scaling Vert.x application for session dependent data processing.
Favicon
KVision v3.7.0 is released (with Vert.x support)
Favicon
Reactive Java using the Vert.x toolkit
Favicon
Vert.x Kotlin Coroutines
Favicon
How we built a RESTful API with Vert.x, Kotlin Coroutines and Keycloak
Favicon
vertx-jooq 2.4 released
Favicon
Sirix - Released 0.9.1 (time travel queries and versioning made easy)
Favicon
Reactive Programming with Kotlin - Quick Intro to Vert.x

Featured ones: