Justice League in need of a Kafka Streams application
Dark times for the Justice League, with Darkseid's invasion getting ever closer. As Bruce Wayne is the tech guy in this group, he needs to get an application running to sign up new recruits to the Justice League team. Of course, we only want certain people we know to join. The last thing we want is the Hawk joining the Justice League (yikes).
As the Justice League always starts with the end in mind, Bruce aptly creates this image of what they want to be:
Yeah, not the perfect team, but let us run with it (that is pretty much the only image I could find with the word Recruitment on it, and I am no Photoshop wizard).
The topology of our application is as follows:
The superhero-powers-topic will contain details regarding the superheroes allowed to join the Justice League and their respective powers. The jl-reg-input-topic will be the input for our registration process to join the Justice League.
Finally, we have a stream processor that processes the input and registers the member if they are worthy, of course. Let us dive into the implementation of this Kafka Streams application. Note that you can find the full code in the GitHub project for this article.
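The code that follows refers to these topics through an AppConstants class whose definition is not shown in this article. A minimal sketch, assuming the constants simply hold the topic names used in the console commands later on (the class in the GitHub project may look different):

```java
// Hypothetical sketch of AppConstants: the values are assumed to match the
// topic names used with the console producer/consumer in this article.
final class AppConstants {
    // Superheroes allowed to join, keyed by hero name, with their powers as the value
    static final String SUPERHERO_POWER_TOPIC = "superhero-powers-topic";
    // Registration requests: the hero name is the value, no key is set
    static final String JL_REG_INPUT_TOPIC = "jl-reg-input-topic";
    // Output topic: running count per super power
    static final String JL_FINAL_TOPIC = "jl-final-topic";

    private AppConstants() {
        // constants holder, not meant to be instantiated
    }
}
```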
Let us look at the code for our Kafka Streams application first and then talk through the implementation details:
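// Full copy of the superhero powers topic on every instance (key: hero name, value: "power1:power2")
GlobalKTable<String, String> superHeroTable = kStreamBuilder.globalTable(AppConstants.SUPERHERO_POWER_TOPIC);
// Registration requests: the hero name arrives as the value, with no key
KStream<String, String> superHeroRegistrationStream = kStreamBuilder.stream(AppConstants.JL_REG_INPUT_TOPIC);
KTable<String, Long> filteredStream = superHeroRegistrationStream
        // Use the hero name as the key
        .selectKey((key, value) -> value)
        // Inner join on the hero name: unknown heroes are dropped, known ones yield their powers
        .join(superHeroTable, (key, value) -> key,
                (registration, superHeroPower) -> superHeroPower)
        // "speed:strength" becomes two records: "speed" and "strength"
        .flatMapValues(value -> Arrays.asList(value.split(":")))
        // Re-key by power and count occurrences in the "JLPowerCounter" store
        .selectKey((key, value) -> value).groupByKey().count(Materialized.as("JLPowerCounter"));
// Emit the running counts to the final topic with String keys and Long values
filteredStream.toStream().to(AppConstants.JL_FINAL_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));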
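Because the topology calls globalTable and stream without passing any serdes, and groupByKey() without a Grouped argument, it relies on the application's default serdes being String. A minimal sketch of such a configuration, assuming kStreamBuilder is a StreamsBuilder; it uses the plain config key strings that StreamsConfig defines (application.id, bootstrap.servers, default.key.serde, default.value.serde) so that it compiles with only the JDK. The application id is a hypothetical value, and the broker address matches the console commands used later.

```java
import java.util.Properties;

// Sketch of a Streams configuration for the topology above (assumed values).
// Both input topics and the internal repartition topic use the default serdes,
// so the defaults must be String for keys and values.
final class JusticeLeagueConfig {
    static Properties streamsProperties() {
        Properties props = new Properties();
        // Hypothetical application id; Kafka Streams also uses it as the consumer
        // group id and as the prefix of internal topic names
        props.put("application.id", "justice-league-registration");
        // Same broker address as the console producer/consumer commands
        props.put("bootstrap.servers", "localhost:9092");
        // Default serdes, used wherever no serde is passed explicitly
        props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        return props;
    }

    public static void main(String[] args) {
        streamsProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With the kafka-streams library on the classpath, these properties would be handed to new KafkaStreams(kStreamBuilder.build(), props) before calling start().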
We start off by looking at the superhero-powers-topic, which in our case is read in as a GlobalKTable. Why a GlobalKTable, you ask? So that we can join the stream against it without repartitioning the stream. When a topic is read in as a GlobalKTable, every instance of the Kafka Streams application gets a full copy of its data from all partitions, so each instance can look up any superhero locally.
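To make the no-repartitioning point concrete, here is a plain-Java illustration that does not use Kafka at all. With a partitioned table, a hero's entry lives on exactly one instance, so a stream record would first have to be routed (repartitioned) to that instance before the join; with a global copy, every instance can look up every hero. The hash-based partitioner is a simplified stand-in (Kafka's default partitioner hashes the serialized key with murmur2), and the two-instance setup where instance i owns partition i is an assumption for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (no Kafka involved) of partitioned vs global lookups.
final class GlobalVsPartitionedLookup {
    // Simplified stand-in for a key-based partitioner
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // A partitioned table: an instance only holds the keys of the partition it owns
    static Map<String, String> localPartition(Map<String, String> all, int partition, int numPartitions) {
        Map<String, String> local = new HashMap<>();
        all.forEach((key, value) -> {
            if (partitionFor(key, numPartitions) == partition) {
                local.put(key, value);
            }
        });
        return local;
    }

    public static void main(String[] args) {
        Map<String, String> powers = Map.of(
                "batman", "wealth:stealth",
                "superman", "speed:strength",
                "flash", "speed:timetravel",
                "aquaman", "strength:talkwithfishies");
        int numPartitions = 2;
        // Global copy: every instance holds every hero
        Map<String, String> global = new HashMap<>(powers);
        for (int instance = 0; instance < numPartitions; instance++) {
            Map<String, String> local = localPartition(powers, instance, numPartitions);
            for (String hero : List.of("batman", "superman", "flash", "aquaman")) {
                System.out.printf("instance %d, %s: partitioned=%s, global=%s%n",
                        instance, hero, local.containsKey(hero), global.containsKey(hero));
            }
        }
    }
}
```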
Next up, we read the registration details from the jl-reg-input-topic. For this example, we do not populate a key when writing values to this topic, which is why our first action is a selectKey operation that sets the name of the superhero as the key (why we do this will be revealed soon).
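A plain-Java sketch of what this selectKey step does to each registration record, assuming a registration arrives with a null key and the hero name as the value. The records are modelled as java.util.Map.Entry pairs (AbstractMap.SimpleEntry, which allows a null key); this is an illustration, not Kafka's KeyValue class.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of selectKey((key, value) -> value) on the registration stream.
final class SelectKeySketch {
    // Records arrive with a null key and the hero name as the value;
    // selectKey copies the value into the key and leaves the value untouched.
    static List<Map.Entry<String, String>> selectKeyFromValue(List<Map.Entry<String, String>> records) {
        List<Map.Entry<String, String>> rekeyed = new ArrayList<>();
        for (Map.Entry<String, String> record : records) {
            rekeyed.add(new SimpleEntry<>(record.getValue(), record.getValue()));
        }
        return rekeyed;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> registrations = List.of(
                new SimpleEntry<String, String>(null, "batman"),
                new SimpleEntry<String, String>(null, "superman"));
        // prints batman=batman, then superman=superman
        selectKeyFromValue(registrations).forEach(System.out::println);
    }
}
```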
The join operation we do next is where that selectKey comes into play. To understand how the join works, we first need to look at how data is populated in the superhero-powers-topic and the jl-reg-input-topic.
On the superhero-powers-topic, we generate data by first starting up a producer with the following command:
./kafka-console-producer.sh --broker-list localhost:9092 --topic superhero-powers-topic \
--property parse.key=true \
--property key.separator=,
This tells the producer that each line holds a key and a value separated by a comma. We then populate the topic with the following data (note that all these steps are described in the README of the GitHub project I mentioned at the start of the article):
batman,wealth:stealth
superman,speed:strength
flash,speed:timetravel
aquaman,strength:talkwithfishies
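As a plain-Java illustration (no Kafka involved) of how the console producer turns these lines into records when parse.key=true and key.separator=, are set: the text before the comma becomes the key and the rest becomes the value. For this data there is exactly one comma per line. Collecting the records into a map also mimics the table view, where a later record for the same key replaces the earlier one.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of splitting "key,value" producer lines into a table view.
final class PowerLineParser {
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String line : lines) {
            int sep = line.indexOf(',');
            if (sep < 0) {
                throw new IllegalArgumentException("No key found on line: " + line);
            }
            // Key before the comma, value after it; a repeated key overwrites, as in a table
            table.put(line.substring(0, sep), line.substring(sep + 1));
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, String> powers = parse(List.of(
                "batman,wealth:stealth",
                "superman,speed:strength",
                "flash,speed:timetravel",
                "aquaman,strength:talkwithfishies"));
        // {batman=wealth:stealth, superman=speed:strength, flash=speed:timetravel, aquaman=strength:talkwithfishies}
        System.out.println(powers);
    }
}
```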
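As you can see, the key in this instance is the name of the superhero and the value is that hero's powers, separated by colons. On the input topic, the value is again the name of the superhero, and the earlier selectKey copied it into the key. With the hero name on both sides, the join's key mapper, (key, value) -> key, looks the hero up in the GlobalKTable, and the value joiner keeps only the hero's powers.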
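The join step can be mimicked in plain Java (an illustration, not the Kafka Streams implementation): each registration, already keyed by hero name, is looked up in the power table, and registrations without a match are dropped because join is an inner join. The registration "hawk" below is a hypothetical input standing in for the Hawk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the KStream-GlobalKTable inner join on the hero name.
final class RegistrationJoinSketch {
    static List<Map.Entry<String, String>> join(List<String> registrationKeys, Map<String, String> powerTable) {
        List<Map.Entry<String, String>> joined = new ArrayList<>();
        for (String hero : registrationKeys) {
            // Key mapper (key, value) -> key: the lookup key is the hero name
            String powers = powerTable.get(hero);
            if (powers != null) {
                // Value joiner (registration, superHeroPower) -> superHeroPower: keep only the powers
                joined.add(Map.entry(hero, powers));
            }
            // No match: the inner join emits nothing for this registration
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> powerTable = Map.of(
                "batman", "wealth:stealth",
                "superman", "speed:strength",
                "flash", "speed:timetravel",
                "aquaman", "strength:talkwithfishies");
        // prints [batman=wealth:stealth, flash=speed:timetravel]
        System.out.println(join(List.of("batman", "hawk", "flash"), powerTable));
    }
}
```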
flatMapValues is used to break up each superhero's powers into their own key-value pairs; the following selectKey then makes each power the key, so that we can finally do the groupByKey to get a count of each superpower in our Justice League. It is nice to know how many superheroes we have with a given power, to better decide whom to send on a given mission (yeah, Bruce is smart).
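A plain-Java sketch of the flatMapValues, selectKey and groupByKey().count() steps, assuming the joined powers arrive in the order batman, superman, flash. Each power bumps its own count, and every new count is recorded as an update, much like the KTable changelog that ends up on the output topic. This is an illustration only; it is not how Kafka Streams stores counts internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of flatMapValues -> selectKey -> groupByKey().count().
final class PowerCountSketch {
    static List<String> countUpdates(List<String> joinedPowers) {
        Map<String, Long> counts = new HashMap<>();
        List<String> updates = new ArrayList<>();
        for (String powers : joinedPowers) {
            // flatMapValues: "speed:timetravel" -> "speed", "timetravel"
            for (String power : powers.split(":")) {
                // selectKey + groupByKey: the power is the grouping key; count adds one
                long updated = counts.merge(power, 1L, Long::sum);
                updates.add(power + "," + updated);
            }
        }
        return updates;
    }

    public static void main(String[] args) {
        // Powers of batman, superman and flash after the join, in that order
        List<String> joined = List.of("wealth:stealth", "speed:strength", "speed:timetravel");
        // [wealth,1, stealth,1, speed,1, strength,1, speed,2, timetravel,1]
        System.out.println(countUpdates(joined));
    }
}
```

In the running application, Kafka Streams record caching may merge consecutive updates for the same key before they are written out, so the consumer can see fewer intermediate counts than this sketch lists.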
Next up, it is time to start our beloved Justice League Kafka Streams application and let the registration process begin, to save planet Earth from Darkseid.
Finally, to register the superheroes, we will start another producer and enter their details. Note that only the known superheroes already in the superhero-powers-topic will be processed. Before starting the producer, let us open a Kafka consumer so that we can see the output from our Kafka Streams application:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic jl-final-topic --from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
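The value.deserializer setting matters because the counts are written with Serdes.Long(), which stores each long as 8 bytes, most significant byte first. The sketch below mirrors that layout with java.nio.ByteBuffer (whose default byte order is big-endian) to show why the raw value is not readable text; it is an illustration, not Kafka's own serializer code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Plain-Java sketch of the 8-byte big-endian encoding behind Serdes.Long().
final class LongValueBytes {
    // Same layout as a long serialized for the output topic: 8 bytes, big-endian
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Conceptually what a Long deserializer does: read 8 bytes back into a long
    static long decode(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("Expected 8 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] raw = encode(2L);
        // [0, 0, 0, 0, 0, 0, 0, 2]
        System.out.println(Arrays.toString(raw));
        // 2
        System.out.println(decode(raw));
        // Decoding the bytes as text does not give "2": prints false
        System.out.println(new String(raw, StandardCharsets.UTF_8).equals("2"));
    }
}
```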
Then we start the producer for the application and enter the superhero names at its > prompt:
./kafka-console-producer.sh --broker-list localhost:9092 --topic jl-reg-input-topic
>batman
>superman
>flash
As we keep entering the details, you will see the superpower counts being output on the consumer you started, as follows:
wealth,1
stealth,1
speed,1
strength,1
speed,2 (this appears when we add flash, since he and superman both have super speed)
That is it. We got a nifty little Justice League registration system up and running with Kafka Streams and very little effort.
You can also check out how Bruce tested his Kafka Streams application using the TopologyTestDriver, which is a nice way to test a Streams application without having to start a Kafka server.
Planet Earth is saved for another day, until of course the next villain comes along (most of them usually start by attacking New York for no apparent reason, but that is a story for another time).
May the force of the Justice League be with you all!