
External Merge Problem - Complete Guide for Gophers

Published at: 1/11/2025
Categories: go, algorithms, computerscience, programming
Author: oleg_sydorov

The external sorting problem is a well-known topic in computer science courses and is often used as a teaching tool. However, it's rare to meet someone who has actually implemented a solution to this problem in code for a specific technical scenario, let alone tackled the required optimizations. Encountering this challenge during a hackathon inspired me to write this article.

So, here is the hackathon task:

You have a simple text file with IPv4 addresses, one address per line:

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 

The file is unlimited in size and can occupy tens or even hundreds of gigabytes.

You should calculate the number of unique addresses in this file using as little memory and time as possible. There is a "naive" algorithm for solving this problem (read line by line, put lines into HashSet). It's better if your implementation is more complicated and faster than this naive algorithm.

A 120GB file with 8 billion lines was submitted for parsing.

There were no specific requirements regarding the speed of program execution. However, after quickly reviewing available information on the topic online, I concluded that an acceptable execution time for standard hardware (such as a home PC) would be approximately one hour or less.

For obvious reasons, the file cannot be read and processed in its entirety unless the system has at least 128GB of memory available. But is working with chunks and merging inevitable?

If you are not comfortable implementing an external merge, I suggest you first familiarize yourself with an alternative solution that is acceptable, although far from optimal.

Idea

  • Create a 2^32-bit bitmap. This is a uint64 array, since each uint64 holds 64 bits.

  • For each IP:

  1. Parse the string address into four octets: A.B.C.D.
  2. Translate it into a number ipNum = (A << 24) | (B << 16) | (C << 8) | D.
  3. Set the corresponding bit in the bitmap.

  • After reading all the addresses, run through the bitmap and count the number of set bits.

Pros:

  • Very fast uniqueness detection: setting a bit is O(1), and there is no need to check it first, just set it.
  • No overhead for hashing, sorting, etc.

Cons:

  • Huge memory consumption: 2^32 bits = 512 MB for the full IPv4 space, not counting overhead.
  • If the file is huge but covers less than the full IPv4 space, this can still be advantageous in terms of time, though not always reasonable in terms of memory.

package main

import (
    "bufio"
    "fmt"
    "math/bits"
    "os"
    "strconv"
    "strings"
)

//  Parse IP address "A.B.C.D"  => uint32 number
func ipToUint32(ipStr string) (uint32, error) {
    parts := strings.Split(ipStr, ".")
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}


func popcount64(x uint64) int {
    return bits.OnesCount64(x)
}

func main() {
    filePath := "ips.txt"

    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file: %v\n", err)
        return
    }
    defer file.Close()

    // IPv4 space size: 2^32 = 4,294,967,296
    // We need 2^32 bits, that is (2^32)/64 64-bit words
    totalBits := uint64(1) << 32       // 2^32
    arraySize := totalBits / 64        //how many uint64 do we need
    bitset := make([]uint64, arraySize)

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ipStr := scanner.Text()
        ipNum, err := ipToUint32(ipStr)
        if err != nil {
            fmt.Printf("Incorrect IP: %s\n", ipStr)
            continue
        }

        idx := ipNum / 64
        bit := ipNum % 64
        mask := uint64(1) << bit
        // Setting the bit
        bitset[idx] |= mask
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    count := 0
    for _, val := range bitset {
        count += popcount64(val)
    }

    fmt.Printf("Number of unique IP addresses: %d\n", count)
}

This approach is straightforward and reliable, making it a viable option when no alternatives are available. However, in a production environment—especially when aiming to achieve optimal performance—it's essential to develop a more efficient solution.

Thus, our approach involves chunking, sorting each chunk in memory, and then an external merge with deduplication.

The Principle of Parallelization in External Sorting

  1. Reading and transforming chunks:

The file is split into relatively small parts (chunks), say a few hundred megabytes or a few gigabytes. For each chunk:

  • A goroutine (or a pool of goroutines) is launched, which reads the chunk, parses the IP addresses into numbers and stores them in a temporary array in memory.

  • Then this array is sorted (for example, with the standard sort.Slice), and the result, after removing duplicates, is written to a temporary file.

Since each part can be processed independently, you can run several such handlers in parallel, if you have several CPU cores and sufficient disk bandwidth. This will allow you to use resources as efficiently as possible.

  2. Merge sorted chunks (merge step):

Once all chunks are sorted and written to temporary files, you need to merge these sorted lists into a single sorted stream, removing duplicates:

  • Similar to the sorting stage, you can parallelize the merge by dividing the temporary files into groups, merging the groups in parallel, and gradually reducing the number of files; a sketch of such a merge driver follows below.

  • This leaves one large sorted and deduplicated output stream, from which you can calculate the total number of unique IPs.
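
To make the reduction concrete, here is a minimal sketch of such a merge driver. It assumes the mergeTwoFiles function shown later in this article (which merges two sorted chunk files and drops duplicates) plus the usual fmt import; the file-naming scheme is illustrative.

func mergeAll(files []string) (string, error) {
    if len(files) == 0 {
        return "", fmt.Errorf("no chunk files to merge")
    }
    round := 0
    for len(files) > 1 {
        var next []string
        for i := 0; i < len(files); i += 2 {
            if i+1 == len(files) {
                // Odd file out: carry it over to the next round unchanged.
                next = append(next, files[i])
                break
            }
            out := fmt.Sprintf("merge_%d_%d.tmp", round, i/2)
            if err := mergeTwoFiles(files[i], files[i+1], out); err != nil {
                return "", err
            }
            next = append(next, out)
        }
        files = next
        round++
    }
    return files[0], nil
}

Each pair within a round is independent, so the inner loop is a natural place to launch one goroutine per pair and wait on a sync.WaitGroup before starting the next round.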

Advantages of parallelization:

  • Use of multiple CPU cores:
    Single-threaded sorting of a very large array can be slow, but if you have a multi-core processor, you can sort multiple chunks in parallel, speeding up the process several times.

  • Load balancing:
    If the chunk sizes are chosen wisely, each chunk can be processed in approximately the same amount of time. If some chunks are larger, smaller, or more complex, you can dynamically distribute their processing across different goroutines.

  • IO optimization:
    Parallelization allows one chunk to be read while another is being sorted or written, reducing idle time.

Bottom Line

External sorting naturally lends itself to parallelization through file chunking. This approach enables the efficient use of multi-core processors and minimizes IO bottlenecks, resulting in significantly faster sorting and deduplication compared to a single-threaded approach. By distributing the workload effectively, you can achieve high performance even when dealing with massive datasets.

Important consideration:

While reading the file line by line, we can also count the total number of lines. During the process, we perform deduplication in two stages: first during chunking and then during merging. As a result, there’s no need to count the lines in the final output file. Instead, the total number of unique lines can be calculated as:

finalCount := totalLines - (DeletedInChunks + DeletedInMerge)

This approach avoids redundant operations and makes the computation more efficient by keeping track of deletions during each stage of deduplication. This saves us several minutes.
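
A sketch of how these counters can be wired up with sync/atomic (totalLines, dedupAndCount and finalCount are illustrative names; DeletedInChunks and DeletedInMerge follow the naming used in the merge code later in this article):

var (
    totalLines      uint64 // lines read from the input file
    DeletedInChunks uint64 // duplicates dropped during per-chunk deduplication
    DeletedInMerge  uint64 // duplicates dropped during the merge phase
)

// Called by a chunk worker after sorting its chunk.
func dedupAndCount(ips []uint32) []uint32 {
    before := len(ips)
    ips = deduplicate(ips) // a sketch of deduplicate appears later in the article
    atomic.AddUint64(&DeletedInChunks, uint64(before-len(ips)))
    return ips
}

// Called once, after the final merge pass.
func finalCount() uint64 {
    return atomic.LoadUint64(&totalLines) -
        (atomic.LoadUint64(&DeletedInChunks) + atomic.LoadUint64(&DeletedInMerge))
}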

One more thing:

Since any small performance gain matters on huge amounts of data, I suggest using a hand-written, faster analogue of strings.Split():

// DelimiterByte is a package-level constant defined elsewhere, e.g. const DelimiterByte = byte('.')
func fastSplit(s string) []string {
    n := 1
    c := DelimiterByte

    for i := 0; i < len(s); i++ {
        if s[i] == c {
            n++
        }
    }

    out := make([]string, n)
    count := 0
    begin := 0
    length := len(s) - 1

    for i := 0; i <= length; i++ {
        if s[i] == c {
            out[count] = s[begin:i]
            count++
            begin = i + 1
        }
    }
    out[count] = s[begin : length+1]

    return out
}

Additionally, a worker-pool pattern was adopted to manage parallel processing, with the number of workers being configurable. By default, the number of workers is set to runtime.NumCPU(), allowing the program to utilize all available CPU cores. This ensures efficient resource usage while leaving room to adjust the worker count to the requirements or limitations of the environment; a minimal sketch of such a pool follows the note below.

Important Note: When using multithreading, it is crucial to protect shared data to prevent race conditions and ensure the correctness of the program. This can be achieved by using synchronization mechanisms such as mutexes, channels (in Go), or other concurrency-safe techniques, depending on the specific requirements of your implementation.
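
For illustration, here is a minimal sketch of such a worker pool, assuming the sync and log packages and the processChunk function shown below. In this sketch the append to the shared chunkFiles slice happens in the worker and is guarded by a mutex, which is exactly the kind of protection the note above refers to:

type chunkJob struct {
    index int
    ips   []uint32
}

var (
    chunkFilesMu sync.Mutex
    chunkFiles   []string
)

func runWorkers(jobs <-chan chunkJob, workers int) {
    var wg sync.WaitGroup
    for w := 0; w < workers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobs {
                name, err := processChunk(job.ips, job.index) // sort + dedup + write one chunk file
                if err != nil {
                    log.Printf("chunk %d failed: %v", job.index, err)
                    continue
                }
                chunkFilesMu.Lock()
                chunkFiles = append(chunkFiles, name)
                chunkFilesMu.Unlock()
            }
        }()
    }
    wg.Wait()
}

Here, jobs is typically a buffered channel fed by the goroutine that reads and parses the input file, and workers defaults to runtime.NumCPU().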

Summary so far

The implementation of these ideas resulted in code that, when executed on a Ryzen 7700 processor paired with an M.2 SSD, completed the task in approximately 40 minutes.

Considering compression

The next consideration, based on the volume of data and hence the presence of significant disk operations, was the use of compression. The Brotli algorithm was chosen for compression. Its high compression ratio and efficient decompression make it a suitable choice for reducing disk IO overhead while maintaining good performance during intermediate storage and processing.

Here is an example of chunk processing with Brotli:

package main

import (
    "fmt"
    "github.com/andybalholm/brotli"
    "os"
    "sort"
)

// path, chunkFiles, deduplicate and chkClose are defined elsewhere in the full project.
func processChunk(ips []uint32, chunkIndex int) (string, error) {
    sort.Slice(ips, func(i, j int) bool { return ips[i] < ips[j] })
    ips = deduplicate(ips)

    outFileName := fmt.Sprintf("chunk_%d.tmp", chunkIndex)
    f, err := os.Create(path + outFileName)
    if err != nil {
        return "", err
    }
    defer chkClose(f)

    compressor := brotli.NewWriterLevel(f, 5)
    defer chkClose(compressor)

    for _, ip := range ips {
        fmt.Fprintf(compressor, "%d\n", ip)
    }

    chunkFiles = append(chunkFiles, outFileName)
    return outFileName, nil
}
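
The deduplicate helper used above is not shown in the article; a minimal version, assuming the slice has already been sorted, could look like this:

// deduplicate removes consecutive duplicates from a sorted slice in place.
func deduplicate(ips []uint32) []uint32 {
    if len(ips) == 0 {
        return ips
    }
    out := ips[:1]
    for _, ip := range ips[1:] {
        if ip != out[len(out)-1] {
            out = append(out, ip)
        }
    }
    return out
}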

Results of Using Compression

The effectiveness of compression is debatable and highly dependent on the conditions under which the solution is used. High compression reduces disk space usage but proportionally increases overall execution time. On slow HDDs, compression can provide a significant speed boost, as disk I/O becomes the bottleneck. Conversely, on fast SSDs, compression may lead to slower execution times.

In tests conducted on a system with M.2 SSDs, compression showed no performance improvement. As a result, I ultimately decided to forgo it. However, if you're willing to risk adding complexity to your code and potentially reducing its readability, you could implement compression as an optional feature, controlled by a configurable flag.
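
A minimal sketch of such a toggle (newChunkWriter and useCompression are illustrative names, not part of the article's code): the chunk is written through whichever io.Writer is selected, and the returned closer is called before closing the file itself.

// Pick the chunk writer based on a configuration flag.
func newChunkWriter(f *os.File, useCompression bool) (io.Writer, func() error) {
    if useCompression {
        bw := brotli.NewWriterLevel(f, 5)
        return bw, bw.Close // Close flushes and finishes the Brotli stream
    }
    buf := bufio.NewWriter(f)
    return buf, buf.Flush
}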

What to do next

In pursuit of further optimization, we turn our attention to a binary representation of our data. Once the text-based IP addresses are converted into uint32 values, all subsequent operations can be performed in binary format.

// Same as before, but using fastSplit instead of strings.Split
func ipToUint32(ipStr string) (uint32, error) {
    parts := fastSplit(ipStr)
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}

Advantages of the Binary Format

  • Compactness:

Each number occupies a fixed size (e.g., uint32 = 4 bytes).
For 1 million IP addresses, the file size will be only ~4 MB.

  • Fast Processing:

There's no need to parse strings, which speeds up reading and writing operations.

  • Cross-Platform Compatibility:

By using a consistent byte order (either LittleEndian or BigEndian), files can be read across different platforms.

Conclusion
Storing data in binary format is a more efficient method for writing and reading numbers. For complete optimization, convert both the data writing and reading processes to binary format. Use binary.Write for writing and binary.Read for reading.
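
For the read side, a sketch of reading a chunk file back with binary.Read might look like this (readChunkFile is a hypothetical helper, not part of the article's code):

func readChunkFile(name string) ([]uint32, error) {
    f, err := os.Open(name)
    if err != nil {
        return nil, err
    }
    defer f.Close()

    var ips []uint32
    for {
        var ip uint32
        err := binary.Read(f, binary.LittleEndian, &ip)
        if err == io.EOF {
            break
        }
        if err != nil {
            return nil, err
        }
        ips = append(ips, ip)
    }
    return ips, nil
}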

Here's what the processChunk function might look like to work with binary format:

package main

import (
    "encoding/binary"
    "fmt"
    "os"
    "sort"
)

func processChunk(ips []uint32, chunkIndex int) (string, error) {
    sort.Slice(ips, func(i, j int) bool { return ips[i] < ips[j] })

    ips = deduplicate(ips)

    outFileName := fmt.Sprintf("chunk_%d.tmp", chunkIndex)

    f, err := os.Create(path + outFileName)
    if err != nil {
        return "", err
    }
    defer f.Close()

    for _, ip := range ips {
        err := binary.Write(f, binary.LittleEndian, ip)
        if err != nil {
            return "", fmt.Errorf("failed to write binary data: %w", err)
        }
    }

    chunkFiles = append(chunkFiles, outFileName)

    return outFileName, nil
} 

WTF?! It became much slower!!

Working in binary format turned out to be slower: a file with 100 million lines (IP addresses) is processed in 4.5 minutes in binary form versus 25 seconds as text, with the same chunk size and number of workers. Why?

Working with Binary Format May Be Slower than Text Format
Using the binary format can sometimes be slower than the text format because of how binary.Read and binary.Write are used: applied naively, they issue one small I/O operation per value. Here are the main reasons why this happens:

I/O Operations

  • Text Format:
    Data is read in large blocks via bufio.Scanner, which is optimized for reading lines. Whole lines are read and then parsed, so the cost of each I/O operation is amortized over many values.

  • Binary Format:
    A naive binary.Write (or binary.Read) on an unbuffered file transfers only 4 bytes per call, resulting in many small I/O operations. Frequent system calls increase the overhead of switching between user space and the kernel.

Solution: use a buffer so that many values are transferred per system call. Here is the buffered version of processChunk:

func processChunk(ips []uint32, chunkIndex int) (string, error) {
    sort.Slice(ips, func(i, j int) bool { return ips[i] < ips[j] })

    ips = deduplicate(ips)

    outFileName := fmt.Sprintf("chunk_%d.tmp", chunkIndex)

    f, err := os.Create(path + outFileName)
    if err != nil {
        return "", err
    }
    defer f.Close()

    bw := bufio.NewWriter(f)

    for _, ip := range ips {
        err := binary.Write(bw, binary.LittleEndian, ip)
        if err != nil {
            return "", fmt.Errorf("failed to write binary data: %w", err)
        }
    }

    err = bw.Flush()
    if err != nil {
        return "", fmt.Errorf("failed to flush buffer: %w", err)
    }

    chunkFiles = append(chunkFiles, outFileName)

    return outFileName, nil
}

Why Does Buffering Improve Performance?

  • Fewer I/O Operations:
    Instead of writing each number directly to disk, data is accumulated in a buffer and written in larger blocks.

  • Reduced Overhead:
    Each disk write incurs overhead from context switching between the process and the operating system. Buffering reduces the number of such calls. The read side benefits in exactly the same way, as sketched below.
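
Revisiting the hypothetical readChunkFile from earlier, the read loop only needs the file wrapped in a bufio.Reader so that binary.Read pulls values from an in-memory buffer instead of issuing a system call per value:

    r := bufio.NewReaderSize(f, 1<<20) // 1 MB read buffer
    var ips []uint32
    for {
        var ip uint32
        if err := binary.Read(r, binary.LittleEndian, &ip); err != nil {
            if err == io.EOF {
                break
            }
            return nil, err
        }
        ips = append(ips, ip)
    }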

Here is the code for the binary multi-phase merge of two chunk files:

// cleanUpChunk and the DeletedInMerge counter are defined elsewhere in the full project.
func mergeTwoFiles(fileA, fileB, outFile string) error {
    // Remove the two input chunk files once this merge finishes.
    defer cleanUpChunk(fileA, fileB)

    fa, err := os.Open(fileA)
    if err != nil {
        return err
    }
    defer fa.Close()

    fb, err := os.Open(fileB)
    if err != nil {
        return err
    }
    defer fb.Close()

    fOut, err := os.Create(outFile)
    if err != nil {
        return err
    }
    defer fOut.Close()

    const batchSize = 1024
    bufferA := make([]uint32, batchSize)
    bufferB := make([]uint32, batchSize)

    bw := bufio.NewWriter(fOut)
    defer bw.Flush()

    var indexA, sizeA, indexB, sizeB int
    var lastWritten uint32
    var hasLast bool

    readNextBatch := func(file *os.File, buffer []uint32) (int, error) {
        tempBuffer := make([]byte, len(buffer)*4)
        // io.ReadFull guards against short reads: it only returns fewer bytes
        // than requested at the end of the file.
        n, err := io.ReadFull(file, tempBuffer)
        if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
            return 0, err
        }
        count := n / 4
        for i := 0; i < count; i++ {
            buffer[i] = binary.LittleEndian.Uint32(tempBuffer[i*4 : (i+1)*4])
        }
        return count, nil
    }

    sizeA, err = readNextBatch(fa, bufferA)
    if err != nil {
        return err
    }
    sizeB, err = readNextBatch(fb, bufferB)
    if err != nil {
        return err
    }

    for indexA < sizeA || indexB < sizeB {
        var valA, valB uint32
        hasA := indexA < sizeA
        hasB := indexB < sizeB

        if hasA {
            valA = bufferA[indexA]
        }
        if hasB {
            valB = bufferB[indexB]
        }

        if hasA && (!hasB || valA < valB) {
            if !hasLast || valA != lastWritten {
                binary.Write(bw, binary.LittleEndian, valA)
                lastWritten = valA
                hasLast = true
            }
            indexA++
            if indexA == sizeA {
                sizeA, err = readNextBatch(fa, bufferA)
                if err != nil && err != io.EOF {
                    return err
                }
                indexA = 0
            }
        } else if hasB && (!hasA || valB < valA) {
            if !hasLast || valB != lastWritten {
                binary.Write(bw, binary.LittleEndian, valB)
                lastWritten = valB
                hasLast = true
            }
            indexB++
            if indexB == sizeB { 
                sizeB, err = readNextBatch(fb, bufferB)
                if err != nil && err != io.EOF {
                    return err
                }
                indexB = 0
            }
        } else if hasA && hasB && valA == valB {
            if !hasLast || valA != lastWritten {
                binary.Write(bw, binary.LittleEndian, valA)
                lastWritten = valA
                hasLast = true
                atomic.AddUint64(&DeletedInMerge, 1)
            } else {
                atomic.AddUint64(&DeletedInMerge, 1)
            }
            indexA++
            indexB++
            if indexA == sizeA {
                sizeA, err = readNextBatch(fa, bufferA)
                if err != nil && err != io.EOF {
                    return err
                }
                indexA = 0
            }
            if indexB == sizeB {
                sizeB, err = readNextBatch(fb, bufferB)
                if err != nil && err != io.EOF {
                    return err
                }
                indexB = 0
            }
        }
    }

    return nil
}

The result is fantastic: 14 minutes for a 110 GB file with 8 billion lines!


That's an outstanding result! Processing a 110 GB file with 8 billion lines in 14 minutes is indeed impressive. It demonstrates the power of:

  • Buffered I/O:

By processing large chunks of data in memory instead of line-by-line or value-by-value, you drastically reduce the number of I/O operations, which are often the bottleneck.

  • Optimized Binary Processing:

Switching to binary reading and writing minimizes parsing overhead, reduces the size of intermediate data, and improves memory efficiency.

  • Efficient Deduplication:

Using memory-efficient algorithms for deduplication and sorting ensures that CPU cycles are utilized effectively.

  • Parallelism:

Leveraging goroutines and channels to parallelize the workload across workers balances CPU and disk utilization.

Conclusion

Finally, here is the complete code for the final solution. Feel free to use it and adapt it to your needs!

External merge solution for Gophers

Good luck!
