Kafka client libraries

The majority of Kafka clients at Entur are Spring Boot applications. A Spring Boot starter with convenient defaults has been provided for this common use case; see the sub-page “Using entur-kafka-spring-starter” for usage and hints.

In-depth documentation beyond the scope of this handbook, including sample code for common client libraries in other languages, may be found at Kafka Clients | Confluent Documentation.

Entur Kafka Spring Starter

Team Data provides a Spring Boot starter with sensible, overridable defaults for Spring Kafka. The starter lets applications at Entur get started with Kafka with a minimal amount of configuration and code, and helps ensure uniform usage of Kafka across teams.

Details of the library’s usage are described in its README.md. A minimal example application using the library may be found here.
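
The starter builds on Spring Kafka, so consuming messages typically amounts to annotating a listener method. As an illustration only (the topic name below is made up, and the starter’s own properties and defaults are what the README documents), a plain Spring Kafka listener looks roughly like this:

import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class TestEventListener {

    // Hypothetical topic name; TestEvent is assumed to be an Avro-generated class
    @KafkaListener(topics = ["test-events"])
    fun onTestEvent(event: TestEvent) {
        // do something with the event
    }
}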

Using the Apache Kafka client library

Include the following repositories and dependencies (Gradle Kotlin DSL):

repositories {
    mavenCentral()
    maven { url = uri("https://packages.confluent.io/maven/") }
}

dependencies {
    implementation("org.apache.kafka:kafka-clients:3.1.1")
    implementation("org.apache.avro:avro:1.11.1")
    implementation("io.confluent:kafka-avro-serializer:7.2.1")
}
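
The consumer and producer snippets below assume roughly the following imports (a sketch; adjust to your own packages and generated Avro classes):

import java.time.Duration
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.config.SaslConfigs
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig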

Creating consumers

Consumers are created by instantiating KafkaConsumer. The type arguments provided to the KafkaConsumer constructor are the deserialized key and value types, respectively. Configuration properties should be supplied according to the mandatory and optional consumer properties listed in Kafka client configuration. Remember to choose an appropriate name for your consumer group.

In the example below, TestEvent is assumed to be a class implementing SpecificRecord.

val consumer = KafkaConsumer<String, TestEvent>(
    mapOf(
        CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG to "bootstrap.kafka-host.io:9095", // replace with other environments as needed
        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://schema-registry.kafka-host.io:8001/", // as above
        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
        SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
        SaslConfigs.SASL_JAAS_CONFIG to "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"$kafkaUsername\" password=\"$kafkaPassword\";",
        ConsumerConfig.GROUP_ID_CONFIG to myGroupId,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroDeserializer",
        KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG to "true",
        // Following for Aiven only
        "basic.auth.credentials.source" to "USER_INFO",
        "basic.auth.user.info" to "$kafkaUsername:$kafkaPassword",
        // (any optional configuration properties)
    )
)
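
Before polling, the consumer must subscribe to one or more topics (the topic name below is a placeholder):

consumer.subscribe(listOf(MY_TOPIC_NAME))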

New messages are processed by regularly calling the consumer’s poll method. Keep in mind that Kafka is a streaming platform, and as such consumers are expected to poll continuously for new messages. If a consumer goes too long without issuing another poll, the Kafka cluster will assume it has failed and redistribute its assigned partitions to other consumers in the group (if any). The following is a simple example of a single poll call:

consumer.poll(Duration.ofSeconds(pollTimeout))
    .map { it.value() }
    .forEach { value -> /* do something with the value */ }
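
In a real application the poll call normally sits in a loop that runs for the lifetime of the consumer. A minimal sketch, where running is a hypothetical flag your application flips on shutdown:

// `running` is a hypothetical Boolean flag toggled by your shutdown logic
while (running) {
    consumer.poll(Duration.ofSeconds(pollTimeout))
        .forEach { record ->
            // do something with record.key() and record.value()
        }
}
consumer.close() // leave the consumer group cleanly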

Creating producers

Producers are created in a fashion similar to consumers, but are somewhat simpler, as there is no equivalent of consumer groups or polling intervals.

In the example below, TestEvent is assumed to be a class implementing SpecificRecord.

val producer = KafkaProducer<String, TestEvent>(
    mapOf(
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroSerializer",
        CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG to "bootstrap.kafka-service.io:9095", // replace with other environments as needed
        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://schema-registry.kafka-service.io:8001/", // as above
        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
        SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
        SaslConfigs.SASL_JAAS_CONFIG to "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"$kafkaUsername\" password=\"$kafkaPassword\";",
        // Following for Aiven only
        "basic.auth.credentials.source" to "USER_INFO",
        "basic.auth.user.info" to "$kafkaUsername:$kafkaPassword",
    )
)

Producing new messages is done by passing in a ProducerRecord containing the topic name, key and value for your message to KafkaProducer’s send() method.

producer.send(
    ProducerRecord(
        MY_TOPIC_NAME,
        myKey,
        TestEvent(
            timestamp = java.time.Instant.now(),
            message = "Trains are neat"
        )
    )
)
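
Note that send() is asynchronous: it enqueues the record for sending and returns a Future with the resulting record metadata. If you need to react to the outcome, you can pass a callback, and flush() will block until buffered records have been sent. A minimal sketch:

producer.send(
    ProducerRecord(
        MY_TOPIC_NAME,
        myKey,
        TestEvent(
            timestamp = java.time.Instant.now(),
            message = "Trains are neat"
        )
    )
) { metadata, exception ->
    if (exception != null) {
        // sending failed; log and/or retry as appropriate
    } else {
        // metadata holds the topic, partition and offset of the written record
    }
}
producer.flush() // blocks until all buffered records have been sent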