Logo

dev-resources.site

for different kinds of informations.

Multiplos schemas no mesmo tópico Kafka na linha de comando

Published at
1/4/2024
Categories
kafka
schema
schemaregistry
avro
Author
adrianoavelino
Categories
4 categories in total
kafka
open
schema
open
schemaregistry
open
avro
open
Author
14 person written this
adrianoavelino
open
Multiplos schemas no mesmo tópico Kafka na linha de comando

Recentemente, tive a oportunidade de trabalhar em um projeto que utiliza três tipos distintos de eventos (schemas) no mesmo tópico do Kafka. Até então, só havia utilizado um único schema Avro por tópico. Isso só foi possível devido ao uso da estratégia de nome do subject (Subject name strategy) no Schema Registry.

Teoria

Um subject no Schema Registry é uma coleção de schemas associados a um tópico ou a um schema específico usado na serialização e desserialização de dados em um tópico do Kafka. Em termos simples, um "subject" é uma chave sob a qual os schemas são registrados no Schema Registry. Como exemplos de nomes de subjects podemos citar: transacoes-value e transacoes-key, repectivamente relacionados ao valor e a chave de identificação de um evento enviado a um tópico Kafka.

O Schema Registry suporta diversos tipos de schemas, mas estaremos abordando a utilização de schemas avro, por exemplo:



{
  "type": "record",
  "name": "Transacao",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "valor", "type": "double"},
    {"name": "data", "type": "string"},
    {"name": "tipo", "type": "string"},
    {"name": "conta_origem", "type": "string"},
    {"name": "conta_destino", "type": "string"},
    {"name": "descricao", "type": "string"}
  ]
}


Enter fullscreen mode Exit fullscreen mode


{
  "type": "record",
  "name": "DetalhesConta",
  "fields": [
    {"name": "conta_id", "type": "string"},
    {"name": "titular", "type": "string"},
    {"name": "saldo", "type": "double"},
    {"name": "tipo_conta", "type": "string"},
    {"name": "agencia", "type": "string"},
  ]
}


Enter fullscreen mode Exit fullscreen mode

Mais informações relacionadas a arquivos avro podem ser encontradas na documentação do Apache Avro.

Seguindo a estratégia de nome de subject padrão, o nome do subject num tópico chamado transacoes seria: transacoes-value.

O uso de mais de um schema no mesmo tópico Kafka é possível através do uso da estratégia de nome do subject (Subject name strategy). Existem três opções: TopicNameStrategy, RecordNameStrategy e TopicRecordNameStrategy.

O TopicNameStrategy é o padrão e gera o nome do subject com base no nome do tópico seguido do caractere -, mais a palavra key ou value. No RecordNameStrategy o nome do subject é baseado no nome do record do schema avro. Já o TopicRecordNameStrategy, o nome subject é baseado no nome do tópico seguido do caractere -, mais o nome do record do schema avro. Veja a tabela abaixo com exemplos dos nomes gerados num tópico chamado transacoes e nome do record br.com.DetalhesConta:

Nome da estratégia do Subject Exemplo Subject gerado
TopicNameStrategy transacoes-value ou transacoes-key
RecordNameStrategy DetalhesConta
TopicRecordNameStrategy transacoes-DetalhesConta

Seguindo essa lógica fica fácil entender porque não conseguimos utilizar mais de um schema utilizando a estratégia padrão (TopicNameStrategy). A formação do nome do subject é a mesma, tanto para um schema Transacao ou DetalhesConta, por exemplo. Ao cadastrar os schemas, ambos geram o mesmo nome do subject: transacoes-value.

O problema

Vamos à prática, executando o ambiente Kafka no docker-compose.yml com o seguinte conteúdo:



version: '3.7'

services:
  fast-data-dev:
    image: landoop/fast-data-dev:3.3
    ports:
      - "2181:2181"
      - "3030:3030"
      - "8081-8083:8081-8083"
      - "9581-9585:9581-9585"
      - "9092:9092"
    environment:
      ADV_HOST: 127.0.0.1


Enter fullscreen mode Exit fullscreen mode

E executando o comando abaixo:



docker-compose up -d


Enter fullscreen mode Exit fullscreen mode

Agora vamos acessar o bash do container:



docker-compose exec fast-data-dev bash


Enter fullscreen mode Exit fullscreen mode

Dentro do container podemos criar um producer que por padrão cria os subjects e os schemas de forma automática, primeiramente criamos um producer para o schema Transacoes:



kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic transacoes \
--property value.schema='{ "type": "record", "name": "Transacao", "fields": [ {"name": "id", "type": "string"}, {"name": "valor", "type": "double"}, {"name": "data", "type": "string"}, {"name": "tipo", "type": "string"}, {"name": "conta_origem", "type": "string"}, {"name": "conta_destino", "type": "string"}, {"name": "descricao", "type": "string"} ] }' \
--property schema.registry.url=http://localhost:8081


Enter fullscreen mode Exit fullscreen mode

Logo após, colamos o seguinte código:



{ "id": "123456789", "valor": 500.0, "data": "2023-01-01", "tipo": "transferencia", "conta_origem": "987654321", "conta_destino": "567890123", "descricao": "Transferência entre contas" }


Enter fullscreen mode Exit fullscreen mode

Com isso podemos visualizar os subjects cadastrados através da interface gráfica do container landoop, através do terminal utilizando o curl ou qualquer outra aplicação como o Postman ou Insomnia. Para acessar via interface gráfica do landoop, acesse o endereço http://localhost:3030/schema-registry-ui/#/ e filtre pelo nome do subject transacoes-value, conforme a imagem abaixo:

Print da interface gráfica do landoop filtrando pelo termo transacoes-value

Se preferir também pode utilizar o curl em um novo terminal:



curl -X GET http://localhost:8081/subjects


Enter fullscreen mode Exit fullscreen mode

O resultado deve ser algo semelhante a imagem abaixo:

Print com o exemplo da resposta para uma requisição na API do schema registry para listar os subjects registrados

Obs: a reposta deve conter diversos subjects, mas vamos focar somente no subject transacoes-value para realizar a validação.

Vamos ver o que acontece ao criarmos mais um producer, mas para o schema DetalhesConta. Como comentado anteriormente, por padrão, o producer cria os subjects e os schemas de forma automática. O nosso objetivo ao utilizar um segundo producer é adicionar mais um schema no mesmo tópico e para isso primeiro abrimos um novo terminal e iniciamos o bash:



docker-compose exec fast-data-dev bash


Enter fullscreen mode Exit fullscreen mode

Agora iniciamos o nosso producer:



kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic transacoes \
--property value.schema='{ "type": "record", "name": "DetalhesConta", "fields": [ {"name": "conta_id", "type": "string"}, {"name": "titular", "type": "string"}, {"name": "saldo", "type": "double"}, {"name": "tipo_conta", "type": "string"}, {"name": "agencia", "type": "string"} ] }' \
--property schema.registry.url=http://localhost:8081


Enter fullscreen mode Exit fullscreen mode

Logo após, colamos o seguinte evento para o schema DetalhesConta:



{ "conta_id": "987654321", "titular": "Joaquim", "saldo": 1000.0, "tipo_conta": "corrente", "agencia": "123" }


Enter fullscreen mode Exit fullscreen mode

Devemos receber uma mensagem de erro semelhante ao erro abaixo:



org.apache.kafka.common.errors.InvalidConfigurationException: Schema being registered is incompatible with an earlier schema for subject "transacoes-value", 
details: [Incompatibility{type:NAME_MISMATCH, location:/name, message:expected: Transacao, reader:{"type":"record","name":"DetalhesConta","fields":
[{"name":"conta_id","type":"string"},{"name":"titular","type":"string"},{"name":"saldo","type":"double"},{"name":"tipo_conta","type":"string"},
{"name":"agencia","type":"string"}]}, writer:{"type":"record","name":"Transacao","fields":[{"name":"id","type":"string"},{"name":"valor","type":"double"},
{"name":"data","type":"string"},{"name":"tipo","type":"string"},{"name":"conta_origem","type":"string"},{"name":"conta_destino","type":"string"},
{"name":"descricao","type":"string"}]}}, Incompatibility{type:READER_FIELD_MISSING_DEFAULT_VALUE, location:/fields/0, message:conta_id, reader:{"type":"record",
"name":"DetalhesConta","fields":[{"name":"conta_id","type":"string"},{"name":"titular","type":"string"},{"name":"saldo","type":"double"},{"name":"tipo_conta",
"type":"string"},{"name":"agencia","type":"string"}]}, writer:{"type":"record","name":"Transacao","fields":[{"name":"id","type":"string"},{"name":"valor",
"type":"double"},{"name":"data","type":"string"},{"name":"tipo","type":"string"},{"name":"conta_origem","type":"string"},{"name":"conta_destino",
"type":"string"},{"name":"descricao","type":"string"}]}}, Incompatibility{type:READER_FIELD_MISSING_DEFAULT_VALUE, location:/fields/1, message:titular, reader:
{"type":"record","name":"DetalhesConta","fields":[{"name":"conta_id","type":"string"},{"name":"titular","type":"string"},{"name":"saldo","type":"double"},
{"name":"tipo_conta","type":"string"},{"name":"agencia","type":"string"}]}, writer:{"type":"record","name":"Transacao","fields":[{"name":"id","type":"string"},
{"name":"valor","type":"double"},{"name":"data","type":"string"},{"name":"tipo","type":"string"},{"name":"conta_origem","type":"string"},
{"name":"conta_destino","type":"string"},{"name":"descricao","type":"string"}]}}, Incompatibility{type:READER_FIELD_MISSING_DEFAULT_VALUE, location:/fields/2, 
message:saldo, reader:{"type":"record","name":"DetalhesConta","fields":[{"name":"conta_id","type":"string"},{"name":"titular","type":"string"},{"name":"saldo",
"type":"double"},{"name":"tipo_conta","type":"string"},{"name":"agencia","type":"string"}]}, writer:{"type":"record","name":"Transacao","fields":[{"name":"id",
"type":"string"},{"name":"valor","type":"double"},{"name":"data","type":"string"},{"name":"tipo","type":"string"},{"name":"conta_origem","type":"string"},
{"name":"conta_destino","type":"string"},{"name":"descricao","type":"string"}]}}, Incompatibility{type:READER_FIELD_MISSING_DEFAULT_VALUE, location:/fields/3, 
message:tipo_conta, reader:{"type":"record","name":"DetalhesConta","fields":[{"name":"conta_id","type":"string"},{"name":"titular","type":"string"},
{"name":"saldo","type":"double"},{"name":"tipo_conta","type":"string"},{"name":"agencia","type":"string"}]}, writer:{"type":"record","name":"Transacao",
"fields":[{"name":"id","type":"string"},{"name":"valor","type":"double"},{"name":"data","type":"string"},{"name":"tipo","type":"string"},{"name":"conta_origem",
"type":"string"},{"name":"conta_destino","type":"string"},{"name":"descricao","type":"string"}]}}, Incompatibility{type:READER_FIELD_MISSING_DEFAULT_VALUE, 
location:/fields/4, message:agencia, reader:{"type":"record","name":"DetalhesConta","fields":[{"name":"conta_id","type":"string"},{"name":"titular",
"type":"string"},{"name":"saldo","type":"double"},{"name":"tipo_conta","type":"string"},{"name":"agencia","type":"string"}]}, writer:{"type":"record",
"name":"Transacao","fields":[{"name":"id","type":"string"},{"name":"valor","type":"double"},{"name":"data","type":"string"},{"name":"tipo","type":"string"},
{"name":"conta_origem","type":"string"},{"name":"conta_destino","type":"string"},{"name":"descricao","type":"string"}]}}]; error code: 409


Enter fullscreen mode Exit fullscreen mode

A mensagem de erro é extensa, porém, resumidamente, o problema ocorreu ao tentar registrar automaticamente um novo schema, seguindo a estratégia de nome TopicNameStrategy que gera um subject chamado transacoes-value. Como este subject já foi registrado anteriormente, foi realizada uma tentativa de atualizar um schema tentado adicionar campos obrigatórios gerando erros de incompatibilidade. Esse erro está relacionado aos tipos de compatibilidade dos schemas, onde o padrão é BACKWARD. Esse padrão permite a exclusão de campos e a adição de campos opcionais. Mais informações sobre compatibilidade dos schemas podem ser encontradas na documentação sobre Compatibility types.

Dica: para finalizar o producer em qualquer um dos terminais abertos, basta digitar Ctrl + C

Solução

Para resolver esse problema devemos utilizar a estratégia de nome de subject TopicRecordNameStrategy ou RecordNameStrategy. Aqui vamos utilizar a estratégia TopicRecordNameStrategy, mas antes vamos recriar a nossa infraestrutura seguindo alguns passos:

Obs: esses passos não são obrigatórios, mas para mantermos o ambiente sem estados iremos recriar a infraestrutura

Desligue o container:



docker-compose down -v


Enter fullscreen mode Exit fullscreen mode

Recrie a infraestrutura:



docker-compose up -d


Enter fullscreen mode Exit fullscreen mode

Agora vamos registrar dois schemas para o mesmo tópico da seguinte forma:
Acesse o bash do container:



docker-compose exec fast-data-dev bash


Enter fullscreen mode Exit fullscreen mode

Crie um producer para o schema Transacoes:



kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic transacoes \
--property value.schema='{ "type": "record", "name": "Transacao", "fields": [ {"name": "id", "type": "string"}, {"name": "valor", "type": "double"}, {"name": "data", "type": "string"}, {"name": "tipo", "type": "string"}, {"name": "conta_origem", "type": "string"}, {"name": "conta_destino", "type": "string"}, {"name": "descricao", "type": "string"} ] }' \
--property schema.registry.url=http://localhost:8081 \
--property value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy


Enter fullscreen mode Exit fullscreen mode

Agora cole o seguinte evento:



{ "id": "123456789", "valor": 500.0, "data": "2023-01-01", "tipo": "transferencia", "conta_origem": "987654321", "conta_destino": "567890123", "descricao": "Transferência entre contas" }


Enter fullscreen mode Exit fullscreen mode

Agora vamos criar um producer para o schema DetalhesConta, primeiro abrimos um novo terminal e inciamos um bash para o container:



docker-compose exec fast-data-dev bash


Enter fullscreen mode Exit fullscreen mode

Agora iniciamos o nosso producer:



kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic transacoes \
--property value.schema='{ "type": "record", "name": "DetalhesConta", "fields": [ {"name": "conta_id", "type": "string"}, {"name": "titular", "type": "string"}, {"name": "saldo", "type": "double"}, {"name": "tipo_conta", "type": "string"}, {"name": "agencia", "type": "string"} ] }' \
--property schema.registry.url=http://localhost:8081 \
--property value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy


Enter fullscreen mode Exit fullscreen mode

Logo após, colamos o seguinte evento para o schema DetalhesConta:



{ "conta_id": "987654321", "titular": "Joaquim", "saldo": 1000.0, "tipo_conta": "corrente", "agencia": "123" }


Enter fullscreen mode Exit fullscreen mode

Com isso podemos validar os subjects na interface gráfica do landoop e pesquisar pela palavra transacoes:

Print da interface gráfica do landoop pesquisando pelos schemas que contém a palavra transacoes

Ou através do curl com o seguinte comando:



curl -X GET http://localhost:3030/schema-registry-ui/#/subjects


Enter fullscreen mode Exit fullscreen mode

Ao analisar a resposta do comando acima podemos identificar que foram criados 2 subjects para o tópico transacoes: transacoes-Transacao e transacoes-DetalhesConta.

Também podemos visualizar os eventos recebidos no tópico Kafka usando a interface gráfica do landoop:

Print da interface gráfica do landoop listando os eventos do tópico transacoes

Ou através de um consumer na linha de comando, num outro terminal, após iniciar o bash com o comando docker-compose exec fast-data-dev bash:



kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--topic transacoes \
--from-beginning \
--property schema.registry.url=http://localhost:8081


Enter fullscreen mode Exit fullscreen mode

Ao finalizar o passo a passo será possível registrar dois schemas no mesmo tópico utilizando o kafka-avro-console-producer, de forma automática, através da estratégia de nome de subject TopicRecordNameStrategy para não sofrer com problemas de conflito de subjects no mesmo tópico. Você pode encontrar um exemplo dessa implantação em Multi schemas in one Kafka topic utilizando a linguagem Java ou outro exemplo em python

Referências

schema Article's
30 articles in total
Favicon
Schema Markup can boost your click-through rates by up to 30%?
Favicon
Custom schema specific Supabase Server Component clients in Grida Form workspace
Favicon
Zod for TypeScript Schema Validation: A Comprehensive Guide
Favicon
Database schema design of Splitwise application
Favicon
Validating JSON Schema with Fixed and User-Defined Keys in Python
Favicon
Using Zod's z.union: An Important Pitfall to Avoid
Favicon
Desvendando o Atlas: Gerencie Seus Esquemas de Banco de Dados com Estilo🚀
Favicon
Customize Schema with @extend_schema_view
Favicon
Hotel reservation Schema design (PostgreSQL)
Favicon
How to Insert Data into Specific Table of PostgreSQL Schemas with Node.js
Favicon
Typesafe Supabase Flutter Queries
Favicon
Using yup to build schema with value parsing and validation.
Favicon
Designing an Optimal Database Schema for a Followers-Following System in a Blog-Post App
Favicon
JobPosting from Schema.org
Favicon
Unlocking the Power of Mongoose Schemas: Enhancements for Better Data Handling
Favicon
Unraveling the Power of Schema Markup: Elevating Your Website's Visibility and Performance
Favicon
How to handle complex json schema
Favicon
Navigating Django Schemas with Ease: How Django-schema-viewer Helps Developers
Favicon
🐣Your First Database Schema Change in 5 Minutes with Bytebase
Favicon
Multiplos schemas no mesmo tópico Kafka na linha de comando
Favicon
How pgroll works under the hood
Favicon
Automating API Documentation: A Journey from TypeScript to OpenAPI and Schema Governence with Optic
Favicon
Master schema validation in TypeScript with Zod
Favicon
🚛 Deploy Database Schema Migrations with Bytebase
Favicon
Random Data Generator Website
Favicon
Xata's JSON Column Type
Favicon
Introducing pgroll: Zero-downtime, Reversible, Schema Migrations for Postgres
Favicon
Designing Your Database Schema
Favicon
WHAT ARE THE TYPES OF SCHEMA MARKEUP?
Favicon
Iceberg Schema Evolution in Trino

Featured ones: