This is the third in a series of articles on our “Home Office Challenges App (HOCA)”. It describes several advanced features of Apache Kafka as well as some challenges we faced with the Kafka platform.
Offset / Consumer Groups
In the last article, we already heard how Kafka assigns records to topics. Now we will look at this in more detail.
Each record stored in a topic gets a numerical offset, which acts as a unique identifier and also describes the position of a consumer. For example, a consumer at position 7 has consumed the records from offset 0 to 6 and will receive the record with offset 7 next.
To balance the load when processing records, multiple consumers can be combined into a consumer group. Kafka delivers each event of the subscribed topics to exactly one consumer within the group. To make this work, a topic is split into multiple partitions, a number that can be configured when the topic is created. Each partition is assigned to exactly one consumer of the group, while a single consumer can consume from multiple partitions. The parallelism is therefore limited by the number of partitions the topic consists of.
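As a minimal sketch of both concepts (the topic name, group id, and broker address are illustrative placeholders rather than our actual HOCA configuration), a plain Kafka consumer joins a consumer group simply by setting a group.id before subscribing:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Every instance started with the same group.id joins the same consumer group
        // and gets its own share of the topic's partitions.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "activity-consumers");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("activities"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    // record.offset() is the position of this record within its partition
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}

Starting a second instance with the same group id triggers a rebalance, after which the two instances split the partitions of the topic between them.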
Transaction Security
One common challenge of messaging systems is to ensure that a message from the producer always reaches the consumer. There are several strategies, called message delivery semantics, to accomplish this goal. Application developers can choose between the following:
- at-most-once
- at-least-once
- exactly-once
At-most-once means that a message is sent only once and never retried. If it does not reach the consumer due to a system error, it is lost permanently.
With at-least-once, a message is re-sent until its delivery is confirmed. This ensures that the message arrives even if a system error occurs, but it can easily lead to duplicates.
If we want to ensure consistent message delivery, both lost messages and duplicates must be avoided, which makes the previous two semantics unsuitable.
The last option, exactly-once, guarantees that each message is delivered exactly one time, without loss or duplicates. Apache Kafka provides exactly-once semantics through its Transaction API.
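The following sketch shows what the Transaction API looks like from the producer side; the topic name, key/value, and transactional.id are illustrative placeholders, and the error handling is simplified:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");          // no duplicates on retries
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "hoca-producer-1"); // enables the Transaction API

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("activities", "user-42", "pushups:20"));
                producer.commitTransaction(); // records become visible to read_committed consumers atomically
            } catch (KafkaException e) {
                producer.abortTransaction();  // aborted records are never exposed to read_committed consumers
                throw e;
            }
        }
    }
}

On the consuming side, setting isolation.level=read_committed ensures that only records from committed transactions are returned.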
Kafka as a Database
As we have already heard, Kafka events can contain data in many forms, whether simple or very complex. Kafka can also retain that data indefinitely if we configure the topic retention accordingly. Moreover, there is even a (limited) query functionality. In addition, several data management tools are available for Kafka which allow users to view events, topics, producers, and more.
So Kafka is not just similar to regular databases, it can actually be classified as one. For use cases that involve event logs, it might therefore be unnecessary to add another database to store these logs when Kafka alone is sufficient.
Since the core data of our Home Office Challenges App can be modelled as events, and does not require any mutability, we decided to use Apache Kafka as our only database. For this specific use case, we found it to be a reliable, stable, and highly performant means to store and retrieve data.
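To illustrate the "query" side of this decision, the sketch below replays the complete history of a topic, for example to rebuild application state on startup. The topic name and broker address are placeholders, partition discovery is simplified to a single partition, and the values are read as plain strings:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TopicReplaySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        TopicPartition partition = new TopicPartition("activities", 0);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(List.of(partition));          // no consumer group needed for a replay
            consumer.seekToBeginning(List.of(partition)); // start at offset 0, like a full table scan
            ConsumerRecords<String, String> history = consumer.poll(Duration.ofSeconds(5));
            history.forEach(record -> System.out.println(record.offset() + ": " + record.value()));
        }
    }
}

A real replay would keep polling in a loop until the consumer has caught up with the end of the partition.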
Custom Serdes
To produce and consume records in an application, you need serializers and deserializers (SerDes) to convert an object into a stream of bytes and back again. When using Spring, this is easy to configure by using one of the default serializers (e.g. for String, Integer, or Long) or the JsonSerializer/JsonDeserializer provided by Spring Kafka:
new DefaultKafkaConsumerFactory<>(
        consumerConfigsJson(),
        new StringDeserializer(),
        new JsonDeserializer<>(UserChallengeRelation.class)
);
KafkaConsumerConfig.java
When coding with Kafka Streams and plain Java, it gets a bit more complicated. You can either register default serializers via properties or configure them when consuming from or writing to a topic. To do the latter, we first implement the Serializer and Deserializer interfaces from the package org.apache.kafka.common.serialization. With these implementations, we can then prepare the Serdes for our application:
public final class CustomSerdes {

    public static final class ActivitySerde extends Serdes.WrapperSerde<Activity> {
        public ActivitySerde() {
            super(new JsonSerializer<>(), new JsonDeserializer<>(Activity.class));
        }
    }

    public static Serde<Activity> Activity() {
        return new CustomSerdes.ActivitySerde();
    }
}
CustomSerdes.java
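The JsonSerializer and JsonDeserializer referenced above are the implementations of Kafka's Serializer and Deserializer interfaces mentioned earlier; a minimal Jackson-based sketch (the details of our actual implementation may differ) could look like this:

// JsonSerializer.java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            return data == null ? null : objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing value for topic " + topic, e);
        }
    }
}

// JsonDeserializer.java (separate file)
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class JsonDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Class<T> targetType;

    public JsonDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return data == null ? null : objectMapper.readValue(data, targetType);
        } catch (Exception e) {
            throw new SerializationException("Error deserializing value from topic " + topic, e);
        }
    }
}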
When the preparations are done, we can use our CustomSerdes in the StreamsApplication to consume, group or write to topics:
final KStream<String, Activity> input = builder.stream("activities", Consumed.with(
        Serdes.String(),          /* key serde */
        CustomSerdes.Activity()   /* value serde */
));
Consume
.groupByKey(Grouped.with(
        Serdes.String(),
        CustomSerdes.ActivityStatistics()
))
Group
.to("activity-stats", Produced.with(
        Serdes.String(),
        CustomSerdes.ActivityStatistics()
));
Produce
Serialization Issues
While Kafka can usually serialize and deserialize objects quite well, we have encountered a couple of problems with that functionality.
If an object contains an attribute of type LocalDateTime, the (de)serialization might not work out of the box. In this case, the field has to be annotated with Jackson's @JsonSerialize and @JsonDeserialize annotations, using the LocalDateTimeSerializer and LocalDateTimeDeserializer.
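A short sketch of such a fix (the field name is illustrative); the (de)serializer classes come from the jackson-datatype-jsr310 module:

import java.time.LocalDateTime;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;

public class Activity {

    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    private LocalDateTime startedAt;

    // ... remaining fields, getters and setters
}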
While this will fix some (de)serialization problems, it is still a good idea to prevent any such issues from crashing the application. We can achieve this goal by creating a custom ErrorHandler and adding it to each KafkaListenerContainerFactory.
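A sketch of how this wiring could look in the KafkaConsumerConfig class shown earlier, assuming Spring for Apache Kafka 2.8 or newer (the recovery logic is just a placeholder that logs and skips the failed record):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserChallengeRelation> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, UserChallengeRelation> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(
            consumerConfigsJson(),
            new StringDeserializer(),
            new JsonDeserializer<>(UserChallengeRelation.class)));
    // Instead of crashing the listener, log the failed record and skip it.
    factory.setCommonErrorHandler(new DefaultErrorHandler((record, exception) ->
            System.err.println("Skipping record at offset " + record.offset() + ": " + exception.getMessage())));
    return factory;
}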
Kafka Streams
Kafka Streams simplifies application development by building on the Apache Kafka producer and consumer APIs. By leveraging Kafka's native capabilities, it offers distributed coordination, fault tolerance, operational simplicity, and data parallelism.
The messaging layer of Kafka partitions data for transport and storage. Kafka Streams partitions data for processing in the same way, which allows for data locality, scalability, better performance, and fault tolerance.
Each stream partition is a totally ordered sequence of data records and maps to a Kafka topic partition; a data record corresponds to a Kafka message from that topic. The keys of the data records determine how data is routed to specific partitions.
With Kafka Streams, the user can also configure the number of threads that the library uses for processing the stream tasks within an application instance.
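For example, the thread count can be raised via the num.stream.threads property when the streams application is configured (the application id and broker address are placeholders; builder is the StreamsBuilder from the snippets above):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hoca-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4); // default is 1 thread per instance

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

The tasks of the topology are then distributed across these threads and across all running instances of the application.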
In the next article, we will introduce Kubernetes and describe how we used it in combination with Kafka during the development of our “Home Office Challenges App”.
Stay tuned!