Kafka Streams FTW
Today, I wanted to have a look at Kafka Streams for building applications for data transformation and processing. There are two main protagonists in this;
- KStream
- KTable
At a very high level, both of them offer similar functionality, as the Duality of Streams and Tables explains. With streams you can apply your usual lambda-like functions to filter and transform data. KStreams are all about inserts.
A KTable, on the other hand, closely resembles a database table. With a KTable, when you receive data with the same key, it does an upsert as opposed to the insert that a KStream does. One other special thing with a KTable is that a null value is treated as a delete (a tombstone). When you iterate through the Kafka topic, these delete markers are essentially skipped.
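To make the delete semantics concrete, here is a minimal sketch (the topic name my-colour-topic is just a placeholder, not one from the project) that uses a plain Kafka producer to publish an update followed by a tombstone, i.e. a null value for the same key;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // upsert: a KTable built over this topic now maps "barry" -> "blue"
    producer.send(new ProducerRecord<>("my-colour-topic", "barry", "blue"));
    // tombstone: a null value is treated as a delete, so "barry" disappears from the KTable
    producer.send(new ProducerRecord<>("my-colour-topic", "barry", null));
}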
Log compaction is something that resonates closely with KTables. With compaction, the latest record for each key is kept while the older records with that key are deleted from the topic. When they are deleted, there is no reordering on the topic itself, the offsets of the remaining records stay the same, and consumers simply skip over the deleted items. One thing to note is that consumers can still see the delete markers until the delete.retention.ms period has passed (which is 24 hours by default).
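As a rough sketch of how such a compacted topic could be configured (the topic name, partition count and retention value here are placeholders, not taken from the project), using the Kafka AdminClient;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {
    NewTopic compacted = new NewTopic("my-compacted-topic", 3, (short) 1)
            .configs(Map.of(
                    // keep only the latest record per key
                    TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
                    // how long delete markers stay visible after compaction (24 hours, the default)
                    TopicConfig.DELETE_RETENTION_MS_CONFIG, "86400000"));
    admin.createTopics(Collections.singletonList(compacted)).all().get();
}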
One of the coolest features I see with Kafka Streams is the fact that you can keep internal state for certain operations, in order to do additional transformations/aggregations when new data arrives. For example, all aggregation and join operations are persisted by Kafka Streams. Where are they persisted to, you might ask?
To maintain state for stateful operations performed on Kafka Streams, an internal topic is created per state store, named after your application ID and the store name with -changelog appended to it. So, for example, if your application ID were my-awesome-app and you performed some aggregation backed by a state store called my-awesome-store, you would see an internal topic called my-awesome-app-my-awesome-store-changelog created by the Kafka Streams framework. This is similar to a transaction log, so that you have the ability to recover in the event of a failure; in essence it provides fault tolerance. Kafka Streams also caches data with the use of RocksDB, where the key/value pairs are kept in memory and written to local disk asynchronously when memory is insufficient (which means that RocksDB itself cannot be relied on for fault tolerance). For more information on internal state stores you can have a read here.
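To make the idea of these state stores a bit more concrete, here is a minimal sketch of querying one through the interactive-queries API (streams here is assumed to be the already-running KafkaStreams instance, ColorCount is the store name used in the example further down, and the exact store() signature differs between Kafka versions);
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// 'streams' is the running KafkaStreams instance for this application
ReadOnlyKeyValueStore<String, Long> colorCounts = streams.store(
        StoreQueryParameters.fromNameAndType("ColorCount", QueryableStoreTypes.keyValueStore()));

// reads from the locally materialised state (RocksDB), which is backed by the changelog topic
Long blueCount = colorCounts.get("blue");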
Let us have a look at a sample Kafka Streams application. This example is taken from a Udemy course I followed some time back. The goal of this application is to maintain a count of the favourite colors input into a topic, while filtering only for red, green and blue. So for example, if the input was;
barry,red
clark,blue
hal,green
barry,blue
Then the final output needs to be;
red,0
green,1
blue,2
red needs to be zero because we assign the key barry to blue as the last input.
The code for this example can be found here.
To achieve this, we create three topics: one for the input, a second that holds the filtered colors for further processing, and a final topic to output each color and its respective count.
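Before diving into the individual snippets, here is a rough end-to-end sketch of how those three topics could be wired together using the newer StreamsBuilder API (the topic names are placeholders rather than the ones in the project, default String serdes are assumed in the Streams configuration, and the filter chain itself is shown right below);
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();

// topic 1: raw "user,colour" lines
KStream<String, String> lineStream = builder.stream("favourite-colour-input");

// the filter chain shown below turns this into (user, color) pairs
KStream<String, String> filteredStream = lineStream; // placeholder for the filter chain

// topic 2: write the filtered pairs out, then read them back as a KTable so that
// a new color for the same user is treated as an upsert rather than an insert
filteredStream.to("user-keys-and-colours");
KTable<String, String> favColorTable = builder.table("user-keys-and-colours");

// re-key by color and count, materialised in the "ColorCount" state store
KTable<String, Long> colorCountedTable = favColorTable
        .groupBy((user, colour) -> KeyValue.pair(colour, colour))
        .count(Materialized.as("ColorCount"));

// topic 3: publish the running counts
colorCountedTable.toStream().to("favourite-colour-output", Produced.with(Serdes.String(), Serdes.Long()));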
We have our first filter defined as follows;
KStream<String, String> filteredStream = lineStream.filter((key, val) -> val.contains(","))
.selectKey((key, val) -> val.split(",")[0])
.mapValues((val) -> val.split(",")[1].toLowerCase())
.filter((key, val) -> val.matches("green|blue|red"));
I believe the code is self-explanatory so we will not be going into detail here.
The interesting part comes when we move data into a KTable;
KTable<String, Long> colorCountedTable = favColorTable
.groupBy((key, val) -> new KeyValue<>(val, val))
.count("ColorCount");
Here, we start our transformation and grouping in order to get a count of the colors. For grouping purposes we create a new KeyValue whose key and value are both the color, so that a count per color can be obtained.
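To make that concrete, for our sample input the groupBy mapper re-keys each table update from the user to the color, roughly like this;
(barry, red)   => KeyValue(red, red)
(clark, blue)  => KeyValue(blue, blue)
(hal, green)   => KeyValue(green, green)
(barry, blue)  => KeyValue(blue, blue) for the new value, and KeyValue(red, red) for the old one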
Internally in Kafka Streams, the data for each unique key is stored and maintained in the internal store we discussed before. When a change comes through, for instance an update to barry that changes his color preference to blue, Kafka Streams retrieves the existing value for that key in order to do the grouping and counting.
From here on out, I will be going into some parts of the Kafka Streams codebase. I am by no means an expert on the codebase but wanted to dig around with some debug pointers to see what goes on under the hood.
In the Kafka code, in the CachingKeyValueStore -> putAndMaybeForward() method, the code is as follows;
private void putAndMaybeForward(DirtyEntry entry, InternalProcessorContext context) {
    RecordContext current = context.recordContext();
    try {
        context.setRecordContext(entry.recordContext());
        if (this.flushListener != null) {
            this.flushListener.apply(
                    this.serdes.keyFrom(entry.key().get()),
                    this.serdes.valueFrom(entry.newValue()),
                    this.serdes.valueFrom((byte[]) this.underlying.get(entry.key())));
        }
        this.underlying.put(entry.key(), entry.newValue());
    } finally {
        context.setRecordContext(current);
    }
}
Here we get the new value from the entry itself, while the previous value (for example, when you update an existing key with a different value) is fetched through the underlying attribute. This is then passed on to KTableRepartitionMap -> KTableMapProcessor -> process(), which is as follows;
KeyValue<? extends K1, ? extends V1> newPair = change.newValue == null ? null :
        (KeyValue) KTableRepartitionMap.this.mapper.apply(key, change.newValue);
KeyValue<? extends K1, ? extends V1> oldPair = change.oldValue == null ? null :
        (KeyValue) KTableRepartitionMap.this.mapper.apply(key, change.oldValue);

if (oldPair != null && oldPair.key != null && oldPair.value != null) {
    this.context().forward(oldPair.key, new Change((Object) null, oldPair.value));
}

if (newPair != null && newPair.key != null && newPair.value != null) {
    this.context().forward(newPair.key, new Change(newPair.value, (Object) null));
}
What happens here is that if both a new and an old pair are found, the groupBy mapping we used in our code above is applied twice, once for the old value and once for the new one, and each result is forwarded downstream. This then gets passed on to the count method.
As part of the count process, the KTableAggregate -> KTableAggregateProcessor -> process() method is called with the color as the key, and a Change value holding the new and old values.
public void process(K key, Change<V> value) {
    if (key == null) {
        throw new StreamsException("Record key for KTable aggregate operator with state " + KTableAggregate.this.storeName + " should not be null.");
    } else {
        T oldAgg = this.store.get(key);
        if (oldAgg == null) {
            oldAgg = KTableAggregate.this.initializer.apply();
        }

        T newAgg = oldAgg;
        if (value.oldValue != null) {
            newAgg = KTableAggregate.this.remove.apply(key, value.oldValue, oldAgg);
        }

        if (value.newValue != null) {
            newAgg = KTableAggregate.this.add.apply(key, value.newValue, newAgg);
        }

        this.store.put(key, newAgg);
        this.tupleForwarder.maybeForward(key, newAgg, oldAgg);
    }
}
If the old value is present, the current aggregate value is decremented by one, and an addition of one happens for the new value. Let us walk through the example input we looked at before. Note that in the Change constructor, the first argument is the new value and the second one is the old value.
barry,red
=> process(red, Change(red, null))
where the first parameter indicates a new value.
barry,blue
=> process(red, Change(null, red)) & process(blue, Change(blue, null))
On the second record, as we are changing barry's preference from red to blue, there are two calls to the process method: one to subtract one from the old color's count and another to add one to the new color's count. This, in the end, is how the count method works in Kafka Streams.
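Putting it all together, the ColorCount store evolves roughly like this for our sample input (my own trace of the add/subtract logic above);
barry,red   => red = 1
clark,blue  => red = 1, blue = 1
hal,green   => red = 1, blue = 1, green = 1
barry,blue  => red = 0, blue = 2, green = 1
which matches the final output we expected at the start.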
If you want to run the project on my GitHub, everything is documented in the README.md found in the project.
That is about it. I hope you enjoyed it, and if there are any additional details you would like to share, I would be glad if you could leave a comment, as sharing is always caring!