Interacting with Kafka with Kotlin Coroutines

09 November 2023

Updated: 10 November 2023

Overview

The purpose of this post is to illustrate a method of interacting with Kafka using Kotlin in a functional programming style while using Kotlin coroutines for multi-threading. We will be interacting with the Kafka Client for Java and will be building a small library on top of it to simplify communication and handle tasks like JSON serialization

If you would like to view the completed source code, you can take a look at the kotlin-kafka GitHub repository

Kafka

According to the Kafka website:

“Apache Kafka is an open-source distributed event streaming platform”

Generally we can think of Kafka as a platform that enables us to connect data producers to data consumers.

Kafka is an event platform that provides us with a few core functions:

  1. Publishing and subscribing event data
  2. Processing of events in real-time or retrospectively
  3. Storage of event streams

From a more detailed perspective, Kafka internally handles storage of event streams, but we are given control over the means of data production, consumption, and processing via the Kafka API, namely:

  • The Producer API for production
  • The Consumer API for subscription
  • The Streams API for processing stream data

Kotlin

Kotlin is a statically typed programming language built on the Java Virtual Machine that provides interop with Java code

The Code

Config

To get some admin stuff out of the way: before you can really do any of this, you will need to have a .env file in the project that you can load and that contains some application configuration. For the purpose of our application we require the following config in this file. Below is some example content

.env

BOOTSTRAP_SERVERS=my-server-url:9092
SASL_JAAS_CONFIG=org.apache.kafka.common.security.scram.ScramLoginModule required username="someUsername" password="somePassword";

Additionally, we have some non-sensitive config in our application.properties file in our application resources folder which contains the following:

resources/application.properties

sasl.mechanism=SCRAM-SHA-256
security.protocol=SASL_SSL
key.serializer=org.apache.kafka.common.serialization.StringSerializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
auto.offset.reset=earliest
group.id=$GROUP_NAME
application.id=example-app

Next, we need to load this in our application to create a Properties object along with all the other application config we require. We can create the Properties object using the application.properties and .env files as follows:

App.kt

package example

import io.github.cdimascio.dotenv.Dotenv
import java.util.*

fun loadProperties(): Properties {
    val props = Properties()
    val resource = ClassLoader.getSystemResource("application.properties")
    println("File path: ${resource.path}")
    // Read through the URL's stream so this also works when the file is inside a JAR
    resource.openStream().use { stream ->
        props.load(stream)
    }

    // Sensitive values come from the .env file rather than application.properties
    val dotenv = Dotenv.load()
    props["bootstrap.servers"] = dotenv["BOOTSTRAP_SERVERS"]
    props["sasl.jaas.config"] = dotenv["SASL_JAAS_CONFIG"]

    return props
}

The above example uses the io.github.cdimascio:dotenv-java:3.0.0 package for loading the environment variables and some builtin Java utilities for loading the application properties file

Next, for the purpose of using it with our library we will create a Config class that wraps the properties file we defined so that we can use this a little more elegantly in our consumers. Realistically we probably should do some validation on the resulting Properties that we load in but we’ll just keep it simple and define Config as a class that contains the properties as a property:

Config.kt

package za.co.nabeelvalley.kafka

import java.util.Properties

open class Config(internal val properties: Properties)
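The validation mentioned above could look something like the following. This is only a minimal sketch and not part of the library: the `requireKeys` helper and the choice of required keys are assumptions made for illustration.

```kotlin
import java.util.Properties

// Hypothetical helper (not part of the library): fail fast when a required
// setting is missing or blank instead of failing later when connecting.
fun requireKeys(properties: Properties, keys: List<String>): Properties {
    val missing = keys.filter { properties.getProperty(it).isNullOrBlank() }
    require(missing.isEmpty()) { "Missing required config: ${missing.joinToString()}" }
    return properties
}

fun main() {
    val props = Properties()
    props["bootstrap.servers"] = "my-server-url:9092"

    // Passes: the only required key is present
    requireKeys(props, listOf("bootstrap.servers"))

    // Fails: sasl.jaas.config was never set, so an IllegalArgumentException is thrown
    val result = runCatching { requireKeys(props, listOf("bootstrap.servers", "sasl.jaas.config")) }
    println(result.exceptionOrNull()?.message)
}
```

A check like this could be called on the loaded Properties before they are passed to Config, so that a missing value shows up as a clear error message.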

Working with JSON Data

An important part of what we want our client to handle is the JSON serialization and deserialization when sending data to Kafka. Sending JSON data is not a requirement of Kafka as a platform, but it’s the use case that we’re building our library around and so is something we need to consider

Serialization

Serialization in this context refers to the process of converting our Kotlin classes into a string and back to a Kotlin class. For this discussion we will refer to a class that is able to do this bidirectional conversion as a Serializer.

We can define a generic representation of a serializer as a class that contains a method called serialize that takes in data of type T and returns a string, and contains a method called deserialize that takes in a string and returns an object of type T

Note that at this point we’re not considering that the serializer needs to return JSON. In our context a JSON serializer is just a specific implementation of the serialization concept that we have defined

An interface that describes the Serializer we mentioned above can be seen as follows:

Serializer.kt

package za.co.nabeelvalley.kafka

interface ISerializer<T : Any> {
    fun serialize(data: T): String
    fun deserialize(data: String): T
}
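To make the point that JSON is only one possible implementation a bit more concrete, here is a sketch of a serializer for a different format. The `Point` class and the `x,y` format are made up for this example, and the interface is repeated so that the snippet stands on its own.

```kotlin
// Same shape as the ISerializer interface described above
interface ISerializer<T : Any> {
    fun serialize(data: T): String
    fun deserialize(data: String): T
}

// Hypothetical example type, used only for illustration
data class Point(val x: Int, val y: Int)

// A non-JSON serializer: encodes a Point as "x,y"
class PointCsvSerializer : ISerializer<Point> {
    override fun serialize(data: Point): String = "${data.x},${data.y}"

    override fun deserialize(data: String): Point {
        val parts = data.split(",")
        require(parts.size == 2) { "Expected 'x,y' but got '$data'" }
        return Point(parts[0].trim().toInt(), parts[1].trim().toInt())
    }
}

fun main() {
    val serializer = PointCsvSerializer()
    val encoded = serializer.serialize(Point(1, 2))
    println(encoded)                          // 1,2
    println(serializer.deserialize(encoded))  // Point(x=1, y=2)
}
```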

JSON Serialization

Given the definition of a serializer we can define a JSON serializer that uses the kotlinx.serialization library and implements our ISerializer as follows:

JsonSerializer.kt

package za.co.nabeelvalley.kafka

import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json
import kotlinx.serialization.serializer
import kotlin.reflect.KClass

class JsonSerializer<T : Any>(type: KClass<T>) : ISerializer<T> {
    // Look up the generated serializer for the class at runtime. The lookup is
    // untyped, so the result is cast back to KSerializer<T>.
    @Suppress("UNCHECKED_CAST")
    private val serializer: KSerializer<T> = serializer(type.java) as KSerializer<T>

    override fun serialize(data: T): String = Json.encodeToString(serializer, data)

    override fun deserialize(data: String): T = Json.decodeFromString(serializer, data)
}

The above code is a little funky since we’re using reflection on the actual class of the input data to define our serializer. Other than that, we’re just using the kotlinx serializer to handle the data transformation. The thing that matters in this context is that we are able to abstract the reflection aspect of the serializer, which will help make the final interface we provide to the user for working with Kafka simpler
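For comparison, this is roughly what the same round trip looks like when the serializer is known at compile time and no reflection lookup is needed. It is a sketch that assumes the kotlinx.serialization compiler plugin is applied to the project; the `Greeting` class is made up for the example.

```kotlin
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json

// Hypothetical message type used only for this example
@Serializable
data class Greeting(val message: String, val key: Int)

fun main() {
    // The compiler plugin generates this serializer for @Serializable classes
    val serializer = Greeting.serializer()

    // Kotlin object -> JSON string
    val json = Json.encodeToString(serializer, Greeting("Hello world", 1))
    println(json)

    // JSON string -> Kotlin object
    val decoded = Json.decodeFromString(serializer, json)
    println(decoded == Greeting("Hello world", 1))
}
```

The class-based lookup in JsonSerializer trades this compile-time access for the ability to create a serializer from a class reference alone.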

Serde Serializer

Now that we have defined a simple representation of a serializer that provides some interop with the Kotlin data types, we need to implement the other side of this which is a SerdeSerializer which is what the Kafka Clients need to work with. The requirements of this serializer are a little different to the one we defined above. This serializer needs to:

  1. Have separate Serializer and Deserializer interfaces that need to be implemented
  2. Return a ByteArray instead of String

We can define these serializers such that they can be constructed from an ISerializer interface that we defined previously. This will make it possible for consumers of our library to swap out their serialization strategy to enable other use cases than the simple JSON communication we are considering
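The second requirement, working with a ByteArray instead of a String, comes down to encoding and decoding text. The sketch below shows that step in isolation; the helper names `toBytes` and `fromBytes` are made up for the example, and the charset is spelled out so that both directions agree.

```kotlin
// Encode an already serialized string as bytes, which is what Kafka stores
fun toBytes(text: String): ByteArray = text.toByteArray(Charsets.UTF_8)

// Decode bytes read from Kafka back into the serialized string
fun fromBytes(bytes: ByteArray): String = String(bytes, Charsets.UTF_8)

fun main() {
    val payload = """{"message":"héllo","key":1}"""
    val bytes = toBytes(payload)

    // Non-ASCII characters take more than one byte in UTF-8,
    // so the byte count can differ from the string length
    println("${payload.length} characters, ${bytes.size} bytes")
    println(fromBytes(bytes) == payload)  // true
}
```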

As mentioned above, we need to implement a separate Serializer and Deserializer respectively as:

SerdeSerializer.kt

package za.co.nabeelvalley.kafka

import org.apache.kafka.common.serialization.Serializer

class SerdeSerializer<T : Any>(private val serializer: ISerializer<T>) : Serializer<T> {
    override fun serialize(topic: String?, data: T): ByteArray {
        val result = serializer.serialize(data)
        return result.toByteArray()
    }
}

And

SerdeDeserializer.kt

package za.co.nabeelvalley.kafka

import org.apache.kafka.common.serialization.Deserializer

class SerdeDeserializer<T : Any>(private val serializer: ISerializer<T>) : Deserializer<T?> {
    override fun deserialize(topic: String?, data: ByteArray?): T? {
        // Records without a value have nothing to deserialize
        if (data == null) return null

        try {
            val string = String(data)
            return serializer.deserialize(string)
        } catch (error: Exception) {
            // Catch Exception rather than Error: parsing failures are thrown as exceptions
            println("Error Deserializing Data: $error")
            return null
        }
    }
}

Our implementation is a little basic and will just ignore any data that we can’t deserialize, however depending on our use case we may need to handle this differently
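One way of handling this differently is to keep the failure around instead of collapsing it to null, so that the caller can decide whether to skip, log, or stop. The following is a sketch of that idea using only the standard library. The `tryDeserialize` helper is hypothetical and not part of the library, and note that runCatching catches every Throwable, which may be broader than you want.

```kotlin
// Hypothetical wrapper: returns a Result so failures stay visible to the caller
fun <T : Any> tryDeserialize(data: ByteArray?, deserialize: (String) -> T): Result<T?> =
    runCatching {
        // A missing payload is passed through as null rather than treated as an error
        data?.let { deserialize(String(it, Charsets.UTF_8)) }
    }

fun main() {
    val ok = tryDeserialize("42".toByteArray()) { it.toInt() }
    val bad = tryDeserialize("not-a-number".toByteArray()) { it.toInt() }
    val empty = tryDeserialize<Int>(null) { it.toInt() }

    println(ok.getOrNull())                                  // 42
    println(bad.exceptionOrNull() is NumberFormatException)  // true
    println(empty.getOrNull())                               // null
}
```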

Lastly, we define the actual Serde Serializer implementation using the above implementations:

Serializer.kt

package za.co.nabeelvalley.kafka

import org.apache.kafka.common.serialization.Serde

class Serializer<T : Any>(private val serializer: ISerializer<T>) : Serde<T> {
    override fun serializer() = SerdeSerializer<T>(serializer)

    override fun deserializer() = SerdeDeserializer<T>(serializer)
}

As far as serialization and deserialization goes, this should be everything we need for working with JSON data

Producing Data

Producing data is a method by which a client sends data to a Kafka topic. We can define this as a type as follows:

Producer.kt

typealias Send<T> = (topic: String, message: T) -> Unit

Now, to provide a functional library interface we will want to provide application code with a space in which it can work with the producer that we set up, without needing to create a new producer for each message we want to send

We’ll codify this intent as a type as follows:

Producer.kt

typealias Produce<T> = suspend (send: Send<T>) -> Unit

Note that we define this as a suspend function that will enable users to send messages from within a coroutine context
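To see how these two types fit together before Kafka gets involved, here is a small in-memory sketch. The `produceInMemory` function is a stand-in invented for this example: it records messages in a list instead of sending them anywhere.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

typealias Send<T> = (topic: String, message: T) -> Unit
typealias Produce<T> = suspend (send: Send<T>) -> Unit

// Stand-in for a real producer: records messages in memory, no Kafka involved
fun <T> produceInMemory(sent: MutableList<Pair<String, T>>, callback: Produce<T>) {
    val send: Send<T> = { topic, message -> sent.add(topic to message) }
    // The callback is a suspend function, so it needs a coroutine to run in
    runBlocking { callback(send) }
}

fun main() {
    val sent = mutableListOf<Pair<String, String>>()
    produceInMemory(sent) { send ->
        send("my-topic", "first")
        delay(10) // suspending is allowed because Produce is a suspend function
        send("my-topic", "second")
    }
    println(sent) // [(my-topic, first), (my-topic, second)]
}
```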

Next, we define the interface of our producer. Alongside the produce method, it includes a way to create a producer instance for users who may want to manage the lifecycle of the KafkaProducer on their own. This however also means they lose access to the automatic serialization and deserialization that we will provide via our produce method

This interface is defined as follows:

Producer.kt

package za.co.nabeelvalley.kafka

import org.apache.kafka.clients.producer.KafkaProducer
import java.util.*

interface IProducer<T> {
    /**
     * Returns the raw producer used by Kafka
     */
    fun createProducer(): KafkaProducer<String, String>
    fun produce(callback: Produce<T>)
}

For the purpose of our implementation we can define some functions outside of our class that will provide the implementation we require

For the createProducer function, we simply provide a wrapper around the KafkaProducer provided to us by the Java Kafka Client Library:

Producer.kt

package za.co.nabeelvalley.kafka

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.*
import kotlinx.coroutines.runBlocking

fun createProducer(properties: Properties) =
    KafkaProducer<String, String>(properties)

For the sake of consistency, we will do the same for the concept of a ProducerRecord which will be used by the produce function:

Producer.kt

fun createRecord(topic: String, message: String) =
    ProducerRecord<String, String>(topic, message)

Next, the produce function can be defined. The role of this function is to handle serialization of data and provide a means for a user to send data to a topic

The produce function will take a callback, which is the context in which any usage of the send function should happen before the producer is disposed:

Producer.kt

fun <T : Any> produce(properties: Properties, serializer: ISerializer<T>, callback: Produce<T>) {
    createProducer(properties).use { producer ->
        val send = fun(topic: String, message: T) {
            val payload = serializer.serialize(message)
            val record = createRecord(topic, payload)
            producer.send(record)
        }

        runBlocking {
            callback(send)
        }
    }
}
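Note that producer.send(record) is asynchronous: it returns a Future straight away, so a failed send will not surface inside the callback. If that matters for your use case, one option is to bridge Kafka's completion callback into a suspend function. The sketch below is an alternative and not what the library above does. The `sendAwait` name is made up, and using it would mean changing Send into a suspend function type.

```kotlin
import kotlinx.coroutines.suspendCancellableCoroutine
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical extension, not part of the library above: suspends until the
// send completes and throws if Kafka reports a failure.
// Producer is the interface that KafkaProducer implements.
suspend fun <K, V> Producer<K, V>.sendAwait(record: ProducerRecord<K, V>): RecordMetadata =
    suspendCancellableCoroutine { continuation ->
        // Kafka invokes this callback once, with either metadata or an exception
        send(record) { metadata, exception ->
            if (exception != null) continuation.resumeWithException(exception)
            else continuation.resume(metadata)
        }
    }
```

Inside a coroutine this could be called as producer.sendAwait(createRecord(topic, payload)), with failures handled using a normal try/catch.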

We have also added the properties and serializer values as an input to the producer as this is needed by Kafka. Lastly, we will define our actual Producer implementation which builds on the functions we defined above

Note that our Producer class implements IProducer and extends Config. This is because we use the Config class as the source of truth of the configuration to be used for our Kafka instance and we want to be able to access this config

Producer.kt

class Producer<T : Any>(
    properties: Properties,
    private val serializer: ISerializer<T>
) : Config(properties),
    IProducer<T> {
    override fun createProducer() = createProducer(properties)

    override fun produce(callback: Produce<T>) = produce<T>(properties, serializer, callback)
}

At this point we have a complete implementation of a producer

Using the Producer

In our application code we can instantiate and use the producer as follows:

Firstly, we need to define the type of data we are going to send with the @Serializable annotation

App.kt

import kotlinx.coroutines.*
import kotlinx.serialization.Serializable
import za.co.nabeelvalley.kafka.*
import java.util.*


@Serializable
data class ProducerData(val message: String, val key: Int)

Next, we can define a function for producing data, this will require the properties we loaded previously:

App.kt

fun instantiateAndProduce(properties: Properties): Unit {
    val serializer = JsonSerializer(ProducerData::class)
    val producer = Producer(properties, serializer)

    runBlocking {
        producer.produce { send ->
            val data = ProducerData("Hello world", 1)
            send("my-topic", data)
        }
    }
}

We use runBlocking since our producer needs a coroutine scope in which to send data. Sending data is done within the produce method, in which we create some data and call the send method provided by the produce function

An interesting thing to note is that we are passing the class of our data to the serializer to create an instance - this is the usage of the funky reflection thing we saw previously

Consuming Data

Our code for consuming data will follow a similar pattern to what we used to produce the data in the previous section

For consuming data, Kafka relies on the concept of polling for records on the part of the consumer. For our client, we will expose this using the following type, which defines a poll as a method that takes nothing and returns a list of data of type T

Consumer.kt

typealias Poll<T> = () -> List<T>

Next, we can define the type that defines how we want our data to be consumed. For our sake, this is a suspend function that will receive a poll method that it can call to get data

Consumer.kt

typealias Consume<T> = suspend (poll: Poll<T>) -> Unit

Next, as before, we can define an interface for a Consumer in which we have a method to create a KafkaConsumer and a method for actually consuming the data. In the case of consuming we need a list of topics to read from as well as the polling frequency duration.

Consumer.kt

package za.co.nabeelvalley.kafka

import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
import java.util.*

interface IConsumer<T> {
    /**
     * Returns the raw consumer used by Kafka
     */
    fun createConsumer(): KafkaConsumer<String, String>
    fun consume(topics: List<String>, duration: Long, callback: Consume<T>)
}

Next, we can define our createConsumer method quite simply as:

Consumer.kt

fun createConsumer(properties: Properties) =
    KafkaConsumer<String, String>(properties)

And we can define our consume method such that it takes in the properties and serializer as with the producer, but also takes some patterns to subscribe to and the polling duration mentioned above, and finally the callback Consume function:

Consumer.kt

fun <T : Any> consume(
    properties: Properties,
    serializer: ISerializer<T>,
    patterns: List<String>,
    duration: Long,
    callback: Consume<T>
) {
    createConsumer(properties).use { consumer ->
        consumer.subscribe(patterns)
        val poll = fun(): List<T> {
            val records = consumer.poll(Duration.ofMillis(duration))
            val data = records.toList()
                .map(ConsumerRecord<String, String>::value)
                .map(serializer::deserialize)

            return data
        }

        runBlocking {
            callback(poll)
        }
    }
}

The consume function is very similar to the produce function we defined previously, however now instead of being provided a function to send data we now have a function that will return that data

Lastly, we can finish off the definition of our Consumer using what we have above:

Consumer.kt

class Consumer<T : Any>(
    properties: Properties,
    private val serializer: ISerializer<T>
) : Config(properties), IConsumer<T> {
    override fun createConsumer() = createConsumer(properties)

    override fun consume(topics: List<String>, duration: Long, callback: Consume<T>) =
        consume(properties, serializer, topics, duration, callback)
}

Using the Consumer

Using the Consumer follows a very similar pattern to the producer, however we need to create a loop that will poll for data and handle as necessary when data is received:

App.kt

@Serializable
data class ConsumerData(val message: String, val key: Int)

fun instantiateAndConsume(properties: Properties): Unit {
    val serializer = JsonSerializer(ConsumerData::class)
    val consumer = Consumer(properties, serializer)

    runBlocking {
        consumer.consume(listOf("my-topic"), 1000) { poll ->
            while (true) {
                val messages = poll()
                println("Received ${messages.size} messages")
                messages.forEach { message ->
                    println("Received: $message")
                }
            }
        }
    }
}

In the above, we use a while(true) loop to re-poll continuously but this can freely change depending on the implementation, similar to the producer code
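As one example of a different approach, the polling loop can be wrapped in a Flow so that the consuming code decides how many messages to take and when to stop. This is only a sketch: the `pollAsFlow` helper is hypothetical and not part of the library, and a fake poll function stands in for Kafka so that the snippet runs on its own.

```kotlin
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.isActive
import kotlinx.coroutines.runBlocking

typealias Poll<T> = () -> List<T>

// Hypothetical helper: turns repeated polling into a cold Flow of messages.
// Nothing is polled until the flow is collected, and the loop ends when the
// collector has what it needs or the coroutine is cancelled.
fun <T> pollAsFlow(poll: Poll<T>): Flow<T> = flow {
    while (currentCoroutineContext().isActive) {
        for (message in poll()) emit(message)
    }
}

fun main() = runBlocking {
    // Stand-in for a Kafka poll: returns a new batch of two numbers on each call
    var next = 0
    val fakePoll: Poll<Int> = { listOf(next++, next++) }

    val firstFive = pollAsFlow(fakePoll).take(5).toList()
    println(firstFive) // [0, 1, 2, 3, 4]
}
```

With the real consumer, the poll function passed to the consume callback could be given to a helper like this in place of the while(true) loop.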

Stream Processing

In Kafka, we can think of a stream process as a combination of a consumer and producer such that data comes in from a topic and is sent to a different topic

The thing that makes streams interesting is the builder API that the Kafka Java Library provides to us for defining the operations to be done on the stream data. For our implementation we’ll be referring to this as a TransformProcessor. This processor needs to take in some data of type TConsume and return data of type TProduce. However, since we want to provide users complete flexibility in working with this data, we will instead more generally allow a user to convert a stream of one predefined data type into a stream of the other. In the underlying library such a stream is called a KStream

From a type perspective, we can define a TransformProcessor as follows:

SerializedStream.kt

typealias TransformProcessor<TConsume, TProduce> = (stream: KStream<String, TConsume>) -> KStream<String, TProduce>
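Because a TransformProcessor receives the whole stream rather than a single record, it can chain several stream operations together. The sketch below is a made-up example that drops blank messages and then maps each remaining message to its length. The Kafka Streams functional interfaces are named explicitly only to make it obvious which overloads are being called.

```kotlin
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.kafka.streams.kstream.ValueMapper

typealias TransformProcessor<TConsume, TProduce> = (stream: KStream<String, TConsume>) -> KStream<String, TProduce>

// Hypothetical processor: String values in, Int values out
val messageLengths: TransformProcessor<String, Int> = { stream ->
    stream
        // Keep only records whose value contains some text
        .filter(Predicate<String, String> { _, value -> value.isNotBlank() })
        // Replace each value with its length, leaving the key unchanged
        .mapValues(ValueMapper<String, Int> { value -> value.length })
}
```

Nothing is processed when this function is called. It only describes the steps, which Kafka Streams runs once the stream is started.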

Now, we’re going to be starting this implementation from what we want, assuming that it is possible for us to in some way define a KStream that is instantiated to work with our connection and the respective TConsume and TProduce data.

We will also be using a type called Produced, which is what the Kafka Client uses to describe how the data that the stream returns should be serialized, since this is what we need in order to send the processed data to an output topic

Our implementation will be called a SerializedStream and this looks like the following:

SerializedStream.kt

package za.co.nabeelvalley.kafka

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import java.util.*

typealias TransformProcessor<TConsume, TProduce> = (stream: KStream<String, TConsume>) -> KStream<String, TProduce>
typealias Close = () -> Unit
typealias Process = suspend (close: Close) -> Unit

class SerializedStream<TConsume : Any, TProduce : Any>(
    private val properties: Properties,
    private val builder: StreamsBuilder,
    private val producer: Produced<String, TProduce>,
    private val stream: KStream<String, TConsume>
) {
    fun startStreaming(
        topic: String,
        processor: KStream<String, TProduce>,
        process: Process
    ): Job {
        processor.to(topic, producer)

        val streams = KafkaStreams(
            builder.build(),
            properties
        )

        val scope = CoroutineScope(Dispatchers.IO)
        return scope.launch {
            streams.start()
            try {
                // Process expects a close function, so give the callback a way to stop the stream
                process { streams.close() }
            } finally {
                // Make sure the stream is closed once process returns, fails, or is cancelled
                streams.close()
            }
        }
    }

    fun getProcessor(
        processor: TransformProcessor<TConsume, TProduce>
    ): KStream<String, TProduce> = processor(stream)
}

In the above implementation we have an input to our startStreaming function called process. The process function is a callback that needs to call close once it is done running. When the process function returns, the processing will stop. The scope of this function also defines the lifecycle of the stream processor and is used for that purpose

So we have defined the processing methodology using a KStream but have not provided a way to create a KStream. Since the stream can be defined in many different ways, we can define this using a builder class called StreamBuilder. This class will be instantiated with the Kafka connection properties and input/output serializers, and it then provides methods for instantiating the SerializedStream instance that we can use for data processing

For the sake of our example we will provide a method called fromTopic which returns a SerializedStream that is configured to work on a single topic, and a fromTopics method which will return a SerializedStream that listens to multiple topics:

StreamBuilder.kt

package za.co.nabeelvalley.kafka

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced
import java.util.*

interface IStreamBuilder<TConsume : Any, TProduce : Any> {
    fun fromTopic(topic: String): SerializedStream<TConsume, TProduce>
    fun fromTopics(topics: List<String>): SerializedStream<TConsume, TProduce>
}

An implementation of this interface is as follows:

StreamBuilder.kt

class StreamBuilder<TConsume : Any, TProduce : Any>(
    properties: Properties,
    consumeSerializer: ISerializer<TConsume>,
    producerSerializer: ISerializer<TProduce>,
) : Config(properties), IStreamBuilder<TConsume, TProduce> {
    private val inputSerde = Serializer<TConsume>(consumeSerializer)
    private val consumed = Consumed.with(Serdes.String(), inputSerde)

    private val outputSerde = Serializer<TProduce>(producerSerializer)
    private val produced = Produced.with(Serdes.String(), outputSerde)

    override fun fromTopic(topic: String): SerializedStream<TConsume, TProduce> {
        val builder = StreamsBuilder()
        val stream = builder.stream(mutableListOf(topic), consumed)

        return SerializedStream(properties, builder, produced, stream)
    }

    override fun fromTopics(topics: List<String>): SerializedStream<TConsume, TProduce> {
        val builder = StreamsBuilder()
        val stream = builder.stream(topics.toMutableList(), consumed)

        return SerializedStream(properties, builder, produced, stream)
    }
}

The above class makes use of the produced and consumed properties which are what Kafka will use for serializing and deserializing data in the stream

And that’s about it as far as our implementation for streaming goes

Using the Stream Processor

We can use the stream processor code as follows:

App.kt

fun initializeAndProcess(properties: Properties): Job {
    val producedSerializer = JsonSerializer(ProducerData::class)
    val consumedSerializer = JsonSerializer(ConsumerData::class)
    val streamBuilder = StreamBuilder(properties, consumedSerializer, producedSerializer)
    val stream = streamBuilder.fromTopic("input-topic")

    val processor = stream.getProcessor { kStream ->
        kStream.mapValues { key, value ->
            ProducerData("Message processed: $key", value.key)
        }
    }

    val scope = CoroutineScope(Dispatchers.IO)
    return scope.launch {
        stream.startStreaming("output-topic", processor) { close ->
            coroutineScope {
                println("Processor starting")
                // Non-blocking loop as long as the coroutine is active
                while (isActive) {
                    delay(10_000)
                }

                // close when no longer active
                close()
                println("Processor closed")
            }
        }
    }
}

Most of this is just the normal construction that you will have for any instance of the stream client. What is interesting is the part where we define the processor:

App.kt

val processor = stream.getProcessor { kStream ->
    kStream.mapValues { key, value ->
        ProducerData("Message processed: $key", value.key)
    }
}

In the above example we are simply mapping a single record using mapValues. This is very similar to the Collection methods available in Kotlin but is instead used to define how data will be transformed in the stream

The processor we define is what will be executed on records or groups of records depending on how we want to handle the resulting data

Conclusion

In this post we’ve covered the basic implementation of how we can interact with Kafka using the Kotlin programming language and built a small library that takes us through the basic use cases of Serializing, Producing, Consuming, and Processing stream data
