Building pipelines with IAsyncEnumerable in .NET

Published at
8/27/2024
Categories
dotnet
async
pipelines
streaming
Author
nikiforovall

TL;DR

This article demonstrates how to use IAsyncEnumerable and System.Linq.Async to build pipelines in C#.

Source code: https://github.com/NikiforovAll/async-enumerable-pipelines

You can see all demos by running:

dotnet example --list

Here’s a sneak peek 👀:

// TextSummarizationAndAggregationPipeline
var pipeline = Directory
    .EnumerateFiles(path)
    .ToAsyncEnumerable()
    .ReportProgress()
    .SelectAwait(ReadFile)
    .Where(IsValidFileForProcessing)
    .SelectAwait(Summarize)
    .WriteResultToFile(path: Path.Combine(Path.GetTempPath(), "summaries.txt"))
    .ForEachAsync(x => AnsiConsole.MarkupLine($"Processed [green]{x.Name}[/]"));


Introduction

Pipelines are a powerful way to process data in a streaming fashion. They are a series of stages that transform data from one form to another. In this article, we will explore how to build pipelines using IAsyncEnumerable and System.Linq.Async.

Pipelines are a common pattern in modern software development. Because each item flows through the stages as soon as it is available, a pipeline can use less memory and produce its first results sooner than processing everything at once. Pipelines are also composable: you can combine multiple stages to create more complex processing logic.

💡 I described another approach to building pipelines in a previous blog post, Building pipelines with System.Threading.Channels, which you might want to take a look at. Both System.Threading.Channels and IAsyncEnumerable provide powerful tools for managing asynchronous data streams in .NET. However, while System.Threading.Channels offers a more explicit approach to handling producer-consumer scenarios, IAsyncEnumerable brings a more integrated and LINQ-friendly way to work with asynchronous sequences. Understanding the strengths and nuances of each can help you choose the right tool for your specific use case.

Examples

There are many interesting concepts that I’m going to cover in this article. Let’s start with the basics.

Using System.Linq.Async operators to build a pipeline

Let’s say we want to build a pipeline that reads files from a directory, parses them, and counts the number of words in each file.

This can be illustrated as follows:

graph LR; A[Read files] --> B[Parse files]; B --> C[Count words]; C --> D[Output results];

🎯 Our goal is to represent each stage of the pipeline in the code using IAsyncEnumerable and System.Linq.Async.

System.Linq.Async is a library that provides asynchronous versions of LINQ operators. It allows you to work with IAsyncEnumerable in a similar way to how you would work with IEnumerable. It makes it easy to build pipelines.

Basically, the control flow is described as a chain of method calls, and each stage of the pipeline can be implemented as a separate method. In my opinion, this makes the code more readable and maintainable. The benefit of this approach is that once you have determined the stages of the pipeline, you can implement them independently and focus on the logic of each stage.

The process of parsing files can be implemented as follows:

var path = Path.Combine(Directory.GetCurrentDirectory(), "..", "..", "Data");

var pipeline = Directory
    .EnumerateFiles(path)
    .ToAsyncEnumerable()
    .SelectAwait(ReadFile)
    .Where(IsValidFileForProcessing)
    .Select(CalculateWordCount)
    .OrderByDescending(x => x.WordCount)
    .ForEachAsync(Console.WriteLine);

await pipeline;

Everything starts with the conversion of IEnumerable<string> file paths to IAsyncEnumerable<string>.

💡 Alternatively, we could write our own method that returns IAsyncEnumerable. For example, we could swap the local file system for Azure Blob Storage, which means the same pipeline can be reused with different data sources.
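As a rough illustration of such a custom source, here is a minimal sketch of an async iterator that yields file paths. The names FileSources and EnumerateFilesAsync are hypothetical and do not come from the article's repository, and Task.Yield() is only a stand-in for real asynchronous I/O such as a remote listing call.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class FileSources
{
    // Hypothetical source: yields file paths one at a time as an async sequence.
    // A remote source (for example a blob listing) could expose the same shape.
    public static async IAsyncEnumerable<string> EnumerateFilesAsync(
        string path,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        foreach (var file in Directory.EnumerateFiles(path))
        {
            cancellationToken.ThrowIfCancellationRequested();

            // Stand-in for real asynchronous I/O, such as a paged network call.
            await Task.Yield();

            yield return file;
        }
    }
}
```

With a source of this shape, the rest of the chain would stay the same: the pipeline would start from FileSources.EnumerateFilesAsync(path) instead of Directory.EnumerateFiles(path).ToAsyncEnumerable().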

💡 Later, we will see that not only IEnumerable can be converted to IAsyncEnumerable, but also IObservable.

As you can see, System.Linq.Async provides a set of extension methods that allow you to work with IAsyncEnumerable in a similar way to how you would work with IEnumerable. The SelectAwait method is used to asynchronously project each element of the sequence. The Where method is used to filter elements based on a predicate. The OrderByDescending method is used to sort the elements of the sequence in descending order; note that sorting has to consume the whole sequence before it can emit the first element. The ForEachAsync method is used to asynchronously iterate over the sequence.
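To make the idea behind these operators more concrete, here is a minimal sketch of how a projection and a filter could be written by hand as async iterators. This is not the System.Linq.Async source code; the names SelectAwaitSketch and WhereSketch are made up for illustration, and the sketch uses only the base class library.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class OperatorSketches
{
    // Awaits the selector for each element, one element at a time, and yields the result.
    public static async IAsyncEnumerable<TResult> SelectAwaitSketch<TSource, TResult>(
        this IAsyncEnumerable<TSource> source,
        Func<TSource, ValueTask<TResult>> selector,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            yield return await selector(item);
        }
    }

    // Yields only the elements that satisfy the predicate.
    public static async IAsyncEnumerable<T> WhereSketch<T>(
        this IAsyncEnumerable<T> source,
        Func<T, bool> predicate,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            if (predicate(item))
            {
                yield return item;
            }
        }
    }
}
```

Each operator pulls one element from the stage before it, does its work, and hands the result to the stage after it, which is why the stages compose so naturally.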

It is worth pointing out that ForEachAsync is a terminal operation that triggers the execution of the pipeline. Remember that IAsyncEnumerable is a cold sequence: it does not start processing until you start iterating over it.
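The cold behavior can be observed with a small, self-contained sketch. The class ColdSequenceDemo and its members are hypothetical names used only for this illustration; the body of the async iterator does not run when Numbers() is called, only when the sequence is enumerated.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ColdSequenceDemo
{
    // Records the order in which things happen.
    public static List<string> Log { get; } = new();

    public static async IAsyncEnumerable<int> Numbers()
    {
        Log.Add("started"); // runs on the first MoveNextAsync, not when Numbers() is called
        for (var i = 1; i <= 3; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    public static async Task<int> RunAsync()
    {
        Log.Clear();

        var sequence = Numbers(); // nothing has executed yet
        Log.Add("created");

        var sum = 0;
        await foreach (var n in sequence) // enumeration triggers the iterator body
        {
            sum += n;
        }

        // Log now contains "created" followed by "started".
        return sum;
    }
}
```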

Here are the building blocks of the pipeline:

public static class Steps
{
    public static async ValueTask<FilePayload> ReadFile(string file)
    {
        var content = await File.ReadAllTextAsync(file);
        var name = Path.GetFileName(file);

        return new FilePayload(name, content);
    }

    public static bool IsValidFileForProcessing(FilePayload file) =>
        file is { Content.Length: > 0, Name: [.., 't', 'x', 't'] };

    public static WordCountPayload CalculateWordCount(FilePayload payload)
    {
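        // Split on any whitespace and drop empty entries so that newlines and repeated spaces are not miscounted.
        var words = payload.Content.Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries);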

        return new(payload.Name, words.Length);
    }
}

public record FilePayload(string Name, string Content);
public record WordCountPayload(string Name, int WordCount);

Let’s see the pipeline in action:

Combining IAsyncEnumerable with IObservable

Let’s say we want to use a simple file watcher to monitor changes in the directory and trigger the pipeline when a new file is created or an existing file is modified.

var fileWatcher = CreateFileObservable(path);

var pipeline = fileWatcher
    .TakeUntil(DateTimeOffset.Now.AddSeconds(15))
    .ToAsyncEnumerable()
    .SelectAwait(ReadFile)
    .Where(IsValidFileForProcessing)
    .Select(CalculateWordCount)
    .ForEachAsync(Console.WriteLine);

In this example, we use IObservable to monitor changes in the directory. We create an observable sequence of file paths using the CreateFileObservable method. We then use the TakeUntil operator to limit the duration of the sequence to 15 seconds. We convert the observable sequence to an asynchronous enumerable sequence using the ToAsyncEnumerable method. We then apply the same pipeline as before to process the files.

The CreateFileObservable method is implemented as follows:

static IObservable<string> CreateFileObservable(string path) =>
    Observable.Create<string>(observer =>
    {
        var watcher = new FileSystemWatcher(path)
        {
            NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite,
            Filter = "*.*",
            EnableRaisingEvents = true
        };

        void onChanged(object sender, FileSystemEventArgs e)
        {
            try
            {
                observer.OnNext(e.FullPath);
            }
            catch (Exception ex)
            {
                observer.OnError(ex);
            }
        }

        watcher.Created += onChanged;
        watcher.Changed += onChanged;

        return () =>
        {
            watcher.Created -= onChanged;
            watcher.Changed -= onChanged;
            watcher.Dispose();
        };
    });

Let’s see the pipeline in action:

In the demo below, I’m appending “ word” to the end of the file content to trigger the pipeline.
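For intuition about what bridging a push-based IObservable to a pull-based IAsyncEnumerable involves, here is a minimal sketch that buffers pushed values in a channel. It is not how the ToAsyncEnumerable method used above is implemented; ToAsyncEnumerableSketch and FixedObservable are hypothetical names, and the unbounded buffer is an assumption made to keep the example short.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ObservableBridge
{
    // Subscribes when enumeration starts and unsubscribes when it ends.
    public static async IAsyncEnumerable<T> ToAsyncEnumerableSketch<T>(
        this IObservable<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Pushed values wait here until the consumer pulls them.
        var channel = Channel.CreateUnbounded<T>();

        using var subscription = source.Subscribe(new ChannelObserver<T>(channel.Writer));

        await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
        {
            yield return item;
        }
    }

    private sealed class ChannelObserver<T> : IObserver<T>
    {
        private readonly ChannelWriter<T> _writer;

        public ChannelObserver(ChannelWriter<T> writer) => _writer = writer;

        public void OnNext(T value) => _writer.TryWrite(value);

        // Completing the channel with an error surfaces the exception to the reader.
        public void OnError(Exception error) => _writer.TryComplete(error);

        public void OnCompleted() => _writer.TryComplete();
    }
}

// Demonstration-only observable: pushes a fixed set of values on Subscribe, then completes.
public sealed class FixedObservable<T> : IObservable<T>
{
    private readonly T[] _values;

    public FixedObservable(params T[] values) => _values = values;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var value in _values)
        {
            observer.OnNext(value);
        }

        observer.OnCompleted();
        return NoopDisposable.Instance;
    }

    private sealed class NoopDisposable : IDisposable
    {
        public static readonly NoopDisposable Instance = new();

        public void Dispose() { }
    }
}
```

The design choice worth noticing is the buffer: a file watcher can raise events faster than the pipeline consumes them, so something has to hold the pending paths until they are pulled.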

Implementing reusable operators - Batch

Let’s say we want to batch the processing of files to improve performance. We can implement a custom operator called Batch that groups elements of the sequence into batches of a specified size.

const int batchSize = 2;

var pipeline = Directory
    .EnumerateFiles(path)
    .ToAsyncEnumerable()
    .Batch<string, FilePayload>(batchSize)
    .ProcessEachAsync(ReadFile)
    .Where(IsValidFileForProcessing)
    .Select(CalculateWordCount)
    .OrderByDescending(x => x.WordCount)
    .ForEachAsync(Console.WriteLine);

await pipeline;

In the example above, we read files in batches. The Batch operator groups the file paths into batches of size 2, and the ProcessEachAsync method then processes the items of each batch in parallel.

💡 I will leave the implementation of the Batch operator as an exercise for the reader. Please check the source code for the full implementation: https://github.com/NikiforovAll/async-enumerable-pipelines/blob/main/Pipelines.Core/PipelineBuilderExtensions.cs
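As a starting point for that exercise, here is one possible sketch of a batching operator and a per-batch parallel step. It is not the implementation from the repository and its signatures differ from the Batch and ProcessEachAsync calls shown above; BatchSketch and ProcessEachSketch are hypothetical names, and running each batch with Task.WhenAll is an assumption about what "in parallel" means here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class BatchingSketch
{
    // Groups elements into lists of at most `size` items.
    // Because this is an iterator, the argument check runs when enumeration starts.
    public static async IAsyncEnumerable<IReadOnlyList<T>> BatchSketch<T>(
        this IAsyncEnumerable<T> source,
        int size,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (size <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(size));
        }

        var batch = new List<T>(size);

        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            batch.Add(item);

            if (batch.Count == size)
            {
                yield return batch;
                batch = new List<T>(size);
            }
        }

        // Emit the last, possibly smaller, batch.
        if (batch.Count > 0)
        {
            yield return batch;
        }
    }

    // Starts the selector for every item of a batch, waits for all of them,
    // then yields the results in the original order.
    public static async IAsyncEnumerable<TOut> ProcessEachSketch<TIn, TOut>(
        this IAsyncEnumerable<IReadOnlyList<TIn>> batches,
        Func<TIn, ValueTask<TOut>> selector,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var batch in batches.WithCancellation(cancellationToken))
        {
            var tasks = batch.Select(item => selector(item).AsTask()).ToArray();
            var results = await Task.WhenAll(tasks);

            foreach (var result in results)
            {
                yield return result;
            }
        }
    }
}
```

In this sketch the batch size also acts as the degree of parallelism: at most `size` selector calls are in flight at any time, and the next batch is not started until the current one has finished.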

Let’s see the pipeline in action:

Implementing reusable domain-specific operators - TextSummarization with Semantic Kernel

To demonstrate something more complex, let’s say we want to summarize the content of the files using the Semantic Kernel library. Summarization is a common task in natural language processing (NLP) that involves generating a concise representation of a text document.

var pipeline = Directory
    .EnumerateFiles(path)
    .ToAsyncEnumerable()
    .ReportProgress()
    .SelectAwait(ReadFile)
    .Where(IsValidFileForProcessing)
    .SelectAwait(Summarize)
    .WriteResultToFile(path: Path.Combine(Path.GetTempPath(), "summaries.txt"))
    .ForEachAsync(x => AnsiConsole.MarkupLine($"Processed [green]{x.Name}[/]"));

In the example above, we read files, summarize their content, and write the results to a file. The ReportProgress operator reports progress as each file is processed, the Summarize method produces a summary of the content of each file, and the WriteResultToFile operator appends the summaries to a file.
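Operators such as ReportProgress and WriteResultToFile share a common shape: they perform a side effect for each element and pass the element on unchanged. A generic version of that shape could be sketched as follows. The name Tap is hypothetical and is not part of the article's repository or of System.Linq.Async.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class TapSketch
{
    // Runs a side effect for each element, then yields the element unchanged
    // so that the next stage of the pipeline still receives it.
    public static async IAsyncEnumerable<T> Tap<T>(
        this IAsyncEnumerable<T> source,
        Action<T> sideEffect,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            sideEffect(item);
            yield return item;
        }
    }
}
```

A domain-specific operator is then mostly a matter of choosing a descriptive name and a concrete element type, which keeps the pipeline definition readable.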

Before we move forward, let’s see how the pipeline works in the demo below:

Now, we are ready to move forward and see the details of the implementation.

The Summarize method is implemented as follows:

async ValueTask<SummarizationPayload> Summarize(FilePayload file)
{
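    // {{$input}} is the Semantic Kernel template variable filled from KernelArguments["input"] below.
    var prompt = """
        {{$input}}
        Please summarize the content above in 20 words or less:

        The output format should be: [title]: [summary]
        """;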

    var result = await kernel.InvokePromptAsync(prompt, new KernelArguments() { ["input"] = file.Content });

    return new(file.Name, result.ToString());
}

Then we want to write the results into a file:

public static async IAsyncEnumerable<SummarizationPayload> WriteResultToFile(
    this IAsyncEnumerable<SummarizationPayload> values,
    string path
)
{
    const int batchSize = 10;

    // Dispose asynchronously so the final flush does not block a thread.
    await using var streamWriter = new StreamWriter(path, append: true);

    await foreach (var batch in values.Buffer(batchSize))
    {
        foreach (var value in batch)
        {
            await streamWriter.WriteLineAsync(value.Summary);

            yield return value;
        }

        await streamWriter.FlushAsync();
    }

    AnsiConsole.MarkupLine($"Results written to [green]{path}[/]");
}

💡 Note that IAsyncEnumerable is a pull-based model. With this approach, each summary is appended to the file as it is pulled through the pipeline, and FlushAsync is called after every batch, so results are saved continuously rather than only at the end.

The ReportProgress method is quite interesting because it eagerly reads all elements of the sequence to determine the total count. It then reports progress as each element is processed.

public static async IAsyncEnumerable<string> ReportProgress(this IAsyncEnumerable<string> values)
{
    var totalCount = await values.CountAsync();

    await foreach (var (value, index) in values.Select((value, index) => (value, index)))
    {
        yield return value;

        AnsiConsole
            .Progress()
            .Start(ctx =>
            {
                var task = ctx.AddTask($"Processing - {Path.GetFileName(value)}", true, totalCount);
                task.Increment(index + 1);
                task.StopTask();
            });
    }
}


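💡 This is a good demonstration of a leaky abstraction. ReportProgress enumerates its source twice: once in CountAsync to get the total, and once more to yield the values. That is acceptable for a cheap, replayable source such as a directory listing, but not every data source can provide the full sequence up front or be enumerated again, so we need to be careful.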
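The cost of enumerating a sequence twice can be made visible with a small sketch. DoubleEnumerationDemo and its members are hypothetical names; the counter shows that counting the elements and then iterating over them runs the source from the start both times.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DoubleEnumerationDemo
{
    // Counts how many times the source has been started from the beginning.
    public static int Starts;

    public static async IAsyncEnumerable<string> Source()
    {
        Starts++;
        await Task.Yield();
        yield return "a.txt";
        yield return "b.txt";
    }

    // Hand-written count, standing in for a counting operator such as CountAsync.
    public static async Task<int> CountSketchAsync<T>(IAsyncEnumerable<T> source)
    {
        var count = 0;
        await foreach (var item in source)
        {
            count++;
        }

        return count;
    }

    public static async Task<(int Total, int Starts)> RunAsync()
    {
        Starts = 0;
        var values = Source();

        var total = await CountSketchAsync(values); // first full pass over the source

        await foreach (var value in values) // second full pass over the source
        {
        }

        return (total, Starts);
    }
}
```

If the source were a network call or a live event stream, the second pass would repeat the call or observe different data, which is exactly where the abstraction leaks.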

Conclusion

That is it! 🙌 We have seen how to build pipelines using IAsyncEnumerable and System.Linq.Async. I hope you found this article helpful. If you have any questions or comments, please feel free to leave them below.
