Working with Parquet files in Java using Avro

Published at
11/26/2023
Categories
parquet
java
avro
bigdata
Author
Jerónimo López

In the previous article, I wrote an introduction to using Parquet files in Java, but I did not include any examples. In this article, I will explain how to do this using the Avro library.

Parquet with Avro is one of the most popular ways to work with Parquet files in Java because of its simplicity and flexibility, and because it is the library with the most examples.

Both Avro and Parquet allow complex data structures, and there is a mapping between the types of one and the other.

The post will use the same example I used in previous articles talking about serialization. The code will be very similar to the article about Avro. For specific details about Avro, I refer you to that article.

In the example, we will work with a collection of Organization objects (Org), which also have a list of Attributes (Attr):

record Org(String name, String category, String country, Type type, List<Attr> attributes) {
}

record Attr(String id, byte quantity, byte amount, boolean active, double percent, short size) {
}

enum Type {
  FOO, BAR, BAZ
}
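
To make the model concrete, here is a minimal sketch of a small sample collection that could play the role of the organizations variable used in the later snippets. The names and values are invented for illustration, and the three types are nested inside a hypothetical SampleData holder only so that the snippet compiles on its own:

```java
import java.util.List;

// Hypothetical holder class; the nested types mirror the records shown above.
final class SampleData {
  enum Type { FOO, BAR, BAZ }

  record Attr(String id, byte quantity, byte amount, boolean active, double percent, short size) {}

  record Org(String name, String category, String country, Type type, List<Attr> attributes) {}

  // Builds two organizations; byte and short literals need explicit narrowing casts.
  static List<Org> organizations() {
    List<Attr> attrs = List.of(
        new Attr("attr-1", (byte) 3, (byte) 10, true, 12.5, (short) 200),
        new Attr("attr-2", (byte) 7, (byte) 1, false, 99.9, (short) 15));
    return List.of(
        new Org("Acme", "retail", "ES", Type.FOO, attrs),
        new Org("Globex", "energy", "US", Type.BAR, List.of()));
  }
}
```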

Similar to saving files in Avro format, this version of Parquet with Avro allows writing files using classes generated from the IDL or the GenericRecord data structure. This capability is specific to Avro, not Parquet, but is inherited by parquet-avro, the library that implements this integration.

Internally, the library transforms the Avro schema into the Parquet schema, so most tools and libraries that know how to work with Avro classes will be able to work indirectly with Parquet with few changes.
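
If you are curious about what that transformation produces, the following sketch converts an Avro schema into its Parquet counterpart by calling parquet-avro's AvroSchemaConverter directly. The SchemaConversionSketch class and the reduced three-field Attribute schema are assumptions made for the example. The writer performs this conversion for you, so you would not normally need to call it yourself:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;

// Hypothetical helper, used only to inspect the schema mapping.
final class SchemaConversionSketch {

  // Converts an Avro record schema into the equivalent Parquet message type.
  static MessageType toParquet(Schema avroSchema) {
    return new AvroSchemaConverter().convert(avroSchema);
  }

  public static void main(String[] args) {
    // Reduced version of the Attribute schema, just for illustration.
    Schema attr = SchemaBuilder.record("Attribute").fields()
        .requiredString("id")
        .requiredInt("quantity")
        .requiredBoolean("active")
        .endRecord();
    // Prints the Parquet schema in its textual form.
    System.out.println(toParquet(attr));
  }
}
```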

Using code generation

The only difference when compared to serializing in Avro format lies in the class used for writing or reading files; otherwise, the logic for building the Avro-generated classes and reading their data remains unchanged.

Serialization

We will need to instantiate a Parquet writer that supports the writing of objects created by Avro:

Path path = new Path("/tmp/my_output_file.parquet");
OutputFile outputFile = HadoopOutputFile.fromPath(path, new Configuration());
ParquetWriter<Organization> writer = AvroParquetWriter.<Organization>builder(outputFile)
    .withSchema(new Organization().getSchema())
    .withWriteMode(Mode.OVERWRITE)
    .config(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, "false")
    .build();

Parquet defines a class called ParquetWriter<T>, and the parquet-avro library extends it with AvroParquetWriter<T>, which implements the logic of converting Avro objects into calls to the Parquet API. The object we will serialize is Organization, which has been generated using the Avro utility and implements the Avro API.

The Path class is not the one in java.nio.file, but a Hadoop-specific abstraction for referencing file paths, whereas the OutputFile class is Parquet's abstraction for a file that can be written to.

Therefore:

  • Path is defined by Hadoop, while OutputFile, HadoopOutputFile, and ParquetWriter are classes defined by the Parquet API.
  • AvroParquetWriter is a class defined by the parquet-avro API, a library that encapsulates Parquet with Avro.
  • Organization and Attribute are classes generated by the Avro utility, not related to Parquet.

The way to create an instance of ParquetWriter is through a Builder, where you can configure many Parquet-specific parameters or those of the library we are using (Avro). For example:

  • withSchema: schema of the Organization class in Avro, which internally converts to a Parquet schema.
  • withCompressionCodec: compression method to use: SNAPPY, GZIP, LZ4, etc. By default, no compression is applied.
  • withWriteMode: by default it is CREATE, so if the file already existed, it would not overwrite it and would throw an exception. To avoid this, use OVERWRITE.
  • withValidation: if we want it to validate the data types passed against the defined schema.
  • withBloomFilterEnabled: if we want to enable the creation of bloom filters.

More generic configuration for either library (options without a dedicated builder method) can be passed with the config(String property, String value) method. In this case, we configure it to internally use a 3-level structure to represent nested lists.
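
As an illustration of how those options combine, here is a sketch of a builder call that sets several of them at once. The WriterOptionsSketch class and its open method are hypothetical, it uses GenericRecord so that no generated classes are needed, and the chosen values (Snappy, dictionary on, validation on, bloom filters off) are only an example, not a recommendation:

```java
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.OutputFile;

// Hypothetical helper that opens a writer with several options set explicitly.
final class WriterOptionsSketch {

  static ParquetWriter<GenericRecord> open(String file, Schema schema) throws IOException {
    OutputFile out = HadoopOutputFile.fromPath(new Path(file), new Configuration());
    return AvroParquetWriter.<GenericRecord>builder(out)
        .withSchema(schema)                                 // Avro schema, converted internally
        .withWriteMode(Mode.OVERWRITE)                      // replace the file if it exists
        .withCompressionCodec(CompressionCodecName.SNAPPY)  // compress pages with Snappy
        .withDictionaryEncoding(true)                       // dictionary-encode repeated values
        .withValidation(true)                               // check values against the schema
        .withBloomFilterEnabled(false)                      // no bloom filters in this sketch
        .config(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, "false") // 3-level lists
        .build();
  }
}
```

The caller is expected to close the returned writer, ideally with try-with-resources, since the file footer is only written on close.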

Once the ParquetWriter class is instantiated, the greatest complexity lies in transforming your POJOs into the Organization classes generated from Avro's IDL. The complete code would be as follows:

Path path = new Path("/tmp/my_output_file.parquet");
OutputFile outputFile = HadoopOutputFile.fromPath(path, new Configuration());
try (ParquetWriter<Organization> writer = AvroParquetWriter.<Organization>builder(outputFile)
    .withSchema(new Organization().getSchema())
    .withWriteMode(Mode.OVERWRITE)
    .config(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, "false")
    .build()) {
  for (var org : organizations) {
    List<Attribute> attrs = org.attributes().stream()
      .map(a -> Attribute.newBuilder()
        .setId(a.id())
        .setQuantity(a.quantity())
        .setAmount(a.amount())
        .setSize(a.size())
        .setPercent(a.percent())
        .setActive(a.active())
        .build())
      .toList();
    Organization organization = Organization.newBuilder()
      .setName(org.name())
      .setCategory(org.category())
      .setCountry(org.country())
      .setOrganizationType(OrganizationType.valueOf(org.type().name()))
      .setAttributes(attrs)
      .build();
    writer.write(organization);
  }
}

Instead of converting the entire collection of organizations and then writing it, we can convert and persist each Organization one by one.

You can find the code on GitHub.

Deserialization

Deserialization is very straightforward if we agree to work with the classes generated by Avro.

To read the file, we will need to instantiate a Parquet reader:

Path path = new Path(filePath);
InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
ParquetReader<Organization> reader = AvroParquetReader.<Organization>builder(inputFile).build();

Parquet defines a class called ParquetReader<T> and the parquet-avro library extends it by implementing in AvroParquetReader the logic of converting Parquet's internal data structures into the classes generated by Avro.

In this case, InputFile is Parquet's abstraction for a file that can be read from.

Therefore:

  • Path is defined by Hadoop, while InputFile, HadoopInputFile, and ParquetReader are classes defined by the Parquet API.
  • AvroParquetReader extends ParquetReader and is defined in parquet-avro, a library that encapsulates Parquet with Avro.
  • Organization (and Attribute) are classes generated by the Avro utility, not related to Parquet.

The instantiation of the ParquetReader class is also done with a Builder, although the options to configure are much fewer, as all its configuration is determined by the file we are going to read. The reader does not need to know if the file uses dictionary encoding or if it is compressed, so it is not necessary to configure it; it discovers this by reading the file.

Path path = new Path(filePath);
InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
try (ParquetReader<Organization> reader = AvroParquetReader.<Organization>builder(inputFile).build()) {
    List<Organization> organizations = new ArrayList<>();
    Organization next = null;
    while ((next = reader.read()) != null) {
        organizations.add(next);
    }
    return organizations;
}
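
The reader needs no codec or encoding settings because that information lives in the file footer. As a sketch of this idea, the snippet below opens a file with the lower-level ParquetFileReader and lists, for each column chunk, the codec and encodings recorded in the metadata. FooterInspectionSketch and describeColumns are hypothetical names introduced for the example:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

// Hypothetical helper that reads only the footer metadata, not the data pages.
final class FooterInspectionSketch {

  // Returns one line per column chunk with its path, codec and encodings.
  static List<String> describeColumns(String file) throws IOException {
    InputFile in = HadoopInputFile.fromPath(new Path(file), new Configuration());
    List<String> lines = new ArrayList<>();
    try (ParquetFileReader reader = ParquetFileReader.open(in)) {
      // A block is a row group; each one holds a chunk per column.
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        for (ColumnChunkMetaData column : block.getColumns()) {
          lines.add(column.getPath() + " codec=" + column.getCodec()
              + " encodings=" + column.getEncodings());
        }
      }
    }
    return lines;
  }
}
```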

If the IDL used to generate the code contains a subset of the attributes persisted in the file, when reading it we would be ignoring all the columns not present in the IDL. This would save us from disk reads and the deserialization/decoding of data.

You can find the code on GitHub.

Using GenericRecord

Here, it will not be necessary to generate any code, and we will work with the GenericRecord class provided by Avro, but the code will be a bit more verbose.

Serialization

As we do not have generated classes containing the embedded schema, we need to programmatically define the Avro schema we are going to use. The code is the same as in the article about Avro:

Schema attrSchema = SchemaBuilder.record("Attribute")
  .fields()
  .requiredString("id")
  .requiredInt("quantity")
  .requiredInt("amount")
  .requiredInt("size")
  .requiredDouble("percent")
  .requiredBoolean("active")
  .endRecord();
var enumSymbols = Stream.of(Type.values()).map(Type::name).toArray(String[]::new);
Schema orgsSchema = SchemaBuilder.record("Organizations")
  .fields()
  .requiredString("name")
  .requiredString("category")
  .requiredString("country")
  .name("organizationType").type().enumeration("organizationType")
  .symbols(enumSymbols).noDefault()
  .name("attributes").type().array().items(attrSchema).noDefault()
  .endRecord();
var typeField = orgsSchema.getField("organizationType").schema();
EnumMap<Type, EnumSymbol> enums = new EnumMap<>(Type.class);
enums.put(Type.BAR, new EnumSymbol(typeField, Type.BAR));
enums.put(Type.BAZ, new EnumSymbol(typeField, Type.BAZ));
enums.put(Type.FOO, new EnumSymbol(typeField, Type.FOO));

Instead of using an AvroParquetWriter of the type Organization, we create one of the type GenericRecord and construct instances of it as if it were a Map:

Path path = new Path(filePath);
OutputFile outputFile = HadoopOutputFile.fromPath(path, new Configuration());
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputFile)
    .withSchema(orgsSchema)
    .withWriteMode(Mode.OVERWRITE)
    .config(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, "false")
    .build()) {
  for (var org : organizations) {
    List<GenericRecord> attrs = new ArrayList<>();
    for (var attr : org.attributes()) {
      GenericRecord attrRecord = new GenericData.Record(attrSchema);
      attrRecord.put("id", attr.id());
      attrRecord.put("quantity", attr.quantity());
      attrRecord.put("amount", attr.amount());
      attrRecord.put("size", attr.size());
      attrRecord.put("percent", attr.percent());
      attrRecord.put("active", attr.active());
      attrs.add(attrRecord);
    }
    GenericRecord orgRecord = new GenericData.Record(orgsSchema);
    orgRecord.put("name", org.name());
    orgRecord.put("category", org.category());
    orgRecord.put("country", org.country());
    orgRecord.put("organizationType", enums.get(org.type()));
    orgRecord.put("attributes", attrs);
    writer.write(orgRecord);
  }
}

You can find the code on GitHub.

Deserialization

As in the original version of Avro, most of the work consists in converting the GenericRecord into our data structure. Because it behaves like a Map, we will have to cast the types of the values:

Path path = new Path(filePath);
InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration());
try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile).build()) {
  List<Org> organizations = new ArrayList<>();
  GenericRecord record = null;
  while ((record = reader.read()) != null) {
    List<GenericRecord> attrsRecords = (List<GenericRecord>) record.get("attributes");
    var attrs = attrsRecords.stream().map(attr -> new Attr(attr.get("id").toString(),
        ((Integer) attr.get("quantity")).byteValue(),
        ((Integer) attr.get("amount")).byteValue(),
        (boolean) attr.get("active"),
        (double) attr.get("percent"),
        ((Integer) attr.get("size")).shortValue())).toList();
    Utf8 name = (Utf8) record.get("name");
    Utf8 category = (Utf8) record.get("category");
    Utf8 country = (Utf8) record.get("country");
    Type type = Type.valueOf(record.get("organizationType").toString());
    organizations.add(new Org(name.toString(), category.toString(), country.toString(), type, attrs));
  }
  return organizations;
}

As we are using the Avro interface, it maintains its logic that Strings are encoded within the Utf8 class and it will be necessary to extract their values.
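
A small helper can hide that conversion. The sketch below relies only on the fact that Utf8 is a CharSequence, so calling toString() works whatever concrete type the reader returns, whereas a direct cast to String would fail with a ClassCastException. AvroStrings and asString are hypothetical names, not part of Avro:

```java
// Hypothetical utility for turning Avro string values into java.lang.String.
final class AvroStrings {

  // Accepts Utf8, String or any other CharSequence; null stays null.
  static String asString(Object value) {
    return value == null ? null : value.toString();
  }
}
```

With it, the extraction of a field would read: String name = AvroStrings.asString(record.get("name"));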

You can find the code on GitHub.

By default, when reading the file, it deserializes all fields of each row because it does not know the schema of what you need to read, and processes everything. If you want a projection of the fields, you must pass it in the form of an Avro schema when creating the ParquetReader:

Schema projection = SchemaBuilder.record("Organizations")
  .fields()
  .requiredString("name")
  .requiredString("category")
  .requiredString("country")
  .endRecord();
Configuration configuration = new Configuration();
// Pass the projection schema (not the full orgsSchema) so that only these columns are read
configuration.set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, projection.toString());
try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile)
  .withConf(configuration)
  .build()) {
....

The rest of the process would be the same, but with fewer fields. You can see the entire source code of the example here.

Performance

What performance does Parquet Avro offer when serializing and deserializing a large volume of data? To what extent do the different compression options influence it? Should we choose Snappy compression or no compression at all? And what about enabling dictionary encoding?

Drawing on the analyses I did previously of different serialization formats, we can get an idea of their strengths and weaknesses. All the benchmarks were run on the same computer, so the results are comparable.

File Size

Both using code generation and GenericRecord, the result is the same, as they are different ways of defining the same schema and persisting the same data:

                   Uncompressed   Snappy
Dictionary False   1,034 MB       508 MB
Dictionary True    289 MB         281 MB

Given the difference in sizes, we can see that in my synthetic example, the use of dictionaries compresses the information significantly, even more than the Snappy algorithm itself. The decision to activate compression or not will depend on the performance penalty it entails.
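
To quantify that difference, here is a short worked calculation over the sizes in the table, taking the uncompressed file without dictionary (1,034 MB) as the baseline. SizeReduction is a hypothetical class written for this example:

```java
// Hypothetical helper that turns the file sizes above into savings percentages.
final class SizeReduction {

  // Percentage saved relative to the baseline size.
  static double savedPercent(double baselineMb, double sizeMb) {
    return 100.0 * (baselineMb - sizeMb) / baselineMb;
  }

  public static void main(String[] args) {
    double baseline = 1_034; // no dictionary, no compression
    System.out.println("Snappy only:         " + savedPercent(baseline, 508));
    System.out.println("Dictionary only:     " + savedPercent(baseline, 289));
    System.out.println("Dictionary + Snappy: " + savedPercent(baseline, 281));
  }
}
```

Snappy alone saves roughly 51%, the dictionary alone roughly 72%, and adding Snappy on top of the dictionary gains less than one additional point.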

Serialization Time

Using code generation:

                   Uncompressed   Snappy
Dictionary False   14,386 ms      14,920 ms
Dictionary True    15,110 ms      15,381 ms

Using GenericRecord:

                   Uncompressed   Snappy
Dictionary False   15,287 ms      15,809 ms
Dictionary True    16,119 ms      16,432 ms

The time is very similar in all cases, and we can say that the different compression techniques do not significantly affect the time spent.

There are no notable time differences between generated code and the use of GenericRecord, so performance should not be a determining factor in choosing a solution.

Compared to other serialization formats, it takes between 40% (Jackson) and 300% (Protocol Buffers/Avro) more time, but in return achieves files that are between 70% (Protocol Buffers/Avro) and 90% (Jackson) smaller.

Deserialization Time

Using code generation:

                   Uncompressed   Snappy
Dictionary False   10,722 ms      10,736 ms
Dictionary True    7,707 ms       7,665 ms

Using GenericRecord:

                   Uncompressed   Snappy
Dictionary False   12,089 ms      11,931 ms
Dictionary True    8,374 ms       8,451 ms

In this case, the use of the dictionary has a significant impact on time, as it saves decoding information that is repeated. There is definitely no reason to disable this functionality.

If we compare with other formats, it is twice as slow as Protocol Buffers and on par with Avro, but more than twice as fast as Jackson.

To put the performance into perspective: on my laptop, it reads 50,000 Organizations per second, which together contain almost 3 million Attribute instances.

Deserialization Time Using a Projection

What is the performance like if we use a projection and we only read three fields of the Organization object and ignore its collection of attributes?

                   Uncompressed   Snappy
Dictionary False   289 ms         304 ms
Dictionary True    195 ms         203 ms

We confirm the promise that if we access a subset of columns, we will read and decode much less information. In this case, it takes only 2.5% of the time, or in other words, it is 40 times faster at processing the same file.
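
The 2.5% and 40x figures can be reproduced from the tables with a quick calculation. The article does not say which full-read variant the projection was measured against, so the sketch below compares the uncompressed, dictionary-enabled projection time (195 ms) with both the generated-classes and the GenericRecord full reads. ProjectionSpeedup is a hypothetical class:

```java
// Hypothetical helper comparing projected reads with full reads.
final class ProjectionSpeedup {

  // How many times faster the projected read is than the full read.
  static double speedup(double fullReadMs, double projectedMs) {
    return fullReadMs / projectedMs;
  }

  // Share of the full read time that the projected read needs, in percent.
  static double timeShare(double fullReadMs, double projectedMs) {
    return 100.0 * projectedMs / fullReadMs;
  }

  public static void main(String[] args) {
    double projected = 195; // dictionary enabled, uncompressed
    System.out.println("vs generated classes: " + speedup(7_707, projected));
    System.out.println("vs GenericRecord:     " + speedup(8_374, projected));
    System.out.println("time share (%):       " + timeShare(7_707, projected));
  }
}
```

Both baselines give a factor of about 40, which matches the rounded figures in the text.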

This is where Parquet shows its full power, by allowing us to read and decode a subset of the data quickly, taking advantage of how the data is laid out in the file.

Conclusion

If you are already using Avro or are familiar with it, most of the code and nuances related to Avro will be familiar to you. If you are not, it increases the entry barrier, as you have to learn about two different technologies, and it may not be clear what corresponds to each.

The major change compared to using only Avro is how the writer and reader objects are created, where we will have to deal with all the configuration and particularities specific to Parquet.

If I had to choose between using only Avro or Parquet with Avro, I would choose the latter, as it produces more compact files and we have the opportunity to take advantage of the columnar format.

The data I have used in the example are synthetic, and the results may vary depending on the characteristics of your data. I recommend doing tests, but unless all your values are very random, the compression rates will be high.

In environments where you write once and read multiple times, the time spent serializing should not be decisive. More important, for example, are the consumption of your storage, the file transfer time, or the processing speed (especially if you can filter the columns you access).

Despite using different compression and encoding techniques, the file processing time is quite fast. Along with its ability to work with a typed schema, this makes it a data interchange format to be considered in projects with a heavy load of data.
