New app streaming via Kafka?

If you've recently created a new app that is going to be using our Kafka platform, or if your existing app will start streaming through our Kafka platform, you've come to the right place.

One Kafka user per application

Our policy is generally to create one Kafka user per application and to grant this principal (i.e. the Kafka user) the permissions it requires. For instance, if an app my-app needs to produce to a topic my-topic, it needs a Kafka user with ACLs that permit writing messages to this topic's stream, as well as writing schemas to the schema registry for this topic.

To have a Kafka user created, send a request to Team Dataplattform in #talk-data on Slack. The minimum information we require is the application name, and preferably also the topic(s) the user requires permissions for and which permissions are needed (i.e. produce/consume). We then use this to create the user in one of our infrastructure repositories (using Aiven's Terraform registry, so that the user is defined as code).

Once you have obtained a user from us, you may also create topics in a self-service manner if you need to, as described in Kafka-admin. Such topics are then owned and managed by your team. You may then grant your application's Kafka user (e.g. my-app) the access permissions it requires for the topic.

How to get started

Once you have credentials for a Kafka user with correct access permissions, the next step would be to start streaming via our Kafka platform. Note that the credentials should be secured sufficiently as described by Team Sikkerhet, such as by using Google KMS or in the application project's Kubernetes secrets.

Libraries

Team Dataplattform has written and maintains a Spring-boot client library, which may be found here: Entur Kafka spring-starter

This client library also documents what we recommend as common and sensible client parameters, and as such may be consulted when setting up clients using different libraries, should you require to do so.

Schema - best practices

For designing and using Avro schemas, please refer to Using Avro schemas and Designing a schema.

Remember that a record has one schema for the key and one for the value. A common choice is to use a String for the key and an Avro schema for the value of a message. These are then used for serialization of the message (i.e. the record containing the key and the value). Remember that the key is used for assigning a message to a partition (you may think of this as a hash function over the key) and for log compaction. It is not used for locating a message; a message is located by its partition and offset.
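To illustrate the role of the key, the sketch below maps keys to partitions with a simple hash. It is only an approximation: Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas this sketch uses Kotlin's hashCode() as a stand-in. The helper partitionFor is hypothetical and not part of any Kafka library.

```kotlin
// Hypothetical helper: maps a record key to a partition index.
// Assumption: String.hashCode() stands in for the murmur2 hash that
// Kafka's default partitioner applies to the serialized key bytes.
fun partitionFor(key: String, numPartitions: Int): Int {
    require(numPartitions > 0) { "numPartitions must be positive" }
    // floorMod keeps the result within 0 until numPartitions, even for negative hashes.
    return Math.floorMod(key.hashCode(), numPartitions)
}

fun main() {
    val numPartitions = 3
    // "order-1" appears twice and is assigned to the same partition both times.
    for (key in listOf("order-1", "order-2", "order-1")) {
        println("$key -> partition ${partitionFor(key, numPartitions)}")
    }
}
```

As long as the number of partitions stays the same, equal keys end up on the same partition, which is what preserves ordering per key.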

Common build.gradle.kts config

As described in the Readme of the repo Entur Kafka spring-starter:

    plugins {
        // Not strictly needed by the starter, but you will need something like this if using Avro schemas
        id("com.github.davidmc24.gradle.plugin.avro") version "1.3.0"
    }

    dependencies {
        implementation("org.springframework.boot:spring-boot-starter")
        implementation("org.entur.data:entur-kafka-spring-starter:1.0.2")
    }

Error handling

Depending on the criticality of the application and the stream, you may wish to consider exception handling and/or logging, as well as writing to the topic's dead-letter topic (DLT) upon client failure. When your client fails to consume or handle a message from a topic, one default behaviour is to retry a set number of times and then, optionally, to put the message that could not be consumed on a DLT. This should be the exception rather than the rule, so you will probably only need one or very few partitions for a DLT (unless you need to maintain strict ordering with respect to the original topic and its partitions).
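The retry-then-DLT behaviour can be sketched in plain Kotlin as follows. This is a minimal illustration of the idea and not the spring-starter's implementation: consumeWithRetry, Outcome and the in-memory list standing in for a DLT are all hypothetical names introduced here.

```kotlin
// Outcome of handling one message: processed, or routed to the dead-letter topic.
sealed interface Outcome
data class Processed(val attempts: Int) : Outcome
data class DeadLettered(val attempts: Int, val lastError: String) : Outcome

// Hypothetical helper: try `handler` up to `maxAttempts` times; if every
// attempt throws, hand the message to `sendToDlt` and report DeadLettered.
fun <T> consumeWithRetry(
    message: T,
    maxAttempts: Int,
    handler: (T) -> Unit,
    sendToDlt: (T) -> Unit,
): Outcome {
    require(maxAttempts > 0) { "maxAttempts must be positive" }
    var lastError: Exception? = null
    for (attempt in 1..maxAttempts) {
        try {
            handler(message)
            return Processed(attempt)
        } catch (e: Exception) {
            lastError = e // a real client would typically log and back off here
        }
    }
    sendToDlt(message)
    return DeadLettered(maxAttempts, lastError?.message ?: "unknown error")
}

fun main() {
    val dlt = mutableListOf<String>() // in-memory stand-in for a DLT
    val outcome = consumeWithRetry(
        message = "bad-payload",
        maxAttempts = 3,
        handler = { throw IllegalStateException("cannot parse $it") },
        sendToDlt = { dlt.add(it) },
    )
    println(outcome)
    println(dlt)
}
```

Keeping the failed message together with the last error makes it easier to inspect or replay messages from the DLT later.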

For more information on error handling, as well as examples on how this may be done, please consult the Spring-starter doc here

Testing

In order to test the integration, you may supply the Kafka user credentials locally, and try to produce and/or consume messages in our test clusters (which are set up for this purpose). Under normal circumstances, you would want to write an Avro schema and put this in an avro/-directory. This allows building Avro-generated classes, which are then used for serialisation and deserialisation (see the section above for a plugin you may use to build Avro-generated classes).

The spring-starter repo's documentation also shows a minimal client configuration in YAML format for a client using the spring-starter:

    entur:
      kafka:
        kafkaCluster: "TEST_INT"
        sasl:
          username: "entur"
          password: "entur-password"
        consumer:
          group: "test"

More generally, instantiating a client requires a bootstrap broker URL (for connecting to the Kafka cluster), a set of credentials (here SASL and basic auth credentials, i.e. a username and password), and a consumer group. The consumer group is used to distribute partitions among a set of consumers, i.e. the consumers within that group. As a result, each consumer client within a consumer group only reads a subset of the messages (those from its assigned set of partitions). For more information regarding consumer groups, please see consumer groups.
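When setting up a client with a library other than the spring-starter, these settings typically end up as standard Apache Kafka client properties. The sketch below assembles them in plain Kotlin. It rests on several assumptions: that the cluster uses SASL over TLS with the SCRAM-SHA-256 mechanism (check which mechanism your cluster requires), that consumerProperties is a hypothetical helper, and that the broker address and the environment variable names KAFKA_USERNAME and KAFKA_PASSWORD are placeholders.

```kotlin
import java.util.Properties

// Hypothetical helper that assembles the settings described above using
// standard Apache Kafka client property names.
// Assumption: SASL over TLS with the SCRAM-SHA-256 mechanism.
fun consumerProperties(
    bootstrapServers: String,
    username: String,
    password: String,
    groupId: String,
): Properties = Properties().apply {
    setProperty("bootstrap.servers", bootstrapServers)
    setProperty("security.protocol", "SASL_SSL")
    setProperty("sasl.mechanism", "SCRAM-SHA-256")
    setProperty(
        "sasl.jaas.config",
        "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"$username\" password=\"$password\";",
    )
    // Consumers sharing this id split the topic's partitions between them.
    setProperty("group.id", groupId)
}

fun main() {
    // Read credentials from the environment rather than hard-coding them.
    // The variable names and the fallback values are placeholders.
    val props = consumerProperties(
        bootstrapServers = "broker.example.com:9092", // placeholder address
        username = System.getenv("KAFKA_USERNAME") ?: "entur",
        password = System.getenv("KAFKA_PASSWORD") ?: "entur-password",
        groupId = "test",
    )
    println(props.getProperty("group.id"))
}
```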

Application (unit and integration) tests

When configured correctly, your Kafka client should work out of the box, and the Spring-starter library includes integration-test coverage for its client functionality for producing to and consuming from Kafka. Tests should therefore focus on the business logic surrounding streaming: what event does a message trigger based on its contents, or how is a message produced as the result of an event? What is the expected behaviour, and what cases does your code need to cover?

As usual, you may mock and/or construct objects of the classes surrounding your application code as required to test the functionality you wish to cover. For example, you may instantiate records using Avro-generated classes and pass these to specific parts of your application for testing (e.g. in Kotlin, MyKafkaTopicClass(..args)).
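A minimal sketch of such a test is shown below. All names are hypothetical: PaymentReceived is a plain data class standing in for an Avro-generated class, and effectFor stands in for the business logic that decides what a message should trigger.

```kotlin
// Stand-in for an Avro-generated class; in a real project this would be
// generated from an .avsc file. All names here are hypothetical.
data class PaymentReceived(val orderId: String, val amountInCents: Long)

// The business rule under test: which effect does a message trigger?
sealed interface Effect
data class MarkOrderPaid(val orderId: String) : Effect
object IgnoreMessage : Effect

fun effectFor(message: PaymentReceived): Effect =
    if (message.amountInCents > 0) MarkOrderPaid(message.orderId) else IgnoreMessage

fun main() {
    // Construct records directly; no Kafka broker or client is involved.
    check(effectFor(PaymentReceived("order-1", 1_000)) == MarkOrderPaid("order-1"))
    check(effectFor(PaymentReceived("order-2", 0)) == IgnoreMessage)
    println("all checks passed")
}
```

Keeping the decision logic in a function that takes the record as a plain argument means it can be tested without starting a listener or a broker.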

You may also mock records for a listener in a wider context, in order to cover how the effects of an event propagate through the application. An example of an E2E test:

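    import io.mockk.verify // assumes MockK, which matches the verify(exactly = 1) { .. } syntax below
    import org.junit.jupiter.api.Test // assumes JUnit 5
    import org.springframework.beans.factory.annotation.Autowired
    import org.springframework.kafka.test.context.EmbeddedKafka

    // Starts an in-memory Kafka broker for the duration of the test class
    @EmbeddedKafka(
        brokerProperties = ["listeners=PLAINTEXT://localhost:9092", "port=9092"],
        partitions = 3,
        controlledShutdown = true,
        topics = ["\${app.kafka.producer.kafkaTopic}"]
    )
    class MyIntTest {

        @Autowired
        lateinit var myTestProducer: EnturKafkaProducer<KafkaTopicForApp>

        // For verify to work, this needs to be a mock or spy
        val myAppSubjectUnderTesting = MyAppSubjectUnderTesting(<.. args>)

        @Test
        fun `should trigger application logic/feature upon consuming kafka message`() {
            val message = KafkaTopicForApp(<.. insert args here for message construction>)
            myTestProducer.send(appKafkaTopic, key, message, myTestCorrelationId)

            // The listener should have handed the consumed message to the application exactly once
            verify(exactly = 1) {
                myAppSubjectUnderTesting.messageConsumptionEffect(messageConsumedByApp)
            }
        }
    }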

Schema Validation in CircleCI builds

A dedicated Kafka user, circleci, with read-only schema permission exists for each Aiven-managed Kafka cluster. It can be used to validate schema compatibility during development.

The following environment variables are set in the CircleCI global context, so that your build environment can authenticate towards a Schema Registry:

AIVEN_KAFKA_USER
AIVEN_KAFKA_PASSWORD

The snippet below shows how to use them in a Gradle build file with the Confluent schema registry plugin for Avro schemas:

    schemaRegistry {
        url = "https://entur-kafka-test-int-entur-test.aivencloud.com:11867"
        credentials {
            username = (project.properties["aiven_kafka_circleci_user"] ?: System.getenv("AIVEN_KAFKA_USER")).toString()
            password =
                (project.properties["aiven_kafka_circleci_password"] ?: System.getenv("AIVEN_KAFKA_PASSWORD")).toString()
        }
        compatibility {
            subject(
                "my-topic-in-staging-value",
                "src/main/avro/my-topic-schema.avsc",
                "AVRO"
            )
        }
    }

    tasks.check {
        dependsOn(tasks.testSchemasTask)
    }

You can also run the validation task locally (./gradlew testSchemasTask) with the right credentials in your gradle.properties file. Otherwise, ask for help in the #talk-data Slack channel.