Working with Parquet files in Java using Protocol Buffers
This post continues the series of articles about working with Parquet files in Java. This time, I'll explain how to do it using the Protocol Buffers (PB) library.
Finding examples and documentation on how to use Parquet with Avro is challenging; with Protocol Buffers, they are even harder to find.
Protocol Buffers and Parquet both support complex data structures, offering a direct and clear mapping between them.
I will build on the same example I used in the previous articles about serialization. The code is very similar to the code in the article about Protocol Buffers.
In the example, we will work with a collection of Organization objects, which in turn have a list of Attributes:
    record Org(String name, String category, String country, Type type, List<Attr> attributes) {
    }
    record Attr(String id, byte quantity, byte amount, boolean active, double percent, short size) {
    }
    enum Type {
        FOO, BAR, BAZ
    }
In Parquet with PB we must also use classes generated from the Protocol Buffers IDL. This is a characteristic of PB, not of Parquet, but it is inherited by parquet-protobuf, the library that implements this integration.
Internally, the library transforms the PB schema into the Parquet schema, so most tools and libraries that can work with PB classes will be able to work indirectly with Parquet with few changes.
Apart from the classes used to write or read the files, the remaining logic for building the objects generated by PB or reading their data remains unchanged compared to PB serialization.
Serialization
We will need to instantiate a Parquet writer that supports the writing of objects created by PB:
    Path path = new Path("/tmp/my_output_file.parquet");
    OutputFile outputFile = HadoopOutputFile.fromPath(path, new Configuration());
    ParquetWriter<Organization> writer = ProtoParquetWriter.<Organization>builder(outputFile)
        .withMessage(Organization.class)
        .withWriteMode(Mode.OVERWRITE)
        .config(ProtoWriteSupport.PB_SPECS_COMPLIANT_WRITE, "true")
        .build();
Parquet defines a class named ParquetWriter<T>, and the parquet-protobuf library extends it by implementing in ProtoParquetWriter<T> the logic of converting PB objects into calls to the Parquet API. The object we will serialize is Organization, which has been generated using the PB utility and implements the PB API.
The Path class is not the one from java.nio.file, but a Hadoop-specific abstraction for referencing file paths. OutputFile, in turn, is Parquet's abstraction for a file that can be written to.
Therefore:
- Path is a class defined by Hadoop, while OutputFile, HadoopOutputFile, and ParquetWriter are classes defined by the Parquet API
- ProtoParquetWriter is a class defined by the parquet-protobuf API, a library that integrates Parquet with Protocol Buffers
- Organization and Attribute are classes generated by the PB utility, not related to Parquet
The way to create an instance of ParquetWriter is through a Builder, where you can configure many parameters specific to Parquet or to the library we are using (Protocol Buffers). For example:
- withMessage: the class generated with Protocol Buffers that we want to persist (which internally defines the schema)
- withCompressionCodec: the compression method to use: SNAPPY, GZIP, LZ4, etc. By default, no compression is applied.
- withWriteMode: by default it is CREATE, so if the file already exists, it is not overwritten and an exception is thrown. To avoid this, use OVERWRITE.
- withValidation: whether to validate the data types passed against the defined schema.
Generic configuration can be passed using the config(String property, String value) method. In this case, we configure it to internally use a three-level structure to represent nested lists.
Once the ParquetWriter class is instantiated, most of the complexity lies in transforming your POJOs into instances of the Organization class generated from the PB IDL:
    Path path = new Path("/tmp/my_output_file.parquet");
    OutputFile outputFile = HadoopOutputFile.fromPath(path, new Configuration());
    // try-with-resources closes the writer, which flushes the data and writes the file footer
    try (ParquetWriter<Organization> writer = ProtoParquetWriter.<Organization>builder(outputFile)
            .withMessage(Organization.class)
            .withWriteMode(Mode.OVERWRITE)
            .config(ProtoWriteSupport.PB_SPECS_COMPLIANT_WRITE, "true")
            .build()) {
        for (Org org : organizations) {
            // Copy the record fields into the PB-generated builder
            var organizationBuilder = Organization.newBuilder()
                .setName(org.name())
                .setCategory(org.category())
                .setCountry(org.country())
                .setType(OrganizationType.forNumber(org.type().ordinal()));
            for (Attr attr : org.attributes()) {
                var attribute = Attribute.newBuilder()
                    .setId(attr.id())
                    .setQuantity(attr.quantity())
                    .setAmount(attr.amount())
                    .setActive(attr.active())
                    .setPercent(attr.percent())
                    .setSize(attr.size())
                    .build();
                organizationBuilder.addAttributes(attribute);
            }
            // Persist each organization as soon as it is built
            writer.write(organizationBuilder.build());
        }
    }
Instead of converting the entire collection of organizations and then writing it, we can convert and persist each Organization one by one.
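One detail of the loop above deserves care: forNumber looks a constant up by its PB field number, while ordinal() is the position of the constant in the Java enum. The two agree only as long as the IDL numbers its values in the same order, starting at 0. The sketch below uses stand-in enums (the real OrganizationType is generated by PB, and the numbering shown here is a hypothetical variant, not the one from the article) to show how an explicit mapping avoids that coupling:

```java
// Stand-in for the record-side enum from the example.
enum Type { FOO, BAR, BAZ }

// Hypothetical stand-in for a PB-generated enum. The numbers are an assumption:
// here the IDL reserves 0 for an "unspecified" value, so numbers and ordinals differ.
enum OrganizationType {
    UNSPECIFIED(0), FOO(1), BAR(2), BAZ(3);

    private final int number;

    OrganizationType(int number) {
        this.number = number;
    }

    // Mimics the generated forNumber: returns null when no constant has that number.
    static OrganizationType forNumber(int number) {
        for (OrganizationType candidate : values()) {
            if (candidate.number == number) {
                return candidate;
            }
        }
        return null;
    }
}

final class TypeMapping {
    private TypeMapping() {}

    // Explicit mapping by constant, independent of declaration order or numbering.
    static OrganizationType toProto(Type type) {
        switch (type) {
            case FOO: return OrganizationType.FOO;
            case BAR: return OrganizationType.BAR;
            case BAZ: return OrganizationType.BAZ;
            default: throw new IllegalArgumentException("Unmapped type: " + type);
        }
    }

    public static void main(String[] args) {
        // Compare the ordinal-based lookup with the explicit mapping for every constant.
        for (Type type : Type.values()) {
            System.out.println(type + " by ordinal -> " + OrganizationType.forNumber(type.ordinal())
                    + ", explicit -> " + toProto(type));
        }
    }
}
```

With this hypothetical numbering, the ordinal-based lookup silently returns the wrong constant, while the explicit mapping keeps working. If the IDL numbers its values exactly like the Java enum, both approaches give the same result.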
You can find the code on GitHub.
Deserialization
Deserialization is very straightforward if we are willing to work with the classes generated by Protocol Buffers.
To read the file, we will need to instantiate a Parquet reader:
    Path path = new Path(filePath);
    InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
    ParquetReader<Organization.Builder> reader =
        ProtoParquetReader.<Organization.Builder>builder(inputFile).build();
Parquet defines a class named ParquetReader<T>, and the parquet-protobuf library extends it by implementing in ProtoParquetReader the logic of converting Parquet's internal data structures into classes generated by Protocol Buffers.
InputFile is Parquet's abstraction for a file that can be read from.
Therefore:
- Path is a class defined by Hadoop, while InputFile, HadoopInputFile, and ParquetReader are classes defined by the Parquet API
- ProtoParquetReader extends ParquetReader and is defined in parquet-protobuf, a library that integrates Parquet with PB
- Organization (and Attribute) are classes generated by the PB utility, not related to Parquet
The instantiation of the ParquetReader class is also done with a Builder, although the options to configure are much fewer, as all its configuration is determined by the file we are going to read. The reader does not need to know if the file uses dictionary encoding or if it is compressed, so it is not necessary to configure it; it discovers this by reading the file.
It's important to note that the data type the reader will return is a Builder for Organization, rather than the Organization itself, and we will need to call the build() method:
    Path path = new Path(filePath);
    InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
    try (ParquetReader<Organization.Builder> reader = ProtoParquetReader.<Organization.Builder>builder(inputFile).build()) {
        List<Organization> organizations = new ArrayList<>();
        Organization.Builder next = null;
        while ((next = reader.read()) != null) {
            organizations.add(next.build());
        }
        return organizations;
    }
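The reader hands back PB-generated objects. If the rest of your code works with the Org and Attr records instead, the numeric fields need some care: Protocol Buffers has no 8-bit or 16-bit integer types, so quantity, amount and size would come back as int (assuming the IDL, which is not shown here, declares them as int32). A minimal sketch of a checked narrowing helper follows; the Narrowing class and its method names are hypothetical and not part of any library:

```java
final class Narrowing {
    private Narrowing() {}

    // Converts an int to byte, failing loudly instead of silently wrapping around.
    static byte toByteExact(int value) {
        if (value < Byte.MIN_VALUE || value > Byte.MAX_VALUE) {
            throw new ArithmeticException("Value does not fit in a byte: " + value);
        }
        return (byte) value;
    }

    // Converts an int to short with the same overflow check.
    static short toShortExact(int value) {
        if (value < Short.MIN_VALUE || value > Short.MAX_VALUE) {
            throw new ArithmeticException("Value does not fit in a short: " + value);
        }
        return (short) value;
    }

    public static void main(String[] args) {
        System.out.println(toByteExact(42));
        System.out.println(toShortExact(-1200));
        try {
            toByteExact(300);
        } catch (ArithmeticException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

With such a helper, an Attr could be rebuilt along the lines of new Attr(a.getId(), Narrowing.toByteExact(a.getQuantity()), ...), where the getters are assumed to mirror the setters used when writing.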
If the IDL used to generate the code contained only a subset of the columns existing in the file, the reader would ignore all the columns not present in the IDL. This would save us disk reads and data deserialization/decoding.
You can find the code on GitHub.
Performance
What performance does Parquet with Protocol Buffers offer when serializing and deserializing large volumes of data? To what extent do different compression options affect it? Should we choose compression with Snappy or no compression at all? And what about enabling the dictionary or not?
Leveraging the analyses I conducted previously on different serialization formats, we can get an idea of their strengths and weaknesses. The benchmarks were conducted on the same computer, making them comparable.
File Size
| | Uncompressed | Snappy |
|---|---|---|
| Dictionary False | 1,034 MB | 508 MB |
| Dictionary True | 289 MB | 281 MB |
Given the difference in sizes, we can see that in my synthetic example dictionary encoding is very effective: on its own (289 MB) it produces a smaller file than Snappy on its own (508 MB), and adding Snappy on top of the dictionary brings the size down only slightly further (281 MB). The decision to enable compression or not will depend on the performance penalty it entails.
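To make the comparison concrete, the relative savings can be worked out from the table. The snippet below is only a small arithmetic sketch; the class and method names are made up for illustration, and the sizes are the ones reported in the table above:

```java
import java.util.Locale;

final class FileSizeComparison {
    private FileSizeComparison() {}

    // File sizes in MB, copied from the File Size table.
    static final double PLAIN = 1034;
    static final double SNAPPY = 508;
    static final double DICTIONARY = 289;
    static final double DICTIONARY_SNAPPY = 281;

    // Percentage by which `size` is smaller than `baseline`.
    static double reductionPercent(double baseline, double size) {
        return (baseline - size) / baseline * 100.0;
    }

    public static void main(String[] args) {
        print("Snappy only vs. plain", reductionPercent(PLAIN, SNAPPY));
        print("Dictionary only vs. plain", reductionPercent(PLAIN, DICTIONARY));
        print("Dictionary + Snappy vs. plain", reductionPercent(PLAIN, DICTIONARY_SNAPPY));
        print("Snappy on top of dictionary", reductionPercent(DICTIONARY, DICTIONARY_SNAPPY));
    }

    private static void print(String label, double percent) {
        System.out.println(String.format(Locale.ROOT, "%-30s %.1f%% smaller", label, percent));
    }
}
```

Snappy alone roughly halves the file, the dictionary alone removes a bit over 70% of it, and Snappy applied on top of the dictionary saves only around 3% more.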
Serialization Time
| | Uncompressed | Snappy |
|---|---|---|
| Dictionary False | 14,802 ms | 15,450 ms |
| Dictionary True | 16,018 ms | 16,174 ms |
The time is very similar in all cases, and we can say that the different compression techniques do not significantly affect the time taken.
Compared with other serialization formats, it takes between 50% (Jackson) and 300% (Protocol Buffers/Avro) longer, but in return, it achieves files that are between 60% (Protocol Buffers/Avro) and 90% (Jackson) smaller.
Deserialization Time
| | Uncompressed | Snappy |
|---|---|---|
| Dictionary False | 12 ,64 ms | 13,028 ms |
| Dictionary True | 10,492 ms | 11,025 ms |
In this case, the use of the dictionary has a significant impact on time, saving the effort of decoding information that is repeated. There is definitely no reason to disable this feature.
Compared to other formats, it is 100% slower than pure Protocol Buffers and 30% slower than pure Avro, but it is almost twice as fast as Jackson.
To put the performance into perspective, on my laptop, it reads 45,000 Organizations per second, which in turn contain almost 2.6 million instances of the Attribute type.
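Those two figures also give a rough idea of the shape of the data and of the cost per object. The following is a back-of-the-envelope sketch using the rounded numbers quoted above; the class and method names are invented for the example:

```java
import java.util.Locale;

final class ReadThroughput {
    private ReadThroughput() {}

    // Average number of attributes contained in each organization.
    static double attributesPerOrganization(double organizationsPerSecond, double attributesPerSecond) {
        return attributesPerSecond / organizationsPerSecond;
    }

    // Average time spent reading one organization, in microseconds.
    static double microsPerOrganization(double organizationsPerSecond) {
        return 1_000_000.0 / organizationsPerSecond;
    }

    public static void main(String[] args) {
        double organizationsPerSecond = 45_000;  // figure quoted in the text
        double attributesPerSecond = 2_600_000;  // "almost 2.6 million", rounded

        System.out.println(String.format(Locale.ROOT, "Attributes per organization: %.1f",
                attributesPerOrganization(organizationsPerSecond, attributesPerSecond)));
        System.out.println(String.format(Locale.ROOT, "Microseconds per organization: %.1f",
                microsPerOrganization(organizationsPerSecond)));
    }
}
```

In other words, each Organization carries a little under 60 attributes on average and takes roughly 22 microseconds to read, attributes included.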
Conclusion
If you were already familiar with Protocol Buffers, most of the code and particularities related to PB will be recognizable to you. If not, it increases the entry barrier, as you need to learn about two distinct but similar technologies, and it's not always clear which part belongs to each one.
The biggest change compared to pure Protocol Buffers is the way to create the writer and reader objects, where we have to deal with all the configuration and particularities specific to Parquet.
Although possible, Protocol Buffers is not intended for serializing large amounts of data, so it's not comparable to Parquet using Protocol Buffers. If I had to choose between using Parquet with PB or Parquet with Avro, I would probably opt for the Avro version, as Avro is often used in the data engineering world and you could benefit more from the experience.
The data I used in the example is random and results may vary depending on the characteristics of your data. Test with your data to draw conclusions.
In environments where you write once and read multiple times, the time spent on serialization should not be a determining factor. More important are the file size, transfer time, or processing speed (especially if you can filter columns by making projections).
Despite using different compression and encoding techniques, the processing speed of files is quite high. Coupled with its ability to work with a typed schema, this makes Parquet a data exchange format to consider in projects with a high volume of data.