Today, I wanted to have a look at Kafka Streams for building applications that transform and process data. There are two protagonists in this;

  • KStream
  • KTable

At a very high level, both of them offer similar functionality, as the Duality of Streams and Tables explains. With streams you can apply your usual lambda-style functions to filter and transform data. A KStream is all about inserts: every record is treated as a new event.

A KTable, on the other hand, closely resembles a database table. When a KTable receives data with a key it has already seen, it does an upsert, as opposed to the insert that a KStream does. One other special thing about a KTable is that a record with a null value is treated as a delete for that key. When you iterate through the Kafka topic, keys that carry such a delete marker are basically skipped.
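To make the insert versus upsert difference concrete, here is a small plain-Java sketch. It does not use the Kafka Streams API at all; the class name, the method names and the sample records (alice, bob) are all made up for illustration. It replays the same records once as a stream and once as a table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; none of these names come from the Kafka Streams API.
final class StreamVsTableSketch {

    // Stream view: every record is kept as its own event (an "insert").
    static List<String[]> asStream(List<String[]> records) {
        return new ArrayList<>(records);
    }

    // Table view: the latest value per key wins (an "upsert"),
    // and a null value removes the key (a delete).
    static Map<String, String> asTable(List<String[]> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : records) {
            String key = record[0];
            String value = record[1];
            if (value == null) {
                table.remove(key);
            } else {
                table.put(key, value);
            }
        }
        return table;
    }

    // Hypothetical sample data: {key, value} pairs in arrival order.
    static List<String[]> sampleRecords() {
        return Arrays.asList(
                new String[] {"alice", "red"},
                new String[] {"bob", "green"},
                new String[] {"alice", "blue"},  // same key again: overwrites red
                new String[] {"bob", null});     // null value: deletes bob
    }

    public static void main(String[] args) {
        List<String[]> records = sampleRecords();
        System.out.println("stream size: " + asStream(records).size()); // stream size: 4
        System.out.println("table: " + asTable(records));               // table: {alice=blue}
    }
}
```

The stream still holds all four records, while the table ends up with a single row for alice.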

Log compaction is something that goes hand in hand with KTables. With compaction, the latest record for each key is kept while the older records with the same key are deleted from the topic. Deleting them does not reorder the topic: the remaining records keep their original offsets, and a consumer simply skips over the gaps left by the deleted items. One thing to note is that consumers can still see the delete markers (records with null values) until the delete retention period has passed (which is 24 hours by default).
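The effect of compaction on offsets can be sketched with a toy model. This is not how the broker implements it, only an illustration of the outcome described above; the Rec class, the compact method and the sample log are all hypothetical, and the sketch ignores delete markers and their retention period.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a compacted topic; not Kafka code, all names are made up for illustration.
final class CompactionSketch {

    static final class Rec {
        final long offset;
        final String key;
        final String value;

        Rec(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // Keep only the newest record per key. Survivors keep their original offsets
    // and their relative order, so the compacted log simply has gaps.
    // Assumes the input list is ordered by ascending offset.
    static List<Rec> compact(List<Rec> log) {
        Map<String, Long> latestOffsetByKey = new HashMap<>();
        for (Rec rec : log) {
            latestOffsetByKey.put(rec.key, rec.offset);
        }
        List<Rec> compacted = new ArrayList<>();
        for (Rec rec : log) {
            if (latestOffsetByKey.get(rec.key) == rec.offset) {
                compacted.add(rec);
            }
        }
        return compacted;
    }

    // Hypothetical sample log with two keys that are written twice.
    static List<Rec> sampleLog() {
        return Arrays.asList(
                new Rec(0, "alice", "red"),
                new Rec(1, "bob", "green"),
                new Rec(2, "alice", "blue"),
                new Rec(3, "carol", "red"),
                new Rec(4, "bob", "blue"));
    }

    public static void main(String[] args) {
        // Prints the records at offsets 2, 3 and 4; offsets 0 and 1 are gone.
        for (Rec rec : compact(sampleLog())) {
            System.out.println(rec.offset + " " + rec.key + "=" + rec.value);
        }
    }
}
```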

One of the coolest features I see in Kafka Streams is that it can keep internal state for certain operations, in order to do additional transformations/aggregations when new data arrives. For example, the results of all aggregation and join operations are persisted by Kafka Streams. Where are they persisted to, you might ask.

To maintain state for the stateful operations performed on Kafka Streams, an internal topic is created whose name ends in -changelog. So, for example, if you created a topic and performed some aggregation or transformation on top of it, you would see a corresponding internal changelog topic created by the Kafka Streams framework. This is similar to a transaction log: it gives you the ability to recover in the event of a failure, which in essence provides fault tolerance. Kafka Streams also keeps this state locally with the use of RocksDB, where key/value pairs are first held in memory. When the memory is insufficient, RocksDB writes data to local disk asynchronously (which means that RocksDB on its own cannot be relied on for fault tolerance). For more information on internal state stores you can have a read here.
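The recovery idea can be sketched in a few lines of plain Java: every write to the local store is also appended to a log, and a fresh instance rebuilds its state by replaying that log. This is only a simulation of the principle under simplified assumptions; ChangelogSketch and its methods are hypothetical names, and a list stands in for the internal topic.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation of "local store backed by a replayable log"; not Kafka Streams code.
final class ChangelogSketch {
    private final Map<String, Long> localStore = new HashMap<>();
    private final List<Map.Entry<String, Long>> changelog;

    // On start-up, replay whatever the log already contains to rebuild local state.
    ChangelogSketch(List<Map.Entry<String, Long>> changelog) {
        this.changelog = changelog;
        for (Map.Entry<String, Long> entry : changelog) {
            localStore.put(entry.getKey(), entry.getValue());
        }
    }

    // Every update goes to the local store and is appended to the log.
    void put(String key, Long value) {
        localStore.put(key, value);
        changelog.add(new SimpleImmutableEntry<>(key, value));
    }

    Long get(String key) {
        return localStore.get(key);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> changelog = new ArrayList<>();

        ChangelogSketch first = new ChangelogSketch(changelog);
        first.put("red", 1L);
        first.put("red", 2L);
        first.put("blue", 1L);

        // Pretend the first instance crashed and lost its local store.
        ChangelogSketch recovered = new ChangelogSketch(changelog);
        System.out.println(recovered.get("red") + " " + recovered.get("blue")); // 2 1
    }
}
```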

Let us have a look at a sample Kafka Streams application. This example is taken from a Udemy course I followed some time back. The goal of this application is to maintain a count of the favourite colors sent to a topic, while only counting red, green and blue. So, for example, if the input was;


Then the final output needs to be;


The count for the color that key held previously needs to be zero, because the last input reassigns the key to blue.

The code for this example can be found here.

To achieve this, we create three topics, one for the input, the second to start processing the filtered colors and finally the last topic to output the color and its respective count.

We have our first filter defined as follows;

// Keep only "key,color" lines, re-key by the first part, and keep the three colors we count.
KStream<String, String> filteredStream = lineStream
    .filter((key, val) -> val.contains(","))
    .selectKey((key, val) -> val.split(",")[0])
    .mapValues((val) -> val.split(",")[1].toLowerCase())
    .filter((key, val) -> val.matches("green|blue|red"));

I believe the code is self-explanatory, so we will not be going into details here.
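If you want to try the same filtering steps without a Kafka cluster, they can be mirrored with ordinary Java streams. This is only a stand-in for the topology above, not Kafka Streams code; FilterSketch, filterColors and the sample lines are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Mirrors the filter / selectKey / mapValues / filter steps on an in-memory list.
final class FilterSketch {

    static List<Map.Entry<String, String>> filterColors(List<String> lines) {
        return lines.stream()
                // step 1: drop lines that are not "key,color"
                .filter(val -> val.contains(","))
                // steps 2 and 3: first part becomes the key, second part the lower-cased color
                .<Map.Entry<String, String>>map(val ->
                        new SimpleEntry<>(val.split(",")[0], val.split(",")[1].toLowerCase()))
                // step 4: keep only the colors we want to count
                .filter(entry -> entry.getValue().matches("green|blue|red"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical input lines.
        List<String> lines = Arrays.asList("alice,Red", "bob,purple", "not a pair", "carol,BLUE");
        System.out.println(filterColors(lines)); // [alice=red, carol=blue]
    }
}
```

Note that a line with nothing after the comma would make the index [1] lookup throw, both here and in the original topology, so real input may need an extra check.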

The interesting part comes when we move data into a KTable;

KTable<String, Long> colorCountedTable = favColorTable
    .groupBy((key, val) -> new KeyValue<>(val, val)) // re-key every record by its color
    .count(); // count per color, giving the KTable<String, Long>

Here, we start our transformation and grouping in order to get a count of the colors. For grouping purposes we create a new KeyValue whose key is essentially the color, so that a count per color can be obtained.

Internally, Kafka Streams stores and maintains the data for every unique key in the internal store we discussed before. When a change comes through, for instance an update in our example that changes an existing key's color preference, Kafka Streams retrieves the existing value for the same key so that the grouping and counting can take both the old and the new value into account.

From here on out, I will be going into some parts of the Kafka Streams codebase. I am by no means an expert on the codebase but wanted to dig around with some debug pointers to see what goes on under the hood.

In the Kafka code, the putAndMaybeForward method is as follows;

private void putAndMaybeForward(DirtyEntry entry, InternalProcessorContext context) {
    RecordContext current = context.recordContext();

    try {
        if (this.flushListener != null) {
            // arguments: key, new value from the entry, previous value read from the underlying store
            this.flushListener.apply(this.serdes.keyFrom(entry.key().get()), this.serdes.valueFrom(entry.newValue()), this.serdes.valueFrom((byte[])this.underlying.get(entry.key())));
        }

        this.underlying.put(entry.key(), entry.newValue());
    } finally {
        // ... (the rest of the method is not shown here)
    }
}


Here we get the new value from the entry itself, while the previous value is fetched using the underlying attribute. A previous value exists when, for example, you update an existing key with a different value. Both values are then passed on to the processor in KTableRepartitionMap, which is as follows;

KeyValue<? extends K1, ? extends V1> newPair = change.newValue == null ? null : (KeyValue)KTableRepartitionMap.this.mapper.apply(key, change.newValue);
KeyValue<? extends K1, ? extends V1> oldPair = change.oldValue == null ? null : (KeyValue)KTableRepartitionMap.this.mapper.apply(key, change.oldValue);
if (oldPair != null && oldPair.key != null && oldPair.value != null) {
    this.context().forward(oldPair.key, new Change((Object)null, oldPair.value));
}
if (newPair != null && newPair.key != null && newPair.value != null) {
    this.context().forward(newPair.key, new Change(newPair.value, (Object)null));
}

What happens here is that if both a new and an old value are found, the mapper we passed to groupBy in our code above is applied twice, once per value, and each resulting pair is forwarded separately. These records then get passed on to the count method.
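The forwarding logic of that excerpt can be imitated in isolation. The sketch below is a simplified stand-in, not the Kafka class: Change here is a tiny hypothetical copy that only mirrors the (newValue, oldValue) argument order seen above, Forwarded is made up, and the grouping key is assumed to be the color itself, as in our groupBy.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified imitation of the repartition step; not the real KTableRepartitionMap.
final class RepartitionSketch {

    // Same argument order as in the excerpt: new value first, old value second.
    static final class Change {
        final String newValue;
        final String oldValue;

        Change(String newValue, String oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // A record handed to the next processor: grouping key plus the change.
    static final class Forwarded {
        final String key;
        final Change change;

        Forwarded(String key, Change change) {
            this.key = key;
            this.change = change;
        }
    }

    // One table update can fan out into two records: the old color is forwarded
    // as an "old value only" change, the new color as a "new value only" change.
    static List<Forwarded> repartition(Change change) {
        List<Forwarded> out = new ArrayList<>();
        if (change.oldValue != null) {
            out.add(new Forwarded(change.oldValue, new Change(null, change.oldValue)));
        }
        if (change.newValue != null) {
            out.add(new Forwarded(change.newValue, new Change(change.newValue, null)));
        }
        return out;
    }

    public static void main(String[] args) {
        // A key switches from red to blue: two records are forwarded.
        for (Forwarded f : repartition(new Change("blue", "red"))) {
            System.out.println(f.key + " new=" + f.change.newValue + " old=" + f.change.oldValue);
        }
    }
}
```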

As part of the count process, the process method in KTableAggregate is called with the color as the key, and the change value carries the new and old values.

public void process(K key, Change<V> value) {
    if (key == null) {
        throw new StreamsException("Record key for KTable aggregate operator with state " + KTableAggregate.this.storeName + " should not be null.");
    } else {
        // look up the current aggregate for this key, or start from the initial value
        T oldAgg = this.store.get(key);
        if (oldAgg == null) {
            oldAgg = KTableAggregate.this.initializer.apply();
        }

        T newAgg = oldAgg;
        // an old value, if present, is removed from the aggregate
        if (value.oldValue != null) {
            newAgg = KTableAggregate.this.remove.apply(key, value.oldValue, oldAgg);
        }

        // a new value, if present, is added to the aggregate
        if (value.newValue != null) {
            newAgg = KTableAggregate.this.add.apply(key, value.newValue, newAgg);
        }

        this.store.put(key, newAgg);
        this.tupleForwarder.maybeForward(key, newAgg, oldAgg);
    }
}

If the old value is present, one is subtracted from the current aggregate value, and if the new value is present, one is added. Let us apply this to the example input we looked at before. Note that in the Change constructor, the first argument is the new value and the second one is the old value.

=> where the first parameter indicates a new value.


On the second call, as we are changing the preference from red to blue, there will be two calls to the process method: one to subtract one from the old color and the other to add one to the new color. This is how the count method ultimately works in Kafka Streams.
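The subtract-then-add behaviour is easy to replay by hand. The sketch below is a plain-Java imitation of the counting step, not the Kafka implementation: CountSketch and its methods are hypothetical, a HashMap stands in for the state store, and the input is a single imaginary key that first picks red and then switches to blue.

```java
import java.util.HashMap;
import java.util.Map;

// Imitation of the count aggregation over (newValue, oldValue) changes; not Kafka code.
final class CountSketch {
    private final Map<String, Long> store = new HashMap<>();

    // newValue and oldValue play the role of Change.newValue and Change.oldValue.
    void process(String color, String newValue, String oldValue) {
        Long oldAgg = store.get(color);
        if (oldAgg == null) {
            oldAgg = 0L; // a count starts from zero
        }
        long newAgg = oldAgg;
        if (oldValue != null) {
            newAgg = newAgg - 1; // a key moved away from this color
        }
        if (newValue != null) {
            newAgg = newAgg + 1; // a key moved to this color
        }
        store.put(color, newAgg);
    }

    long count(String color) {
        return store.getOrDefault(color, 0L);
    }

    public static void main(String[] args) {
        CountSketch counts = new CountSketch();

        // First call: the key picks red, so only a new value is present.
        counts.process("red", "red", null);

        // Second call: the key switches from red to blue, which arrives as two records.
        counts.process("red", null, "red");   // subtract one from the old color
        counts.process("blue", "blue", null); // add one to the new color

        System.out.println("red=" + counts.count("red") + " blue=" + counts.count("blue")); // red=0 blue=1
    }
}
```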

If you want to run the project on my GitHub, everything is documented in the project itself.

That is about it. I hope you enjoyed it, and if there are any additional details you would like to share, I would be glad if you left a comment, as sharing is always caring!

A coding geek, gamer, guitarist and a disciple of Christ Jesus. That would be me in a nutshell!
