
Data Processing with Elixir (Part 2)

Published at
8/13/2023
Categories
elixir
dataprocessing
etl
Author
parthex

In the last post we saw how to process data in Elixir using Task.async. It's a great way to do parallel processing in Elixir. Please read the previous post to see what the dataset schema looks like; you'll need it to understand the code below.

In this part we will see how we can use the Flow library to process GitHub events. Flow is a great library for processing data in Elixir: it lets you utilise all the cores on the machine, and it's a much nicer way to write data pipelines than Task.async because it gives us the familiar map and reduce abstractions we are so used to in the Elixir world.

Following is the code that uses the Flow library to process the TSV file chunks in parallel:

# Compile the tab separator once so every split can reuse it
tab_char = :binary.compile_pattern("\t")

# Build one lazy stream per TSV chunk file
streams =
  for file <- File.ls!("/Users/parth/temp/github_events/clickhouse_tsv") do
    full_path = "/Users/parth/temp/github_events/clickhouse_tsv/#{file}"
    File.stream!(full_path, [{:read_ahead, 100_000}])
  end

streams
|> Flow.from_enumerables()
# Extract the event type (the second tab-separated column) from each line
|> Flow.map(fn line ->
  line |> String.split(tab_char) |> Enum.at(1)
end)
# Route lines with the same event type to the same reducer
|> Flow.partition()
# Count the occurrences of each event type
|> Flow.reduce(fn -> %{} end, fn event_type, acc ->
  Map.update(acc, event_type, 1, &(&1 + 1))
end)
|> Enum.to_list()

# Result
# [{"CommitCommentEvent", 9954564}, {"CreateEvent", 295163106}]

In the above code I create a list of streams, one for each TSV file chunk I have. Next I pipe it into Flow.from_enumerables, which creates a flow definition from it. Then Flow.map processes each line in parallel; under the hood, Flow will spin up as many mapper processes as there are CPU cores. You can read more about Flow in its excellent documentation.
The mapper is very simple: it splits the line on the tab character and returns the second item in the resulting list.
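
For example, given a made-up line shaped like the dataset from part 1 (the field values here are hypothetical), the mapper behaves like this:

# A hypothetical TSV line; only the second column matters here
line = "1234567\tPushEvent\t2023-08-13 10:00:00\n"

tab_char = :binary.compile_pattern("\t")
line |> String.split(tab_char) |> Enum.at(1)
#=> "PushEvent"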

Then Flow.partition applies a hash function to the results of the Flow.map stage and sends each result to one of the reducers. Strings with the same hash value go to the same reducer, so all lines for a given event type end up counted in the same place.
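
Conceptually the routing looks something like the sketch below. The exact hash Flow uses internally is an implementation detail; :erlang.phash2 here is only an illustration.

# Illustration only: pick one of 8 reducers by hashing the event type.
reducer_count = 8
reducer_index = :erlang.phash2("PushEvent", reducer_count)
# The same string always hashes to the same index, so every
# "PushEvent" lands on the same reducer.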

Finally, there is an Enum.to_list() at the end that actually causes the Flow pipeline to start running. Remember, Flow is lazy: it won't start running until you call Enum.to_list or Flow.run on the pipeline.
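Here is a minimal sketch of that laziness, assuming the same streams variable as above:

# Building the flow does no work yet; it's only a description.
flow =
  streams
  |> Flow.from_enumerables()
  |> Flow.map(&String.trim/1)

# Only now do the stages start up and data begins to move.
Flow.run(flow)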

When I ran the above pipeline it took around 630 seconds, and CPU usage held steady at around 4.5 cores.
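
If you want to reproduce the measurement, :timer.tc is one simple way to time the pipeline; this is a sketch of how I'd measure it, not necessarily how the number above was produced.

# Measures wall-clock time of the whole pipeline in microseconds.
{micros, _counts} =
  :timer.tc(fn ->
    streams
    |> Flow.from_enumerables()
    |> Flow.map(fn line -> line |> String.split(tab_char) |> Enum.at(1) end)
    |> Flow.partition()
    |> Flow.reduce(fn -> %{} end, fn event_type, acc ->
      Map.update(acc, event_type, 1, &(&1 + 1))
    end)
    |> Enum.to_list()
  end)

IO.puts("pipeline took #{div(micros, 1_000_000)} seconds")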

I wanted to see if using a regex in the Flow.map stage would make things any faster. Below is the updated code with regex parsing in the Flow.map stage:

# Skip the first column, capture the second, ignore the rest of the line
regex = Regex.compile!("[^\t]+\t([^\t]+)\t.+")

streams =
  for file <- File.ls!("/Users/parth/temp/github_events/clickhouse_tsv") do
    full_path = "/Users/parth/temp/github_events/clickhouse_tsv/#{file}"
    File.stream!(full_path, [{:read_ahead, 100_000}])
  end

streams
|> Flow.from_enumerables()
|> Flow.map(fn line ->
  # Regex.scan returns [[full_match, capture]] for a single match
  [[_, event_type]] = Regex.scan(regex, line)
  event_type
end)
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn event_type, acc ->
  Map.update(acc, event_type, 1, &(&1 + 1))
end)
|> Enum.to_list()

The above code ran in 470 seconds, a ~25% improvement over the 630 seconds of the split-based version. I suspect the regex approach is faster because it creates less garbage: it extracts only the one capture instead of splitting the whole string into a list and then picking the second item.
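
A related, untested idea for cutting the garbage in the split-based version is to stop splitting after the second field with the :parts option, so the rest of the line stays a single binary:

# Untested sketch: split into at most 3 parts so the remainder of the
# line is left as one binary instead of many small ones.
tab_char = :binary.compile_pattern("\t")
line = "1234567\tPushEvent\t2023-08-13 10:00:00\tmore fields\n"

[_, event_type, _rest] = String.split(line, tab_char, parts: 3)
#=> event_type is "PushEvent"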

In the next part we will explore whether we can optimise this code further.
