Logo

dev-resources.site

for different kinds of informations.

Java serialization with Avro

Published at
12/5/2022
Categories
avro
java
serialization
bigdata
Author
jerolba
Categories
4 categories in total
avro
open
java
open
serialization
open
bigdata
open
Author
7 person written this
jerolba
open
Java serialization with Avro

In previous posts I've analyzed Protocol Buffers and FlatBuffers, using JSON as the baseline. In this post, I will analyze Apache Avro and compare it with the previously studied formats.

Apache Avro was developed as a component of the Apache Hadoop project and was released in 2009 under the Apache 2.0 license.

Avro should not be confused with Parquet. Often when searching for documentation on Parquet, you end up reading about Avro, confusing them. Although the Avro library is useful for generating Parquet files and both are widely used in the world of Big Data, the formats have no relation to each other.

As Protocol Buffers and Flat Buffers, you can predefine a schema (in JSON), and generates binary content that is not human-readable. It also has support for multiple languages.

It is not only used to persist large batches of data. For example, Avro is widely used in Kafka to serialize the information in its messages.

Because the format or schema can be included in the serialized data, code generation is optional, which makes it easy to build systems that process their files generically. It provides the flexibility of JSON, but with a more compact and efficient format.

I'm going to explore both approaches: code generation from the schema and generated programmatically.


IDL and code generation

The file with the equivalent schema to the example in the previous articles would be this one:

{
  "name": "Organization",
  "type": "record",
  "namespace": "com.jerolba.xbuffers.avro",
  "fields": [
    { "name": "name", "type": "string" }, 
    { "name": "category", "type": "string"}, 
    { "name": "organizationType",
      "type": {
        "type": "enum",
        "name": "OrganizationType",
        "symbols": ["FOO", "BAR", "BAZ"]
      }
    }, 
    { "name": "country", "type": "string" }, 
    { "name": "attributes",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "Attribute",
          "fields": [
            { "name": "id", "type": "string" }, 
            { "name": "quantity", "type": "int"}, 
            { "name": "amount", "type": "int"}, 
            { "name": "size", "type": "int"}, 
            { "name": "percent", "type": "double"}, 
            { "name": "active", "type": "boolean"}
          ]
        }
      }
    }
  ]
}
Enter fullscreen mode Exit fullscreen mode

To generate all Java classes you need to download avro-tools,
and run it with parameters that reference where the IDL file is located and the target path of the generated files:

 java -jar avro-tools-1.11.1.jar compile schema ./src/main/resources/organizations.avsc ./src/main/java/
Enter fullscreen mode Exit fullscreen mode

Or directly using Docker with an image ready to execute the command:

docker run --rm -v $(pwd)/src:/avro/src kpnnv/avro-tools:1.11.1 compile schema /avro/src/main/resources/organizations.avsc /avro/src/main/java/
Enter fullscreen mode Exit fullscreen mode

Using generated classes

Serialization

Like Protocol Buffers, Avro does not directly serialize your POJOs, and you need to copy the information to the objects generated by the schema compiler.

But in this case, if you are persisting a collection of objects you don't need to have all them in memory because you can serialize objects one by one as you instantiate them.

The code needed to serialize the information starting from the POJOs would look like this:

DatumWriter<Organization> datumWriter = new SpecificDatumWriter<>(Organization.class);
var dataFileWriter = new DataFileWriter<>(datumWriter);
try (var os = new FileOutputStream("/tmp/organizations.avro")) {
  dataFileWriter.create(new Organization().getSchema(), os);
  for (var org : organizations) {
    List<Attribute> attrs = org.attributes().stream()
      .map(a -> Attribute.newBuilder()
        .setId(a.id())
        .setQuantity(a.quantity())
        .setAmount(a.amount())
        .setSize(a.size())
        .setPercent(a.percent())
        .setActive(a.active())
      .build())
    .toList();
    Organization organization = Organization.newBuilder()
      .setName(org.name())
      .setCategory(org.category())
      .setCountry(org.country())
      .setOrganizationType( OrganizationType.valueOf(org.type().name()))
      .setAttributes(attrs)
      .build();
    dataFileWriter.append(organization);
  }
  dataFileWriter.close();
}
Enter fullscreen mode Exit fullscreen mode

Instead of converting the whole collection and then persisting it, we can convert and persist each Organization one by one.

  • Serialization time: 5,409 ms
  • File size: 846 MB
  • Compressed file size: 530 MB
  • Memory required: because we are directly serializing to OutputStream, it does not consume anything other than the necessary internal IO buffers (and original objects)
  • Library size (avro-1.11.1.jar + dependencies): 3,552,326 bytes
  • Size of generated classes: 37,283 bytes

Deserialization

Due to the internal representation of the data, Avro needs to move around the file parsing the data and you need to provide a seekable InputStream or directly a File. For example, you can not use directly an InputStream from an HTTP request.

With few lines you can read and process the whole object graph:

File file = new File("/tmp/organizations.avro");
DatumReader<Organization> datumReader = new SpecificDatumReader<>(Organization.class);
List<Organization> organizations = new ArrayList<>();
try (var dataFileReader = new DataFileReader<>(file, datumReader)) {
  while (dataFileReader.hasNext()) {
    organizations.add(dataFileReader.next());
  }
}
Enter fullscreen mode Exit fullscreen mode

The objects are instances of the classes generated from the schema, not the original records. Because we are iterating a reader, we can transform each instance into our representation if necessary, without having both representations repeated in memory.

  • Deserialization time: 8,197 ms
  • Memory required: reconstructing all the object structures defined by the schema takes up 2,520 MB

Using Generic Record

In Avro, you can have the schema embedded in the binary file, and it allows you to read a serialized record without needing to know or agree on the schema in advance. This enables us to deserialize and know the contents of any file, without requiring code generation

You can define the schema at runtime and decide, based on your needs, the fields and structure of the serialized information.

Serialization

Instead of copying the data to the generated classes, you can do it through an Avro GenericRecord, which behaves like a Map. First, you need to define the Avro schema using code (or load it from a static JSON file):

Schema attrSchema = SchemaBuilder.record("Attribute")
    .fields()
    .requiredString("id")
    .requiredInt("quantity")
    .requiredInt("amount")
    .requiredInt("size")
    .requiredDouble("percent")
    .requiredBoolean("active")
    .endRecord();
var enumSymbols = Stream.of(Type.values()).map(Type::name)
    .toArray(String[]::new);
Schema orgsSchema = SchemaBuilder.record("Organizations")
    .fields()
    .requiredString("name")
    .requiredString("category")
    .requiredString("country")
    .name("organizationType").type().enumeration("organizationType")
                             .symbols(enumSymbols).noDefault()
    .name("attributes").type().array().items(attrSchema).noDefault()
    .endRecord();
//Auxiliar Map to encode Enums
var typeField = orgsSchema.getField("organizationType").schema();
EnumMap<Type, EnumSymbol> enums = new EnumMap<>(Type.class);
enums.put(Type.BAR, new EnumSymbol(typeField, Type.BAR));
enums.put(Type.BAZ, new EnumSymbol(typeField, Type.BAZ));
enums.put(Type.FOO, new EnumSymbol(typeField, Type.FOO));
Enter fullscreen mode Exit fullscreen mode

And the code necessary to serialize the collection would look like this:

DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(orgsSchema);
var dataFileWriter = new DataFileWriter<>(datumWriter);
try (var os = new FileOutputStream("/tmp/organizations.avro")) {
  dataFileWriter.create(orgsSchema, os);
  for (var org : organizations) {
    List<GenericRecord> attrs = new ArrayList<>();
    for (var attr : org.attributes()) {
      GenericRecord attrRecord = new GenericData.Record(attrSchema);
      attrRecord.put("id", attr.id());
      attrRecord.put("quantity", attr.quantity());
      attrRecord.put("amount", attr.amount());
      attrRecord.put("size", attr.size());
      attrRecord.put("percent", attr.percent());
      attrRecord.put("active", attr.active());
      attrs.add(attrRecord);
    }
    GenericRecord orgRecord = new GenericData.Record(orgsSchema);
    orgRecord.put("name", org.name());
    orgRecord.put("category", org.category());
    orgRecord.put("country", org.country());
    orgRecord.put("organizationType", enums.get(org.type()));
    orgRecord.put("attributes", attrs);
    dataFileWriter.append(orgRecord);
  }
  dataFileWriter.close();
}
Enter fullscreen mode Exit fullscreen mode

Because we are using the same Schema, and we are only changing how we serialize the data, the file size and memory used are the same.

The serialization time grows to 5,903 ms, 10% more than using generated code. The implementation of GenericRecord introduces a slight overhead.

Deserialization

Using a different Reader, the result of the deserialized data is again an untyped GenericRecord object. In this case, we need to convert each instance to the original data structure, mapping each type:

List<Org> organizations = new ArrayList<>();
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (var dataFileReader = new DataFileReader<>(file, datumReader)) {
  while (dataFileReader.hasNext()) {
    GenericRecord record = dataFileReader.next();
    List<GenericRecord> attrsRecords = (List<GenericRecord>) record.get("attributes");
    var attrs = attrsRecords.stream().map(attr -> new Attr(attr.get("id").toString(),
      ((Integer) attr.get("quantity")).byteValue(),
      ((Integer) attr.get("amount")).byteValue(),
      (boolean) attr.get("active"),
      (double) attr.get("percent"),
      ((Integer) attr.get("size")).shortValue())).toList();
    Utf8 name = (Utf8) record.get("name");
    Utf8 category = (Utf8) record.get("category");
    Utf8 country = (Utf8) record.get("country");
    Type type = Type.valueOf(record.get("organizationType").toString());
    organizations.add(new Org(name.toString(), category.toString(), country.toString(), type, attrs));
  }
}
Enter fullscreen mode Exit fullscreen mode

The code is verbose and is plenty of castings. Avro does not support the byte and short types, and they are persisted as int, so we need to downcast their values. As an optimization, Strings are created with an internal representation called Utf8.

The deserialization time grows to 8,471 ms, around 5% of overhead compared to the static code version.


Using optimized Generic Record

If you inspect the implementation of the GenericRecord class, you can see that get(String key) and put(String key, Object value) access by that key to a Map to get the index in an array. Since each attribute will always be in the same position when reading the file, we can access it only once and reuse its value with a variable, improving the execution time.

Serialization

Because you need to store the index of each field in the Schema, the code is even more verbose. After creating the Schema, the code is:

int idPos = attrSchema.getField("id").pos();
int quantityPos = attrSchema.getField("quantity").pos();
int amountPos = attrSchema.getField("amount").pos();
int activePos = attrSchema.getField("active").pos();
int percentPos = attrSchema.getField("percent").pos();
int sizePos = attrSchema.getField("size").pos();
int namePos = orgsSchema.getField("name").pos();
int categoryPos = orgsSchema.getField("category").pos();
int countryPos = orgsSchema.getField("country").pos();
int organizationTypePos = orgsSchema.getField("organizationType").pos();
int attributesPos = orgsSchema.getField("attributes").pos();

try (var os = new FileOutputStream("/tmp/organizations.avro")) {
  DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(orgsSchema);
  DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
  dataFileWriter.create(orgsSchema, os);
  for (var org : organizations) {
    List<GenericRecord> attrs = new ArrayList<>();
    for (var attr : org.attributes()) {
      GenericRecord attrRecord = new GenericData.Record(attrSchema);
      attrRecord.put(idPos, attr.id());
      attrRecord.put(quantityPos, attr.quantity());
      attrRecord.put(amountPos, attr.amount());
      attrRecord.put(sizePos, attr.size());
      attrRecord.put(percentPos, attr.percent());
      attrRecord.put(activePos, attr.active());
      attrs.add(attrRecord);
    }
    GenericRecord orgRecord = new GenericData.Record(orgsSchema);
    orgRecord.put(namePos, org.name());
    orgRecord.put(categoryPos, org.category());
    orgRecord.put(countryPos, org.country());
    orgRecord.put(organizationTypePos, enums.get(org.type()));
    orgRecord.put(attributesPos, attrs);
    dataFileWriter.append(orgRecord);
  }
  dataFileWriter.close();
}
Enter fullscreen mode Exit fullscreen mode

The serialization time is 5,381 ms, very close to the time used in the version with generated code..

Deserialization

The code is very similar, we only add variables to know the position of each field:

List<Org> organizations = new ArrayList<>();
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader)) {
  Schema attributes = dataFileReader.getSchema().getField("attributes").schema().getElementType();
  int idPos = attributes.getField("id").pos();
  int quantityPos = attributes.getField("quantity").pos();
  int amountPos = attributes.getField("amount").pos();
  int activePos = attributes.getField("active").pos();
  int percentPos = attributes.getField("percent").pos();
  int sizePos = attributes.getField("size").pos();
  Schema orgs = dataFileReader.getSchema();
  int namePos = orgs.getField("name").pos();
  int categoryPos = orgs.getField("category").pos();
  int countryPos = orgs.getField("country").pos();
  int organizationTypePos = orgs.getField("organizationType").pos();
  while (dataFileReader.hasNext()) {
    GenericRecord record = dataFileReader.next();
    List<GenericRecord> attrsRecords = (List<GenericRecord>) record.get("attributes");
    var attrs = attrsRecords.stream().map(attr -> new Attr(attr.get(idPos).toString(),
      ((Integer) attr.get(quantityPos)).byteValue(),
      ((Integer) attr.get(amountPos)).byteValue(),
      (boolean) attr.get(activePos),
      (double) attr.get(percentPos),
      ((Integer) attr.get(sizePos)).shortValue())).toList();
    Utf8 name = (Utf8) record.get(namePos);
    Utf8 category = (Utf8) record.get(categoryPos);
    Utf8 country = (Utf8) record.get(countryPos);
    Type type = Type.valueOf(record.get(organizationTypePos).toString());
    organizations.add(new Org(name.toString(), category.toString(), country.toString(), type, attrs));
  }
}
Enter fullscreen mode Exit fullscreen mode

The deserialization time drops to 7,353 ms, around 10% faster than the generated code version. Why? I don't have enough knowledge of its internals to venture an answer, but I was surprised by the result.

Avro Summary

Generated Code Generic Record Optimized
Generic Record
Serialization time 5,409 ms 5,903 ms 5,381 ms
Deserialization time 8,197 ms 8,471 ms 7,353 ms

Using GenericRecord allows us to gain some flexibility in the process without losing performance, but it makes the code much more verbose and prone to errors due to the manual mapping of fields..

The included dependencies are the same in all cases, and we can only save the generation of the source code.

Analysis and impressions

JSON Protocol Buffers FlatBuffers Avro
Serialization time 11,718 ms 5,823 ms 3,803 ms 5,409 ms
File size 2,457 MB 1,044 MB 600 MB 846 MB
GZ file size 525 MB 448 MB 414 MB 530 MB
Memory serializing N/A 1.29 GB 0.6 GB - 1 GB N/A
Deserialization time 20,410 ms 4,535 ms 202 - 1,876 ms 8,197 ms
Memory deserialization 2,193 MB 2,710 MB 0 - 600 MB 2,520 MB
JAR library size 1,910 KB 1,636 KB 64 KB 3,469 KB
Size of generated classes N/A 40 KB 9 KB 36 KB
  • If we don't consider the optimizations applied in FlatBuffers, the Avro file takes up less space.
  • In the example, although all fields are required, you can easily make them nullable (consuming slightly more space).
  • For me, its main strength is that it doesn't require having all the data in memory to serialize the information. For example, you can read it from a database or a file, transform or enrich it according to your logic, and at the same time persist it in some type of OutputStream (created from a file or HTTP connection).
  • The possibility of defining a schema programmatically gives us the option of modifying the output format depending on the business logic and creating our own serialization tooling.
  • Avro is halfway between JSON and binary formats like Protocol Buffers and Flatbuffers:
    • Avro supports flexible schema
    • It is not necessary to know or agree on the schema beforehand to be able to read an Avro file
    • In JSON you can not get the schema from the file itself (without fully parsing it first)
    • Avro is more compact than JSON (but not human-readable)
    • Serialization and deserialization time is faster than JSON
    • You can easily serialize/deserialize all objects in a loop/stream without needing to have it all in memory

Featured ones: