
Working with Parquet files in Java using Protocol Buffers

Published at
12/7/2023
Categories
parquet
java
protocolbuffers
bigdata
Author
Jerónimo López

This post continues the series of articles about working with Parquet files in Java. This time, I'll explain how to do it using the Protocol Buffers (PB) library.

Finding examples and documentation on how to use Parquet with Avro is challenging, but with Protocol Buffers, it's even more complicated.

Protocol Buffers and Parquet both support complex data structures, offering a direct and clear mapping between them.

I will base this on the same example I used in previous articles talking about serialization. The code will be very similar to that in the article about Protocol Buffers.

In the example, we will work with a collection of Organization objects, which in turn have a list of Attributes:

import java.util.List;

record Org(String name, String category, String country, Type type, List<Attr> attributes) {
}

record Attr(String id, byte quantity, byte amount, boolean active, double percent, short size) {
}

enum Type {
  FOO, BAR, BAZ
}

When using Parquet with PB, we also work with classes generated from the Protocol Buffers IDL. This code generation is a PB feature, not a Parquet one, but parquet-protobuf, the library that implements this integration, builds on it.

Internally, the library transforms the PB schema into the Parquet schema, so most tools and libraries that can work with PB classes will be able to work indirectly with Parquet with few changes.

Apart from the classes used to write or read the files, the remaining logic for building the objects generated by PB or reading their data remains unchanged compared to PB serialization.

Serialization

We need to instantiate a Parquet writer that can write objects created by PB:

Path path = new Path("/tmp/my_output_file.parquet");
OutputFile outputFile = HadoopOutputFile.fromPath(path, new Configuration());
ParquetWriter<Organization> writer = ProtoParquetWriter.<Organization>builder(outputFile)
      .withMessage(Organization.class)
      .withWriteMode(Mode.OVERWRITE)
      .config(ProtoWriteSupport.PB_SPECS_COMPLIANT_WRITE, "true")
      .build();

Parquet defines a class named ParquetWriter<T>, and the parquet-protobuf library extends it with ProtoParquetWriter<T>, which implements the logic for converting PB objects into calls to the Parquet API. The object we will serialize is Organization, which has been generated using the PB utility and implements the PB API.

The Path class is not the one from java.nio.file, but a Hadoop-specific abstraction for referencing file paths. The OutputFile class, in turn, is Parquet's abstraction for a file that can be written to.

Therefore:

  • Path is a Hadoop class, while OutputFile, HadoopOutputFile, and ParquetWriter are classes defined by the Parquet API
  • ProtoParquetWriter is a class defined by the parquet-protobuf API, a library that integrates Parquet with Protocol Buffers
  • Organization and Attribute are classes generated by the PB utility, not related to Parquet

ParquetWriter instances are created through a Builder, which lets you configure many Parquet-specific parameters as well as those of the library we are using (Protocol Buffers). For example:

  • withMessage: class generated with Protocol Buffers that we want to persist (which internally defines the schema)
  • withCompressionCodec: compression method to use: SNAPPY, GZIP, LZ4, etc. By default, no compression is applied.
  • withWriteMode: by default it is CREATE, so if the file already exists, it is not overwritten and an exception is thrown. To avoid this, use OVERWRITE.
  • withValidation: whether to validate the data types passed against the defined schema.

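Generic configuration can be passed using the config(String property, String value) method. In this case, we set ProtoWriteSupport.PB_SPECS_COMPLIANT_WRITE to "true" so that repeated fields, such as the list of attributes, are written with the three-level list structure defined by the Parquet specification.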

Once the ParquetWriter class is instantiated, the major complexity lies in transforming your POJOs into the Organization classes generated from the PB IDL:

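import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.proto.ProtoParquetWriter;
import org.apache.parquet.proto.ProtoWriteSupport;
// Organization, Attribute and OrganizationType are the classes generated by protoc from the IDL

Path path = new Path("/tmp/my_output_file.parquet");
OutputFile outputFile = HadoopOutputFile.fromPath(path, new Configuration());
try (ParquetWriter<Organization> writer = ProtoParquetWriter.<Organization>builder(outputFile)
      .withMessage(Organization.class)
      .withWriteMode(Mode.OVERWRITE)
      .config(ProtoWriteSupport.PB_SPECS_COMPLIANT_WRITE, "true")
      .build()) {
    // Convert and write each organization one at a time
    for (Org org : organizations) {
        var organizationBuilder = Organization.newBuilder()
            .setName(org.name())
            .setCategory(org.category())
            .setCountry(org.country())
            .setType(OrganizationType.forNumber(org.type().ordinal()));
        for (Attr attr : org.attributes()) {
            // byte and short values are widened by the generated integer setters
            var attribute = Attribute.newBuilder()
                .setId(attr.id())
                .setQuantity(attr.quantity())
                .setAmount(attr.amount())
                .setActive(attr.active())
                .setPercent(attr.percent())
                .setSize(attr.size())
                .build();
            organizationBuilder.addAttributes(attribute);
        }
        writer.write(organizationBuilder.build());
    }
}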

Instead of converting the entire collection of organizations and then writing it, we can convert and persist each Organization one by one.
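The conversion above maps the Type enum with OrganizationType.forNumber(org.type().ordinal()), which only works if the declaration order of Type matches the numbers assigned in the IDL. The following is a minimal sketch comparing that ordinal-based mapping with a name-based one, assuming an IDL that declares FOO = 0, BAR = 1 and BAZ = 2. PbOrganizationType is a hypothetical stand-in for the generated enum so the example runs without protoc, and EnumMappingSketch, byOrdinal and byName are illustrative names.

```java
// Sketch: two ways to map the domain enum Type to a Protocol Buffers enum.
// PbOrganizationType is a HYPOTHETICAL stand-in for the generated OrganizationType,
// assuming the IDL declares FOO = 0, BAR = 1, BAZ = 2.
public class EnumMappingSketch {

    enum Type { FOO, BAR, BAZ }

    // Stand-in for a generated PB enum: each constant carries its field number.
    enum PbOrganizationType {
        FOO(0), BAR(1), BAZ(2);

        private final int number;

        PbOrganizationType(int number) {
            this.number = number;
        }

        int getNumber() {
            return number;
        }

        // Mimics the generated forNumber(): returns null for unknown numbers.
        static PbOrganizationType forNumber(int value) {
            for (PbOrganizationType t : values()) {
                if (t.number == value) {
                    return t;
                }
            }
            return null;
        }
    }

    // Relies on Type's declaration order matching the PB enum numbers.
    static PbOrganizationType byOrdinal(Type type) {
        return PbOrganizationType.forNumber(type.ordinal());
    }

    // Relies only on the constant names matching; throws IllegalArgumentException otherwise.
    static PbOrganizationType byName(Type type) {
        return PbOrganizationType.valueOf(type.name());
    }

    public static void main(String[] args) {
        for (Type t : Type.values()) {
            System.out.println(t + " -> " + byOrdinal(t) + " / " + byName(t));
        }
    }
}
```

Generated PB enums are regular Java enums, so the name-based variant should also be available with the real OrganizationType. It keeps working if the IDL renumbers its values, but fails loudly when a name is missing, whereas the ordinal-based variant silently returns a different constant (or null) if the two orders diverge.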

You can find the code on GitHub.

Deserialization

Deserialization is very straightforward if we are willing to work with the classes generated by Protocol Buffers.

To read the file, we will need to instantiate a Parquet reader:

Path path = new Path(filePath);
InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
ParquetReader<Organization.Builder> reader =
    ProtoParquetReader.<Organization.Builder>builder(inputFile).build();

Parquet defines a class named ParquetReader<T>, and the parquet-protobuf library extends it with ProtoParquetReader, which implements the logic for converting Parquet's internal data structures into classes generated by Protocol Buffers.

InputFile is Parquet's abstraction for a file that can be read from.

Therefore:

  • Path is a Hadoop class, while InputFile, HadoopInputFile, and ParquetReader are classes defined by the Parquet API
  • ProtoParquetReader extends ParquetReader and is defined in parquet-protobuf, a library that integrates Parquet with PB
  • Organization (and Attribute) are classes generated by the PB utility, not related to Parquet

The ParquetReader instance is also created with a Builder, although there are far fewer options to configure, because most of the configuration is determined by the file being read. The reader does not need to be told whether the file uses dictionary encoding or compression; it discovers this by reading the file.

It's important to note that the data type the reader will return is a Builder for Organization, rather than the Organization itself, and we will need to call the build() method:

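import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.proto.ProtoParquetReader;

Path path = new Path(filePath);
InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
try (ParquetReader<Organization.Builder> reader = ProtoParquetReader.<Organization.Builder>builder(inputFile).build()) {
    List<Organization> organizations = new ArrayList<>();
    Organization.Builder next;
    // read() returns null once all records have been consumed
    while ((next = reader.read()) != null) {
        organizations.add(next.build());
    }
    return organizations;
}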

If the IDL used to generate the code contained only a subset of the columns in the file, the reader would ignore all the columns not present in the IDL, saving disk reads and deserialization/decoding work.
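To illustrate why skipping columns saves work, here is a toy, in-memory sketch of column projection: values are stored per column, so a reader that asks for only some columns never touches the rest. ProjectionSketch and its methods are hypothetical names; the sketch does not model Parquet's actual file layout or how parquet-protobuf decides which columns to read.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy columnar table (hypothetical, not Parquet's real layout): the values of
// each column are stored together, so a projection only reads the columns it asks for.
public class ProjectionSketch {

    // Column name -> values of that column, in row order.
    private final Map<String, List<?>> columns = new LinkedHashMap<>();
    // Number of columns whose values were handed to a reader.
    private int columnsRead = 0;

    void addColumn(String name, List<?> values) {
        columns.put(name, values);
    }

    // Returns only the requested columns; the others are never touched.
    Map<String, List<?>> read(Set<String> projection) {
        Map<String, List<?>> result = new LinkedHashMap<>();
        for (String name : projection) {
            List<?> values = columns.get(name);
            if (values != null) {
                columnsRead++;
                result.put(name, values);
            }
        }
        return result;
    }

    int columnsRead() {
        return columnsRead;
    }

    public static void main(String[] args) {
        ProjectionSketch table = new ProjectionSketch();
        table.addColumn("name", List.of("Acme", "Globex"));
        table.addColumn("category", List.of("retail", "energy"));
        table.addColumn("country", List.of("ES", "US"));
        // Similar in spirit to reading with an IDL that only declares name and country.
        Map<String, List<?>> data = table.read(Set.of("name", "country"));
        System.out.println(data.keySet() + ", columns read: " + table.columnsRead());
    }
}
```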

You can find the code on GitHub.

Performance

What performance does Parquet with Protocol Buffers offer when serializing and deserializing large volumes of data? To what extent do different compression options affect it? Should we choose compression with Snappy or no compression at all? And should dictionary encoding be enabled or not?

Leveraging the analyses I conducted previously on different serialization formats, we can get an idea of their strengths and weaknesses. The benchmarks were conducted on the same computer, making them comparable.

File Size

                      Uncompressed    Snappy
Dictionary disabled   1,034 MB        508 MB
Dictionary enabled    289 MB          281 MB

The difference in sizes shows that, in my synthetic example, dictionary encoding compresses the information very effectively, even better than the Snappy algorithm on its own (289 MB versus 508 MB). The decision to enable compression or not will depend on the performance penalty it entails.
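Dictionary encoding pays off on columns with few distinct values (the type field, for instance, only has three). A minimal sketch of the idea: each distinct value is stored once in a dictionary, and the column is replaced by integer indices into it. DictionaryEncodingSketch, encode and decode are hypothetical names, and Parquet's real implementation additionally packs the indices with its run-length/bit-packing hybrid encoding and can fall back to plain encoding when the dictionary grows too large.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified dictionary encoding: distinct values are stored once,
// and the column becomes an array of indices into that dictionary.
public class DictionaryEncodingSketch {

    record Encoded(List<String> dictionary, int[] indices) {}

    static Encoded encode(List<String> column) {
        Map<String, Integer> positions = new HashMap<>(); // value -> dictionary index
        List<String> dictionary = new ArrayList<>();
        int[] indices = new int[column.size()];
        for (int i = 0; i < column.size(); i++) {
            String value = column.get(i);
            Integer pos = positions.get(value);
            if (pos == null) { // first occurrence: append to the dictionary
                pos = dictionary.size();
                positions.put(value, pos);
                dictionary.add(value);
            }
            indices[i] = pos;
        }
        return new Encoded(dictionary, indices);
    }

    // Decoding is a single dictionary lookup per value.
    static List<String> decode(Encoded encoded) {
        List<String> column = new ArrayList<>(encoded.indices().length);
        for (int index : encoded.indices()) {
            column.add(encoded.dictionary().get(index));
        }
        return column;
    }

    public static void main(String[] args) {
        List<String> type = List.of("FOO", "BAR", "FOO", "FOO", "BAZ", "BAR");
        Encoded e = encode(type);
        System.out.println(e.dictionary() + " " + Arrays.toString(e.indices()));
    }
}
```

Here the six strings become a three-entry dictionary plus six small integers, and the more a value repeats, the larger the saving.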

Serialization Time

                      Uncompressed    Snappy
Dictionary disabled   14,802 ms       15,450 ms
Dictionary enabled    16,018 ms       16,174 ms

The time is very similar in all cases, and we can say that the different compression techniques do not significantly affect the time taken.
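To quantify the trade-off, here is a small calculation of each configuration's relative change against the baseline (no dictionary, no compression), using the file size and serialization time figures from the tables above. BenchmarkRatios and percentChange are illustrative names.

```java
// Relative change of each configuration versus the baseline
// (dictionary disabled, uncompressed), using the benchmark figures above.
public class BenchmarkRatios {

    // Percentage change of value relative to baseline (negative = smaller/faster).
    static double percentChange(double baseline, double value) {
        return (value - baseline) / baseline * 100.0;
    }

    public static void main(String[] args) {
        double baseSize = 1034;   // MB
        double baseWrite = 14802; // ms
        String[] labels = {"Snappy only", "Dictionary only", "Dictionary + Snappy"};
        double[] sizes = {508, 289, 281};
        double[] writes = {15450, 16018, 16174};
        for (int i = 0; i < labels.length; i++) {
            System.out.printf("%-20s size %+6.1f%%  write time %+5.1f%%%n",
                labels[i], percentChange(baseSize, sizes[i]), percentChange(baseWrite, writes[i]));
        }
    }
}
```

Dictionary encoding alone reduces the file size by about 72% at the cost of roughly 8% more serialization time, while Snappy alone saves about 51% for roughly 4% more time.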

Compared with other serialization formats, it takes between 50% (Jackson) and 300% (Protocol Buffers/Avro) longer, but in return, it achieves files that are between 60% (Protocol Buffers/Avro) and 90% (Jackson) smaller.

Deserialization Time

                      Uncompressed    Snappy
Dictionary disabled   12 ,64 ms       13,028 ms
Dictionary enabled    10,492 ms       11,025 ms

In this case, the use of the dictionary has a significant impact on time, saving the effort of decoding information that is repeated. There is definitely no reason to disable this feature.

Compared to other formats, it is 100% slower than pure Protocol Buffers and 30% slower than pure Avro, but it is almost twice as fast as Jackson.

To put the performance into perspective, on my laptop, it reads 45,000 Organizations per second, which in turn contain almost 2.6 million instances of the Attribute type.

Conclusion

If you were already familiar with Protocol Buffers, most of the code and particularities related to PB will be recognizable to you. If not, it increases the entry barrier, as you need to learn about two distinct but similar technologies, and it's not always clear which part belongs to each one.

The biggest change compared to pure Protocol Buffers is the way to create the writer and reader objects, where we have to deal with all the configuration and particularities specific to Parquet.

Although possible, Protocol Buffers is not intended for serializing large amounts of data, so it's not comparable to Parquet using Protocol Buffers. If I had to choose between using Parquet with PB or Parquet with Avro, I would probably opt for the Avro version, as Avro is often used in the data engineering world and you could benefit more from the experience.

The data I used in the example is random and results may vary depending on the characteristics of your data. Test with your data to draw conclusions.

In environments where you write once and read multiple times, the time spent on serialization should not be a determining factor. More important are the file size, transfer time, or processing speed (especially if you can filter columns by making projections).

Despite using different compression and encoding techniques, the processing speed of files is quite high. Coupled with its ability to work with a typed schema, this makes Parquet a data exchange format to consider in projects with a high volume of data.
