Kafka client configuration

Common Consumer Properties

Kafka has defaults for most properties. These attempt to provide safe, useful values for most use cases, but it is up to you to judge whether your case requires more specific configuration. Certain properties, such as protocols, security settings, and endpoint URLs, are mandatory.

Note that entur-kafka-spring-starter sets some otherwise mandatory properties for you; see that library’s documentation for details.

Several of the settings below pertain to the Kafka Java libraries specifically. For non-JVM languages, examine that language’s client library for the equivalents.

Mandatory properties

  • bootstrap.servers should be set to the bootstrap URL of the Kafka cluster your application connects to.
  • schema.registry.url should be set to the URL of the schema registry for the Kafka cluster your application connects to.
    • While not strictly mandatory from a technical point of view, it is policy that all topics have a schema associated with their values. This property should thus always be set for producer clients, and is highly recommended for consumers as well.
  • security.protocol must be set to SASL_SSL. We require one-way SSL authentication for our clusters. No plaintext listeners are enabled.
  • sasl.mechanism must be set to SCRAM-SHA-512.
  • sasl.jaas.config must be set to org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";, populated with the username and password for your application's Kafka user. Note that the trailing semicolon is required.
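For JVM clients, the mandatory properties above can be collected into a `Properties` object along these lines. This is a sketch, not a definitive setup: the bootstrap and schema registry URLs are placeholders, and the credentials are passed in by the caller.

```java
import java.util.Properties;

public class MandatoryKafkaConfig {

    // Builds the mandatory client properties. The URLs below are
    // placeholders; substitute the values for your own cluster.
    static Properties mandatoryProps(String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.example.com:9092");         // placeholder
        props.put("schema.registry.url", "https://registry.example.com"); // placeholder
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        // JAAS config entries must end with a semicolon.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required"
                        + " username=\"" + username + "\""
                        + " password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(mandatoryProps("my-user", "my-password"));
    }
}
```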

Mandatory properties on Aiven only

  • basic.auth.credentials.source should be set to USER_INFO. This is necessary because Aiven requires authentication for all schema registry operations.
  • basic.auth.user.info should be set to username:password, with username and password being the same as from sasl.jaas.config above.
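On Aiven, the two registry-auth properties are layered on top of the base configuration. A minimal sketch, reusing the same credentials as the SASL login:

```java
import java.util.Properties;

public class AivenRegistryAuth {

    // Adds the Aiven-only schema registry auth settings on top of an
    // existing set of properties. The credentials are the same Kafka
    // user as in sasl.jaas.config.
    static Properties withRegistryAuth(Properties base, String username, String password) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", username + ":" + password);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withRegistryAuth(new Properties(), "my-user", "my-password"));
    }
}
```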

Mandatory consumer-only properties

  • group.id should be set to your application’s consumer group ID. For more on consumer groups and how they are used, see this page.
  • key.deserializer (for JVM clients): Which class to use for deserialization of keys. Commonly org.apache.kafka.common.serialization.StringDeserializer is used, but there are deserializers for more specific types as well as io.confluent.kafka.serializers.KafkaAvroDeserializer for keys with Avro schemas.
  • value.deserializer (for JVM clients): Which class to use for deserialization of values. Should under normal circumstances be io.confluent.kafka.serializers.KafkaAvroDeserializer, although protobuf is also supported through io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer. Schemaless deserializers should only ever be used for legacy cases where migration to a schema type is not feasible.
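The consumer-only properties above might look like this for the common case of string keys and Avro values. The group ID is a placeholder, and the class names are given as strings so the sketch compiles without the Kafka client libraries on the classpath.

```java
import java.util.Properties;

public class ConsumerPropsSketch {

    // Consumer-only properties for string keys and Avro values.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("group.id", "my-application"); // placeholder consumer group ID
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps());
    }
}
```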

Mandatory producer-only properties

  • key.serializer (for JVM clients): Which class to use for serialization of keys. Commonly org.apache.kafka.common.serialization.StringSerializer is used, but there are serializers both for more specific types and io.confluent.kafka.serializers.KafkaAvroSerializer for keys with Avro schemas.
  • value.serializer (for JVM clients): Which class to use for serialization of values. Should under normal circumstances be io.confluent.kafka.serializers.KafkaAvroSerializer, although protobuf is also supported through io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer. Schemaless serializers should only ever be used for legacy cases where migration to a schema type is not feasible.
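The producer-side mirror of the consumer sketch, again assuming string keys and Avro values, with class names as plain strings:

```java
import java.util.Properties;

public class ProducerPropsSketch {

    // Producer-only properties for string keys and Avro values.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
    }
}
```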

Optional properties

Kafka provides a fairly large number of configurable properties. This section describes some settings more commonly used in Entur. For a complete listing, see Kafka Consumer Configurations for Confluent Platform | Confluent Documentation and Kafka Producer Configurations for Confluent Platform | Confluent Documentation.

Optional consumer properties

  • specific.avro.reader determines whether to use SpecificRecord or GenericRecord for deserializing Avro messages. SpecificRecord is nearly always preferred in Entur. See "Using Avro Schemas" for details.
  • session.timeout.ms determines the maximum time between received heartbeats before Kafka considers the consumer failed, which triggers a rebalance of the consumer group. Too low a value may cause spurious rebalancing and rejoining, which impacts performance. Too high a value delays failure detection (such as an ungraceful shutdown of a consumer), potentially causing lengthy delays before partitions assigned to failed consumers are reassigned to healthy ones.
  • max.poll.interval.ms sets the maximum interval between polls from the consumer. This is useful for recovering from a consumer getting stuck in some sort of processing failure or loop, even if it is still emitting heartbeats.
  • max.poll.records sets the maximum number of records sent by Kafka in each polling response. Use this to balance the amount of work done per poll against latency. Increasing this setting can be useful if processing of records is expensive enough to cause a large batch to exceed the max.poll.interval.ms setting.
  • auto.offset.reset determines behavior when no prior offset has been committed for the consumer group and partition in question. latest sets the starting offset to the latest message, while earliest starts from the lowest offset present on the partition. In short, an offset is a message's unique, sequential position within a partition; consumers track and commit offsets so they know where to resume consumption.
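The optional consumer settings above might be combined along these lines. Every value below is illustrative only, not a recommendation; pick values that match your application's processing profile.

```java
import java.util.Properties;

public class ConsumerTuningSketch {

    // Optional consumer tuning. All values are illustrative.
    static Properties tuningProps() {
        Properties props = new Properties();
        props.put("specific.avro.reader", "true");   // use generated SpecificRecord classes
        props.put("session.timeout.ms", "45000");    // heartbeat-based failure detection window
        props.put("max.poll.interval.ms", "300000"); // max time between poll() calls
        props.put("max.poll.records", "500");        // max records per poll response
        props.put("auto.offset.reset", "earliest");  // start from oldest offset if none committed
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tuningProps());
    }
}
```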

Optional producer properties

  • acks determines the number of acknowledgements of a message from separate brokers the producer requires before considering a request complete. This provides a degree of control over the balance between performance and guaranteed delivery. Common settings are 0, 1, or all.
  • retries controls the number of times to resend a request that fails with a potentially transient error, such as a network failure. Typically one uses either 0 or the maximum integer value; in the latter case, the delivery.timeout.ms configuration controls retry behavior.
  • enable.idempotence sets whether to guarantee exactly one copy of each message is written. This requires retries to be greater than 0 (otherwise, idempotence is implicitly guaranteed), acks set to all and max.in.flight.requests.per.connection no greater than 5.
  • max.in.flight.requests.per.connection controls the maximum number of unacknowledged requests the client will send before blocking.
  • delivery.timeout.ms controls the amount of time to wait for a call to send() to return before considering the request failed.
  • compression.type sets the compression type to use, if any. Valid values are none, gzip, snappy, lz4 or zstd. This saves both network capacity and disk storage space, especially for repetitive structures like XML or JSON. When using this, it is a good idea to consider the CPU implications for producers and consumers, as well as your batch.size setting. For more on compression, see "Compression".
  • batch.size determines the upper bound of a batch's size, in bytes, before it is sent to the cluster. If a batch contains fewer bytes than batch.size, the producer waits at most linger.ms before sending it. Note that linger.ms defaults to 0, meaning all messages are sent immediately without regard for the batch.size setting unless linger.ms is also set.
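The producer settings above could be combined into an idempotent, batching, compressing producer configuration like the following. All values are illustrative; the constraints from the text apply (idempotence needs acks=all, retries greater than 0, and at most 5 in-flight requests per connection).

```java
import java.util.Properties;

public class ProducerTuningSketch {

    // Optional producer tuning. All values are illustrative.
    static Properties tuningProps() {
        Properties props = new Properties();
        props.put("acks", "all");                                 // wait for all in-sync replicas
        props.put("enable.idempotence", "true");
        props.put("retries", String.valueOf(Integer.MAX_VALUE));  // let delivery.timeout.ms govern retries
        props.put("max.in.flight.requests.per.connection", "5");
        props.put("delivery.timeout.ms", "120000");
        props.put("compression.type", "zstd");
        props.put("linger.ms", "20");     // wait up to 20 ms to fill a batch
        props.put("batch.size", "65536"); // 64 KiB upper bound per batch
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tuningProps());
    }
}
```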