Logo

dev-resources.site

for different kinds of informations.

Processing parquet files in Golang

Published at
12/10/2019
Categories
go
parquet
Author
eminetto
Categories
2 categories in total
go
open
parquet
open
Author
8 person written this
eminetto
open
Processing parquet files in Golang

In this post, I will talk about a relatively new data file format, and how to use it in Go.

The format is called Parquet and is currently a project supported by the Apache Foundation. It is a binary file format to store and facilitate data processing a columnar storage format. It supports different types of compression and is widely used in data science and big data environment, with tools like Hadoop.

At Codenation we are using this format to store statistical data in S3 buckets. That way, we can do parallel processing using Lambda Functions without overloading our database servers.

In this post, I will show how to generate and process files in this format using the Go language.

The first step is to create a struct that will represent the data we will process in this example:

type user struct {
  ID        string    `parquet:"name=id, type=UTF8, encoding=PLAIN_DICTIONARY"`
  FirstName string    `parquet:"name=firstname, type=UTF8, encoding=PLAIN_DICTIONARY"`
  LastName  string    `parquet:"name=lastname, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Email     string    `parquet:"name=email, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Phone     string    `parquet:"name=phone, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Blog      string    `parquet:"name=blog, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Username  string    `parquet:"name=username, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Score     float64   `parquet:"name=score, type=DOUBLE"`
  CreatedAt time.Time //wont be saved in the parquet file
}
Enter fullscreen mode Exit fullscreen mode

The important detail in this code is the tags, which state how each field of the struct will be handled when generating the parquet file. To process the data I am using the package github.com/xitongsys/parquet-go and in the repository you can see more examples of available tags.

Let's now generate our first file in parquet format:

package main

import (
  "fmt"
  "log"
  "time"
  "github.com/bxcodec/faker/v3"
  "github.com/xitongsys/parquet-go-source/local"
  "github.com/xitongsys/parquet-go/parquet"
  "github.com/xitongsys/parquet-go/reader"
  "github.com/xitongsys/parquet-go/writer"
)

type user struct {
  ID        string    `parquet:"name=id, type=UTF8, encoding=PLAIN_DICTIONARY"`
  FirstName string    `parquet:"name=firstname, type=UTF8, encoding=PLAIN_DICTIONARY"`
  LastName  string    `parquet:"name=lastname, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Email     string    `parquet:"name=email, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Phone     string    `parquet:"name=phone, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Blog      string    `parquet:"name=blog, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Username  string    `parquet:"name=username, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Score     float64   `parquet:"name=score, type=DOUBLE"`
  CreatedAt time.Time //wont be saved in the parquet file
}

const recordNumber = 10000

func main() {
  var data []*user
  //create fake data
  for i := 0; i < recordNumber; i++ {
    u := &user{
      ID:        faker.UUIDDigit(),
      FirstName: faker.FirstName(),
      LastName:  faker.LastName(),
      Email:     faker.Email(),
      Phone:     faker.Phonenumber(),
      Blog:      faker.URL(),
      Username:  faker.Username(),
      Score:     float64(i),
      CreatedAt: time.Now(),
    }
    data = append(data, u)
  }
  err := generateParquet(data)
  if err != nil {
    log.Fatal(err)
  }

}

func generateParquet(data []*user) error {
  log.Println("generating parquet file")
  fw, err := local.NewLocalFileWriter("output.parquet")
  if err != nil {
    return err
  }
  //parameters: writer, type of struct, size
  pw, err := writer.NewParquetWriter(fw, new(user), int64(len(data)))
  if err != nil {
    return err
  }
  //compression type
  pw.CompressionType = parquet.CompressionCodec_GZIP
  defer fw.Close()
  for _, d := range data {
    if err = pw.Write(d); err != nil {
      return err
    }
  }
  if err = pw.WriteStop(); err != nil {
    return err
  }
  return nil
}
Enter fullscreen mode Exit fullscreen mode

The next snippet shows how we read content in a parquet file:

func readParquet() ([]*user, error) {
  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return nil, err
  }
  pr, err := reader.NewParquetReader(fr, new(user), recordNumber)
  if err != nil {
    return nil, err
  }
  u := make([]*user, recordNumber)
  if err = pr.Read(&u); err != nil {
    return nil, err
  }
  pr.ReadStop()
  fr.Close()
  return u, nil
}
Enter fullscreen mode Exit fullscreen mode

The above example is just a didactic one. As I am reading the entire file and putting all 10,000 records into memory this can be a problem when talking about gigabytes of data. In real-life applications we will use functions that the package provides to fetch only part of the file:

func readPartialParquet(pageSize, page int) ([]*user, error) {
  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return nil, err
  }
  pr, err := reader.NewParquetReader(fr, new(user), int64(pageSize))
  if err != nil {
    return nil, err
  }
  pr.SkipRows(int64(pageSize * page))
  u := make([]*user, pageSize)
  if err = pr.Read(&u); err != nil {
    return nil, err
  }
  pr.ReadStop()
  fr.Close()
  return u, nil
}
Enter fullscreen mode Exit fullscreen mode

As the definition makes clear, we are using a columnar storage format. So, we can take just the Score column and calculate its average:

func calcScoreAVG() (float64, error) {
  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return 0.0, err
  }
  pr, err := reader.NewParquetColumnReader(fr, recordNumber)
  if err != nil {
    return 0.0, err
  }
  num := int(pr.GetNumRows())

  data, _, _, err := pr.ReadColumnByPath("parquet_go_root.score", num)
  if err != nil {
    return 0.0, err
  }
  var result float64
  for _, i := range data {
    result += i.(float64)
  }
  return (result / float64(num)), nil
}
Enter fullscreen mode Exit fullscreen mode

The purpose of this post was to introduce this format which can be very useful for data transfer, replacing CSV or JSON files in projects of different scales.

You can dig deeper into the format and package documentation to find more complex and detailed examples, but I hope I have brought some useful advice for some Go projects.

The complete example code presented in this post can be found in this repository.

Featured ones: