dev-resources.site
for different kinds of informations.
OpenID Connect authentication with Apache Kafka 3.1
Dear reader, this is not going to be fun because today we're talking about security. However, to make it less boring, this is about taking advantage of the support of OpenID Connect (OIDC) in Kafka 3.1, the foundation of Confluent Platform 7.1.
OpenID Connect
Let's start with a few words about OIDC. It's an open standard that completes OAuth2.0. The aim of this paper is not to get a proper introduction to OIDC, but let's emphasize some key differences with OAuth2.0.
First of all, in OAuth2.0, the token is nothing more than an opaque string that has to be verified against the authorization server to be trusted. OIDC uses JSON Web Token. It's a signed JSON document, base64 encoded. The cool thing is, as it's signed, applications can trust it without requiring any requests to the authorization server, so it implies only processing resources, which scales way better than point-to-point connections. The only element the application (here the application is a Kafka broker) needs is the public key for validating the token, and it's published by the authorization server, with another open specification, JWKS, and is easily cacheable.
Obviously, this is an extremely incomplete summary of OIDC. What I like about it is that it frees the application from authentication method complexity. With OIDC, the organization can opt for simple user/password authentication, MFA, biometrics, SSO, multiple authentication flows and other options: it has no impact on the application as long as it complies with the standard.
Putting it together with Kafka
Here, we're keeping it simple as the use case is to make an application to application authentication. So I'm using only client id and client secret. In order to make it as light as possible, the authorization server is Auth0, a fully managed service with a free tier. To set it up, I recommend reading the section Backend/API of the documentation. Kafka is part of the listed backend, but the Configure Auth0 APIs paragraph of any kind of backend fits for this PoC. You can feel free to opt for any other authentication provider as the standard is open and there are multiple implementation alternatives, self-managed as well as as-a-service.
The support of OIDC in Kafka 3.1 is an extension of an existing feature and is defined in KIP-768. The authentication flow is pretty simple:
During startup, the broker collects the public key set from the authorization server. The client starts by authenticating against the authorization server, then the latter issues a JWT. This token is then used for the SASL/OAUTHBEARER authentication. The broker now validates the token by verifying the signature and claims to clear the client.
To make it more fun, I'm using Kafka in KRaft mode (so without Zookeeper) based on this example running in Docker provided by Confluent.
The first step is to validate the Auth0 setup, and Kafka comes with a handy command line tool:
$ docker run -ti --rm confluentinc/cp-kafka:7.1.0 kafka-run-class org.apache.kafka.tools.OAuthCompatibilityTool \
--clientId XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX \
--clientSecret XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX \
--sasl.oauthbearer.jwks.endpoint.url https://xxxx-xxxxx.us.auth0.com/.well-known/jwks.json \
--sasl.oauthbearer.token.endpoint.url https://xxxx-xxxxx.us.auth0.com/oauth/token \
--sasl.oauthbearer.expected.audience https://kafka.auth
PASSED 1/5: client configuration
PASSED 2/5: client JWT retrieval
PASSED 3/5: client JWT validation
PASSED 4/5: broker configuration
PASSED 5/5: broker JWT validation
SUCCESS
All configurations come from the authentication provider. Any other kind of output would require you to have a look at the Auth0 configuration.
Now let's tweak the broker configuration:
version: '2'
services:
broker:
image: confluentinc/cp-kafka:7.1.0
hostname: broker
container_name: broker
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OIDC:SASL_PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,OIDC://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,OIDC://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
KAFKA_SASL_ENABLED_MECHANISMS: OAUTHBEARER
KAFKA_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL: $JWKS_ENDPOINT_URL
KAFKA_OPTS: -Djava.security.auth.login.config=/tmp/kafka_server_jaas.conf
KAFKA_SASL_OAUTHBEARER_EXPECTED_AUDIENCE: $OIDC_AUD
KAFKA_LISTENER_NAME_OIDC_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
volumes:
- ./update_run.sh:/tmp/update_run.sh
- ./kafka_server_jaas.conf:/tmp/kafka_server_jaas.conf
- ./client.properties:/tmp/client.properties
command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
Here are the differences with the original example:
$ diff compose.yml compose.ori.yml
14,15c14,15
< KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OIDC:SASL_PLAINTEXT'
< KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,OIDC://localhost:9092'
---
> KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
> KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
25c25
< KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,OIDC://0.0.0.0:9092'
---
> KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
29,33d28
< KAFKA_SASL_ENABLED_MECHANISMS: OAUTHBEARER
< KAFKA_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL: $JWKS_ENDPOINT_URL
< KAFKA_OPTS: -Djava.security.auth.login.config=/tmp/kafka_server_jaas.conf
< KAFKA_SASL_OAUTHBEARER_EXPECTED_AUDIENCE: $OIDC_AUD
< KAFKA_LISTENER_NAME_OIDC_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
36,37d30
< - ./kafka_server_jaas.conf:/tmp/kafka_server_jaas.conf
< - ./client.properties:/tmp/client.properties
Long story short, the external listener has been renamed and configured to use SASL_PLAINTEXT with the OAUTHBEARER mechanism. Notice that the coordinates of the authorization service are provided with environment variables in order to keep it generic.
The JAAS configuration is pretty basic:
KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
};
Now let’s start the broker:
$ JWKS_ENDPOINT_URL=https://xxxx-xxxxx.us.auth0.com/.well-known/jwks.json OIDC_AUD=https://kafka.auth compose up
[+] Running 1/1
â ¿ Container broker Created 0.1s
Attaching to broker
broker | ===> User
broker | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
broker | ===> Configuring ...
broker | ===> Running preflight checks ...
broker | ===> Check if /var/lib/kafka/data is writable ...
broker | ===> Check if Zookeeper is healthy ...
broker | ignore zk-ready 40
broker | Formatting /tmp/kraft-combined-logs
broker | ===> Launching ...
broker | ===> Launching kafka ...
[...]
broker | [2022-04-15 06:35:13,095] INFO KafkaConfig values:
broker | advertised.listeners = PLAINTEXT://broker:29092,OIDC://localhost:9092
[...]
broker | listener.security.protocol.map = CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OIDC:SASL_PLAINTEXT
broker | listeners = PLAINTEXT://broker:29092,CONTROLLER://broker:29093,OIDC://0.0.0.0:9092
[...]
broker | sasl.enabled.mechanisms = [OAUTHBEARER]
[...]
broker | sasl.oauthbearer.expected.audience = [https://kafka.auth]
[...]
broker | sasl.oauthbearer.jwks.endpoint.url = https://xxxx-xxxxx.us.auth0.com/.well-known/jwks.json
[...]
broker | [2022-04-15 06:35:13,159] INFO [BrokerLifecycleManager id=1] The broker has been unfenced. Transitioning from RECOVERY to RUNNING. (kafka.server.BrokerLifecycleManager)
Good stuff! Next, let's configure the client:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.login.connect.timeout.ms=15000
sasl.oauthbearer.token.endpoint.url=https://xxxx-xxxxx.us.auth0.com/oauth/token
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" \
clientSecret="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" ;
We're now good to go with some basic produce and consume tests. You may have noticed that I also mounted the client configuration file in the broker container, it's pure convenience to run the clients in the same container:
$ docker exec -ti broker kafka-console-producer --producer.config /tmp/client.properties --bootstrap-server localhost:9092 --topic test
>Hello OIDC!
>%
And in a different terminal:
$ docker exec -ti broker kafka-console-consumer --consumer.config /tmp/client.properties --bootstrap-server localhost:9092 --topic test
Hello OIDC!
^CProcessed a total of 1 messages
That's it!
Running the client without the proper configuration raises errors on both sides, testifying that the broker is rejecting it as expected:
$ docker exec -ti broker kafka-console-consumer --bootstrap-server localhost:9092 --topic test
[2022-04-15 06:57:28,564] WARN [Consumer clientId=console-consumer, groupId=console-consumer-9357] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
broker | [2022-04-15 06:57:28,247] INFO [SocketServer listenerType=BROKER, nodeId=1] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
If you increase the level of the org.apache.kafka.common.security
logger, you'll be able to see the token parsed.
For reference, I also recommend reading the SASL/OAUTHBEARER documentation.
Featured ones: