External Merge Problem - Complete Guide for Gophers

Published at
1/11/2025
Categories
go
algorithms
computerscience
programming
Author
oleg_sydorov

The external sorting problem is a well-known topic in computer science courses and is often used as a teaching tool. However, it's rare to meet someone who has actually implemented a solution to this problem in code for a specific technical scenario, let alone tackled the required optimizations. Encountering this challenge during a hackathon inspired me to write this article.

So, here is the hackathon task:

You have a simple text file with IPv4 addresses. One line is one address, line by line:

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 

The file is unlimited in size and can occupy tens or hundreds of gigabytes.

You should calculate the number of unique addresses in this file using as little memory and time as possible. There is a "naive" algorithm for solving this problem (read line by line, put lines into HashSet). It's better if your implementation is more complicated and faster than this naive algorithm.

A 120GB file with 8 billion lines was submitted for parsing.

There were no specific requirements regarding the speed of program execution. However, after quickly reviewing available information on the topic online, I concluded that an acceptable execution time for standard hardware (such as a home PC) would be approximately one hour or less.

For obvious reasons, the file cannot be read and processed in its entirety unless the system has at least 128GB of memory available. But is working with chunks and merging inevitable?

If you are not comfortable implementing an external merge, I suggest you first familiarize yourself with an alternative solution that is acceptable, although far from optimal.

Idea

  • Create a bitmap of 2^32 bits, one bit per possible IPv4 address. In Go this is a []uint64 slice with 2^32 / 64 = 67,108,864 elements, since each uint64 holds 64 bits.

  • For each IP:

  1. Parse the string address into four octets: A.B.C.D.
  2. Translate it into a number ipNum = (A << 24) | (B << 16) | (C << 8) | D.
  3. Set the corresponding bit in the bitmap.

  • After reading all the addresses, run through the bitmap and count the number of set bits.

Pros:

  • Very fast uniqueness detection: setting a bit is O(1), and there is no need to check it first.

  • No overhead for hashing, sorting, etc.

Cons:

  • Large, fixed memory consumption: 512 MB for the full IPv4 space (not counting runtime overhead), no matter how few addresses the file contains.

  • If the file holds far fewer distinct addresses than the full IPv4 space, this can still be advantageous in terms of time, but is not always reasonable in terms of memory.

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "strings"
    "math/bits"
)

//  Parse IP address "A.B.C.D"  => uint32 number
func ipToUint32(ipStr string) (uint32, error) {
    parts := strings.Split(ipStr, ".")
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}


func popcount64(x uint64) int {
    return bits.OnesCount64(x)
}

func main() {
    filePath := "ips.txt"

    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file: %v\n", err)
        return
    }
    defer file.Close()

    // IPv4 space size: 2^32 = 4,294,967,296
    // We need 2^32 bits, that is (2^32)/64 64-bit words
    totalBits := uint64(1) << 32       // 2^32
    arraySize := totalBits / 64        //how many uint64 do we need
    bitset := make([]uint64, arraySize)

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ipStr := scanner.Text()
        ipNum, err := ipToUint32(ipStr)
        if err != nil {
            fmt.Printf("Incorrect IP: %s\n", ipStr)
            continue
        }

        idx := ipNum / 64
        bit := ipNum % 64
        mask := uint64(1) << bit
        // Setting the bit
        bitset[idx] |= mask
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    count := 0
    for _, val := range bitset {
        count += bits.OnesCount64(val)
    }

    fmt.Printf("Number of unique IP addresses: %d\n", count)
}

This approach is straightforward and reliable, making it a viable option when no alternatives are available. However, in a production environment, especially when aiming for optimal performance, it's essential to develop a more efficient solution.

Thus, our approach involves chunking, internal merge sorting, and deduplication.

The Principle of Parallelization in External Sorting

  1. Reading and transforming chunks:

The file is split into relatively small parts (chunks), say a few hundred megabytes or a few gigabytes. For each chunk:

  • A goroutine (or a pool of goroutines) is launched, which reads the chunk, parses the IP addresses into numbers and stores them in a temporary array in memory.

  • Then this array is sorted (for example, with the standard sort.Slice), and the result, after removing duplicates, is written to a temporary file.

Since each part can be processed independently, you can run several such handlers in parallel, if you have several CPU cores and sufficient disk bandwidth. This will allow you to use resources as efficiently as possible.
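The sort-then-deduplicate step for a single chunk can be sketched as below. The article's own deduplicate helper is not shown, so dedupSorted here is a hypothetical stand-in: it assumes the slice is already sorted, compacts it in place, and also returns how many duplicates were dropped.

```go
package main

import (
	"fmt"
	"sort"
)

// dedupSorted compacts a SORTED slice in place. It returns the unique
// prefix and the number of removed duplicates.
func dedupSorted(sorted []uint32) ([]uint32, int) {
	if len(sorted) == 0 {
		return sorted, 0
	}
	w := 1 // next write position
	for r := 1; r < len(sorted); r++ {
		if sorted[r] != sorted[w-1] {
			sorted[w] = sorted[r]
			w++
		}
	}
	return sorted[:w], len(sorted) - w
}

func main() {
	ips := []uint32{7, 3, 7, 1, 3, 3}
	sort.Slice(ips, func(i, j int) bool { return ips[i] < ips[j] })
	unique, removed := dedupSorted(ips)
	fmt.Println(unique, removed) // [1 3 7] 3
}
```

Because the compaction reuses the chunk's own backing array, no second buffer of chunk size is needed.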

  2. Merge sorted chunks (merge step):

Once all chunks are sorted and written to temporary files, you need to merge these sorted lists into a single sorted stream, removing duplicates:

  • As with the chunk-sorting step, you can parallelize the merge by dividing the temporary files into groups, merging the groups in parallel, and gradually reducing the number of files.

  • This leaves one large sorted and deduplicated output stream, from which you can calculate the total number of unique IPs.
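To make the idea of gradually reducing the number of files concrete, here is a minimal in-memory sketch. It assumes each run is already sorted and duplicate-free, and it uses slices in place of temporary files; mergeUnique and mergeRounds are illustrative names, not taken from the final solution.

```go
package main

import "fmt"

// mergeUnique merges two sorted, duplicate-free slices into one sorted,
// duplicate-free slice.
func mergeUnique(a, b []uint32) []uint32 {
	out := make([]uint32, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i] < b[j]:
			out = append(out, a[i])
			i++
		case a[i] > b[j]:
			out = append(out, b[j])
			j++
		default: // same value in both runs: keep one copy
			out = append(out, a[i])
			i++
			j++
		}
	}
	out = append(out, a[i:]...)
	out = append(out, b[j:]...)
	return out
}

// mergeRounds merges runs pairwise, round after round, until one remains.
func mergeRounds(runs [][]uint32) []uint32 {
	if len(runs) == 0 {
		return nil
	}
	for len(runs) > 1 {
		next := make([][]uint32, 0, (len(runs)+1)/2)
		for k := 0; k+1 < len(runs); k += 2 {
			next = append(next, mergeUnique(runs[k], runs[k+1]))
		}
		if len(runs)%2 == 1 {
			next = append(next, runs[len(runs)-1]) // odd run carries over
		}
		runs = next
	}
	return runs[0]
}

func main() {
	runs := [][]uint32{{1, 4, 9}, {2, 4}, {4, 9, 10}}
	merged := mergeRounds(runs)
	fmt.Println(merged, len(merged)) // [1 2 4 9 10] 5
}
```

The pairs within one round do not depend on each other, which is what makes it possible to hand each pair to a separate goroutine.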

Advantages of parallelization:

  • Use of multiple CPU cores:
    Single-threaded sorting of a very large array can be slow, but if you have a multi-core processor, you can sort multiple chunks in parallel, speeding up the process several times.

  • Load balancing:

If the chunk sizes are chosen wisely, each chunk can be processed in approximately the same amount of time. If some chunks are larger/smaller or more complex, you can dynamically distribute their processing across different goroutines.

  • IO optimization:

Parallelization allows one chunk to be read while another is being sorted or written, reducing idle time.

Bottom Line

External sorting naturally lends itself to parallelization through file chunking. This approach enables the efficient use of multi-core processors and minimizes IO bottlenecks, resulting in significantly faster sorting and deduplication compared to a single-threaded approach. By distributing the workload effectively, you can achieve high performance even when dealing with massive datasets.

Important consideration:

While reading the file line by line, we can also count the total number of lines. During the process, we perform deduplication in two stages: first during chunking and then during merging. As a result, there’s no need to count the lines in the final output file. Instead, the total number of unique lines can be calculated as:

finalCount := totalLines - (DeletedInChunks + DeletedInMerge)

This approach avoids redundant operations and makes the computation more efficient by keeping track of deletions during each stage of deduplication. This saves us several minutes.

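One more thing:

Since any small performance gain matters on huge amounts of data, I suggest using a hand-written, faster analogue of strings.Split(). It relies on a DelimiterByte constant holding the separator, which for IPv4 addresses is the '.' character: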

func fastSplit(s string) []string {
    n := 1
    c := DelimiterByte

    for i := 0; i < len(s); i++ {
        if s[i] == c {
            n++
        }
    }

    out := make([]string, n)
    count := 0
    begin := 0
    length := len(s) - 1

    for i := 0; i <= length; i++ {
        if s[i] == c {
            out[count] = s[begin:i]
            count++
            begin = i + 1
        }
    }
    out[count] = s[begin : length+1]

    return out
}

Additionally, the worker pool pattern was adopted to manage parallel processing, with a configurable number of workers. By default, the number of workers is set to runtime.NumCPU(), allowing the program to use all available CPU cores. The value can be adjusted to the specific requirements or limitations of the environment.

Important Note: When using multithreading, it is crucial to protect shared data to prevent race conditions and ensure the correctness of the program. This can be achieved by using synchronization mechanisms such as mutexes, channels (in Go), or other concurrency-safe techniques, depending on the specific requirements of your implementation.

Summary so far

The implementation of these ideas resulted in code that, when executed on a Ryzen 7700 processor paired with an M.2 SSD, completed the task in approximately 40 minutes.

Considering compression

The next consideration, based on the volume of data and hence the presence of significant disk operations, was the use of compression. The Brotli algorithm was chosen for compression. Its high compression ratio and efficient decompression make it a suitable choice for reducing disk IO overhead while maintaining good performance during intermediate storage and processing.

Here is an example of chunking with Brotli:

package main

import (
    "fmt"
    "github.com/andybalholm/brotli"
    "os"
    "sort"
)

func processChunk(ips []uint32, chunkIndex int) (string, error) {
    sort.Slice(ips, func(i, j int) bool { return ips[i] < ips[j] })
    ips = deduplicate(ips)

    outFileName := fmt.Sprintf("chunk_%d.tmp", chunkIndex)
    f, err := os.Create(path + outFileName)
    if err != nil {
        return "", err
    }
    defer chkClose(f)

    compressor := brotli.NewWriterLevel(f, 5)
    defer chkClose(compressor)

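    for _, ip := range ips {
        // Stop at the first failed write instead of silently producing a truncated chunk.
        if _, err := fmt.Fprintf(compressor, "%d\n", ip); err != nil {
            return "", err
        }
    }

    // chunkFiles is shared between workers: guard this append with a mutex
    // if processChunk is called from several goroutines.
    chunkFiles = append(chunkFiles, outFileName)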
    return outFileName, nil
}

Results of Using Compression

The effectiveness of compression is debatable and highly dependent on the conditions under which the solution is used. A high compression level reduces disk space usage but costs extra CPU time, which can increase overall execution time. On slow HDDs, where disk I/O is the bottleneck, compression can provide a significant speed boost. Conversely, on fast SSDs, compression may lead to slower execution times.

In tests conducted on a system with M.2 SSDs, compression showed no performance improvement. As a result, I ultimately decided to forgo it. However, if you're willing to risk adding complexity to your code and potentially reducing its readability, you could implement compression as an optional feature, controlled by a configurable flag.

What to do next

In pursuit of further optimization, we turn to a binary representation of the intermediate data. Once the text-based IP addresses are converted into uint32 numbers, all subsequent operations can be performed in binary format.

func ipToUint32(ipStr string) (uint32, error) {
    parts := fastSplit(ipStr)
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}
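As a possible further step (not part of the author's code), the split can be avoided altogether by parsing the address in a single pass. The sketch below is a hypothetical parseIPv4 that never allocates a slice of substrings. Unlike strconv.Atoi, it rejects signs and octets longer than three digits.

```go
package main

import (
	"errors"
	"fmt"
)

var errInvalidIP = errors.New("invalid IPv4 address")

// parseIPv4 converts "A.B.C.D" to a uint32 in one pass over the string.
func parseIPv4(s string) (uint32, error) {
	var ip, octet uint32
	digits, dots := 0, 0
	for i := 0; i < len(s); i++ {
		c := s[i]
		switch {
		case c >= '0' && c <= '9':
			octet = octet*10 + uint32(c-'0')
			digits++
			if digits > 3 || octet > 255 {
				return 0, errInvalidIP
			}
		case c == '.':
			if digits == 0 || dots == 3 {
				return 0, errInvalidIP // empty octet or too many dots
			}
			ip = ip<<8 | octet
			octet, digits = 0, 0
			dots++
		default:
			return 0, errInvalidIP
		}
	}
	if dots != 3 || digits == 0 {
		return 0, errInvalidIP
	}
	return ip<<8 | octet, nil
}

func main() {
	n, err := parseIPv4("145.67.23.4")
	fmt.Println(n, err) // 2437093124 <nil>
}
```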

Advantages of the Binary Format

  • Compactness:

Each number occupies a fixed size (e.g., uint32 = 4 bytes).
For 1 million IP addresses, the file size will be only ~4 MB.

  • Fast Processing:

There's no need to parse strings, which speeds up reading and writing operations.

  • Cross-Platform Compatibility:

By using a consistent byte order (either LittleEndian or BigEndian), files can be read across different platforms.
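The fixed 4-byte layout can be illustrated with a short round trip. This sketch assumes little-endian order, and the helper names encodeIPs and decodeIPs are hypothetical.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeIPs packs each value into exactly 4 little-endian bytes.
func encodeIPs(ips []uint32) []byte {
	buf := make([]byte, 4*len(ips))
	for i, ip := range ips {
		binary.LittleEndian.PutUint32(buf[4*i:], ip)
	}
	return buf
}

// decodeIPs reverses encodeIPs. Trailing bytes that do not form a full
// 4-byte value are ignored.
func decodeIPs(buf []byte) []uint32 {
	ips := make([]uint32, len(buf)/4)
	for i := range ips {
		ips[i] = binary.LittleEndian.Uint32(buf[4*i:])
	}
	return ips
}

func main() {
	ips := []uint32{2437093124, 136447255} // 145.67.23.4 and 8.34.5.23
	raw := encodeIPs(ips)
	fmt.Println(len(raw), decodeIPs(raw)) // 8 [2437093124 136447255]
}
```

For comparison, the text line "145.67.23.4" plus its newline takes 12 bytes, against 4 bytes in binary form.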

Conclusion
Storing data in binary format is a more efficient method for writing and reading numbers. For complete optimization, convert both the data writing and reading processes to binary format. Use binary.Write for writing and binary.Read for reading.

Here's what the processChunk function might look like to work with binary format:

package main

import (
    "encoding/binary"
    "fmt"
    "os"
    "sort"
)

func processChunk(ips []uint32, chunkIndex int) (string, error) {
    sort.Slice(ips, func(i, j int) bool { return ips[i] < ips[j] })

    ips = deduplicate(ips)

    outFileName := fmt.Sprintf("chunk_%d.tmp", chunkIndex)

    f, err := os.Create(path + outFileName)
    if err != nil {
        return "", err
    }
    defer f.Close()

    for _, ip := range ips {
        err := binary.Write(f, binary.LittleEndian, ip)
        if err != nil {
            return "", fmt.Errorf("failed to write binary data: %w", err)
        }
    }

    chunkFiles = append(chunkFiles, outFileName)

    return outFileName, nil
} 

WTF?! It became much slower!!

Working in binary format turned out to be slower. With equal chunk size and number of workers, a file with 100 million lines (IP addresses) is processed in 4.5 minutes in binary form, against 25 seconds in text form. Why?

Working with Binary Format May Be Slower than Text Format
Using binary format can sometimes be slower than text format due to the specifics of how binary.Read and binary.Write operate, as well as potential inefficiencies in their implementation. Here are the main reasons why this might happen:

I/O Operations

  • Text Format:

Works with larger data blocks using bufio.Scanner, which is optimized for reading lines.
Reads entire lines and parses them, which can be more efficient for small conversion operations.

  • Binary Format:

When called directly on an unbuffered file, binary.Read and binary.Write transfer 4 bytes at a time, resulting in a huge number of very small I/O operations.
Each such call is a system call, so the overhead of switching between user and kernel space adds up.

Solution: Use a buffer so that many numbers are read or written at once. Here is the writing side:

func processChunk(ips []uint32, chunkIndex int) (string, error) {
    sort.Slice(ips, func(i, j int) bool { return ips[i] < ips[j] })

    ips = deduplicate(ips)

    outFileName := fmt.Sprintf("chunk_%d.tmp", chunkIndex)

    f, err := os.Create(path + outFileName)
    if err != nil {
        return "", err
    }
    defer f.Close()

    bw := bufio.NewWriter(f)

    for _, ip := range ips {
        err := binary.Write(bw, binary.LittleEndian, ip)
        if err != nil {
            return "", fmt.Errorf("failed to write binary data: %w", err)
        }
    }

    err = bw.Flush()
    if err != nil {
        return "", fmt.Errorf("failed to flush buffer: %w", err)
    }

    chunkFiles = append(chunkFiles, outFileName)

    return outFileName, nil
}

Why Does Buffering Improve Performance?

  • Fewer I/O Operations:
    Instead of writing each number directly to disk, data is accumulated in a buffer and written in larger blocks.

  • Reduced Overhead:

Each write to the file is a system call, which incurs overhead from switching between the process and the operating system kernel. Buffering reduces the number of such calls.

We also present the code for binary multiphase merge:

func mergeTwoFiles(fileA, fileB, outFile string) error {
    defer cleanUpChunk(fileA, fileB)

    fa, err := os.Open(fileA)
    if err != nil {
        return err
    }
    defer fa.Close()

    fb, err := os.Open(fileB)
    if err != nil {
        return err
    }
    defer fb.Close()

    fOut, err := os.Create(outFile)
    if err != nil {
        return err
    }
    defer fOut.Close()

    const batchSize = 1024
    bufferA := make([]uint32, batchSize)
    bufferB := make([]uint32, batchSize)

    bw := bufio.NewWriter(fOut)
    defer bw.Flush()

    var indexA, sizeA, indexB, sizeB int
    var lastWritten uint32
    var hasLast bool

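    // readNextBatch decodes up to len(buffer) values from file and returns how many it read.
    // io.ReadFull is used because a single Read may return fewer bytes than requested,
    // possibly not a multiple of 4, which would silently drop part of a value.
    readNextBatch := func(file *os.File, buffer []uint32) (int, error) {
        tempBuffer := make([]byte, len(buffer)*4)
        n, err := io.ReadFull(file, tempBuffer)
        // io.EOF and io.ErrUnexpectedEOF only mean the file ended before the batch was full.
        if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
            return 0, err
        }
        count := n / 4
        for i := 0; i < count; i++ {
            buffer[i] = binary.LittleEndian.Uint32(tempBuffer[i*4 : (i+1)*4])
        }
        return count, nil
    }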

    sizeA, err = readNextBatch(fa, bufferA)
    if err != nil {
        return err
    }
    sizeB, err = readNextBatch(fb, bufferB)
    if err != nil {
        return err
    }

    for indexA < sizeA || indexB < sizeB {
        var valA, valB uint32
        hasA := indexA < sizeA
        hasB := indexB < sizeB

        if hasA {
            valA = bufferA[indexA]
        }
        if hasB {
            valB = bufferB[indexB]
        }

        if hasA && (!hasB || valA < valB) {
            if !hasLast || valA != lastWritten {
                binary.Write(bw, binary.LittleEndian, valA)
                lastWritten = valA
                hasLast = true
            }
            indexA++
            if indexA == sizeA {
                sizeA, err = readNextBatch(fa, bufferA)
                if err != nil && err != io.EOF {
                    return err
                }
                indexA = 0
            }
        } else if hasB && (!hasA || valB < valA) {
            if !hasLast || valB != lastWritten {
                binary.Write(bw, binary.LittleEndian, valB)
                lastWritten = valB
                hasLast = true
            }
            indexB++
            if indexB == sizeB { 
                sizeB, err = readNextBatch(fb, bufferB)
                if err != nil && err != io.EOF {
                    return err
                }
                indexB = 0
            }
        } else if hasA && hasB && valA == valB {
            if !hasLast || valA != lastWritten {
                binary.Write(bw, binary.LittleEndian, valA)
                lastWritten = valA
                hasLast = true
                atomic.AddUint64(&DeletedInMerge, 1)
            } else {
                atomic.AddUint64(&DeletedInMerge, 1)
            }
            indexA++
            indexB++
            if indexA == sizeA {
                sizeA, err = readNextBatch(fa, bufferA)
                if err != nil && err != io.EOF {
                    return err
                }
                indexA = 0
            }
            if indexB == sizeB {
                sizeB, err = readNextBatch(fb, bufferB)
                if err != nil && err != io.EOF {
                    return err
                }
                indexB = 0
            }
        }
    }

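    // Flush explicitly so that a failed write is reported: bufio.Writer
    // remembers the first write error and returns it from Flush.
    return bw.Flush()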
}

The result is fantastic: 14 minutes for a 110 GB file with 8 billion lines!


That's an outstanding result! Processing a 110 GB file with 8 billion lines in 14 minutes is indeed impressive. It demonstrates the power of:

  • Buffered I/O:

By processing large chunks of data in memory instead of line-by-line or value-by-value, you drastically reduce the number of I/O operations, which are often the bottleneck.

  • Optimized Binary Processing:

Switching to binary reading and writing minimizes parsing overhead, reduces the size of intermediate data, and improves memory efficiency.

  • Efficient Deduplication:

Using memory-efficient algorithms for deduplication and sorting ensures that CPU cycles are utilized effectively.

  • Parallelism:

Leveraging goroutines and channels to parallelize the workload across workers balances CPU and disk utilization.

Conclusion

Finally, here is the complete code for the final solution. Feel free to use it and adapt it to your needs!

External merge solution for Gophers

Good luck!
