Data Processing with Elixir (Part 2)
In the last post we saw how to process data in Elixir using Task.async. It's a great way to do parallel processing in Elixir. Please read the previous post to see what the dataset schema looks like in order to understand the code below.
In this part we will see how we can use the Flow library to process GitHub events. Flow is a great library for processing data in Elixir. It allows one to utilise all cores on the machine. It's a much nicer way to write data pipelines than using Task.async, as it lets us use the familiar map and reduce abstractions that we are so used to in the Elixir world.
Following is the code that uses the Flow library to process the TSV file chunks in parallel:
# BINARY
# Precompile the tab pattern once so every String.split call can reuse it.
tab_char = :binary.compile_pattern("\t")

# Build one lazy line stream per TSV chunk.
streams =
  for file <- File.ls!("/Users/parth/temp/github_events/clickhouse_tsv") do
    full_path = "/Users/parth/temp/github_events/clickhouse_tsv/#{file}"
    File.stream!(full_path, [{:read_ahead, 100_000}])
  end

streams
|> Flow.from_enumerables()
# Mapper: extract the second column (the event type) from each line.
|> Flow.map(fn line ->
  line |> String.split(tab_char) |> Enum.at(1)
end)
|> Flow.partition()
# Reducer: count occurrences of each event type.
|> Flow.reduce(fn -> %{} end, fn event_type, acc ->
  Map.update(acc, event_type, 1, &(&1 + 1))
end)
|> Enum.to_list()
# Result
# [{"CommitCommentEvent", 9954564}, {"CreateEvent", 295163106}]
In the above code I create a list of streams, one for each TSV file chunk I have. Next I pipe it into Flow.from_enumerables, which creates a flow definition from it. Then Flow.map processes each line in parallel. Under the hood, Flow.map will by default spin up as many mapper processes as there are CPU cores. You can read more about Flow in its excellent documentation.
The mapper is very simple: it splits the line on the tab character and returns the second item in the resulting list.
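To make the mapper concrete, here is a minimal sketch that runs the same split on a single line outside of Flow. The sample line and its column values are hypothetical placeholders, not rows from the real dataset.

```elixir
# Hypothetical TSV line; only the second column (the event type) matters here.
line = "col_a\tPushEvent\tcol_c\n"

# Same precompiled pattern as in the pipeline above.
tab_char = :binary.compile_pattern("\t")

event_type =
  line
  |> String.split(tab_char)
  |> Enum.at(1)

IO.inspect(event_type)
```

Note that String.split builds a list of every column in the line even though only one element is kept.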
Then Flow.partition applies a hash function to the results of the Flow.map stage and sends each result to one of the reducers. Strings that have the same hash value go to the same reducer.
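The routing idea can be sketched with the standard library alone. The snippet below is not Flow's implementation; it only mimics the behaviour described above by hashing each event type with :erlang.phash2/2, counting within each bucket, and merging the buckets. The sample events and the partition count of 4 are assumptions made for illustration.

```elixir
# Hypothetical sample events; the real pipeline feeds in values parsed from the TSV files.
events = ["PushEvent", "CreateEvent", "PushEvent", "WatchEvent", "CreateEvent", "PushEvent"]

# Assumed number of reducers for this sketch.
partitions = 4

counts_per_partition =
  events
  # :erlang.phash2/2 returns an integer in 0..partitions-1; equal terms always hash equally.
  |> Enum.group_by(&:erlang.phash2(&1, partitions))
  |> Map.new(fn {partition, partition_events} ->
    counts =
      Enum.reduce(partition_events, %{}, fn event_type, acc ->
        Map.update(acc, event_type, 1, &(&1 + 1))
      end)

    {partition, counts}
  end)

# Each event type lives in exactly one partition, so merging never has to add counts together.
totals =
  counts_per_partition
  |> Map.values()
  |> Enum.reduce(%{}, &Map.merge/2)

IO.inspect(totals)
```

Because identical keys always land in the same bucket, each reducer can keep its own map without coordinating with the others.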
Finally, there is an Enum.to_list() at the end of the Flow, which is what actually causes the pipeline to start running. Remember that Flow is lazy: it won't start running until you call Enum.to_list or Flow.run on the pipeline.
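Laziness is easy to observe with a small experiment. The sketch below uses Stream from the standard library as a stand-in for Flow, purely so that it runs without extra dependencies; the sample lines are hypothetical. The mapper sends a message to the current process each time it runs, which lets us check that nothing has executed before Enum.to_list is called.

```elixir
# Stream is a stdlib stand-in for Flow here; both only describe work until enumerated.
pipeline =
  ["col_a\tPushEvent\tcol_c\n", "col_a\tCreateEvent\tcol_c\n"]
  |> Stream.map(fn line ->
    # Leave a trace so we can tell whether the mapper has run yet.
    send(self(), :mapper_ran)
    line |> String.split("\t") |> Enum.at(1)
  end)

# Check the mailbox without blocking: no message means the mapper has not run.
ran_before_enumeration =
  receive do
    :mapper_ran -> true
  after
    0 -> false
  end

# Enumerating the pipeline is what actually triggers the work.
result = Enum.to_list(pipeline)

IO.inspect({ran_before_enumeration, result})
```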
When I ran the above pipeline, it took around 630 seconds to complete, and CPU usage was consistently around 4.5 cores.
I wanted to see whether using a regex in the Flow.map stage would make things any faster. Below is the updated code with regex parsing in the Flow.map stage:
# Capture only the second tab-separated column (the event type).
regex = Regex.compile!("[^\t]+\t([^\t]+)\t.+")

# Build one lazy line stream per TSV chunk.
streams =
  for file <- File.ls!("/Users/parth/temp/github_events/clickhouse_tsv") do
    full_path = "/Users/parth/temp/github_events/clickhouse_tsv/#{file}"
    File.stream!(full_path, [{:read_ahead, 100_000}])
  end

streams
|> Flow.from_enumerables()
# Mapper: each line yields one match of the form [whole_match, event_type].
|> Flow.map(fn line ->
  [[_, event_type]] = Regex.scan(regex, line)
  event_type
end)
|> Flow.partition()
# Reducer: count occurrences of each event type.
|> Flow.reduce(fn -> %{} end, fn event_type, acc ->
  Map.update(acc, event_type, 1, &(&1 + 1))
end)
|> Enum.to_list()
The above code ran in 470 seconds, which is a ~25% reduction in run time compared to 630 seconds. I suspect the regex approach creates less garbage than the previous approach of splitting the string into a list and then picking the second item, and hence might be faster.
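As a quick sanity check, the sketch below runs both extraction strategies on one hypothetical line to confirm they agree, and recomputes the improvement from the two timings quoted above. It does not measure garbage or speed; it only illustrates that the split produces a list of every column while the regex captures a single field.

```elixir
# Hypothetical TSV line with placeholder column values.
line = "col_a\tPushEvent\tcol_c\n"

# Approach 1: split on tabs and take the second element.
tab_char = :binary.compile_pattern("\t")
all_columns = String.split(line, tab_char)
via_split = Enum.at(all_columns, 1)

# Approach 2: the regex from the post, capturing only the second column.
regex = Regex.compile!("[^\t]+\t([^\t]+)\t.+")
[[_whole_match, via_regex]] = Regex.scan(regex, line)

# Relative reduction in run time, using the timings reported in the post.
improvement_percent = Float.round((630 - 470) / 630 * 100, 1)

IO.inspect({via_split, via_regex, length(all_columns), improvement_percent})
```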
In the next part we will explore if we can further optimise this code.