Testing Kafka Applications: Libraries for Unit and Integration Tests | by Ivelina Yordanova | Apr, 2022

For KafkaConsumer, KafkaProducer, Streams and Spring-Kafka


You've been given a job that involves working with Kafka, or you might be designing completely new software? It's very good practice to have testing in mind, if not even to try TDD (test-driven development). My goal here is not to give you test examples; there are many available online, and most are just the skeleton of a test anyway. What I usually find really useful is summarised information in one place, lists, and bullet points (that's my jam). What's more, if you have only recently started working with Kafka, you might not even be aware of what's available out there.

So, this is a list of the classes and libraries that you can use for your unit and integration tests, depending on which libraries and classes you have already chosen to use in your main implementation.

To unit test your simple consumer you can use the MockConsumer provided by the org.apache.kafka:kafka-clients:X.X.X library. It's very easy and intuitive to set up all your tests. The one thing you need to keep in mind when designing your application (or refactoring it later) is to make the classes that use or wrap the KafkaConsumer instance testable. This means having them expect a Consumer<K, V> to be passed in, whether through a factory method, directly in the constructor, or injected as a bean.

To create an instance of the MockConsumer you just need to match the type of the key and value of your records and "tell" it where to start reading:

val mockConsumer = MockConsumer<String, String>(OffsetResetStrategy.LATEST)

Both KafkaConsumer and MockConsumer implement the Consumer<K, V> interface, so if you pass the mock into the class you're testing, that class will interact with it like a real consumer. What's different here is that there are additional methods you can use to prepare the conditions for your tests:

  • addRecord() on your mock to prepare records to be read before you start your test
  • schedulePollTask() where you can pass any Runnable to be executed on the next poll
  • setPollException() to check your consumer's behaviour on an exception
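The three helpers above can be sketched roughly as follows. This is a minimal illustration, assuming a 3.x kafka-clients version where MockConsumer still takes an OffsetResetStrategy; the topic name, keys, and values are made up for the example, and manual assign() is used only to avoid simulating a rebalance.

```kotlin
import java.time.Duration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.MockConsumer
import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.TopicPartition

// Hypothetical topic and partition, used only for this illustration.
val partition = TopicPartition("my-topic", 0)

fun newMockConsumer(): MockConsumer<String, String> {
    val consumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST)
    // Manual assignment avoids having to simulate a rebalance.
    consumer.assign(listOf(partition))
    // With EARLIEST the mock needs to be told where the partition begins.
    consumer.updateBeginningOffsets(mapOf(partition to 0L))
    return consumer
}

fun pollPreparedRecords(): List<String> {
    val consumer = newMockConsumer()
    // Record prepared before the test starts polling.
    consumer.addRecord(ConsumerRecord("my-topic", 0, 0L, "key-1", "first"))
    // The task runs at the start of the next poll(), so that poll also returns its record.
    consumer.schedulePollTask {
        consumer.addRecord(ConsumerRecord("my-topic", 0, 1L, "key-2", "second"))
    }
    return consumer.poll(Duration.ofMillis(100)).map { it.value() }
}

fun pollFailsOnce(): Boolean {
    val consumer = newMockConsumer()
    // The next poll() throws this exception instead of returning records.
    consumer.setPollException(KafkaException("simulated failure"))
    return try {
        consumer.poll(Duration.ofMillis(100))
        false
    } catch (e: KafkaException) {
        true
    }
}

fun main() {
    println(pollPreparedRecords())
    println(pollFailsOnce())
}
```

In a real test you would pass the mock into your own class instead of polling it directly, and assert on what that class did with the records.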

Since this is only meant for unit testing, what you test here is basically whether your real consumer:

  • can deserialize the records it's meant to read
  • processes the records as expected: ideally there is a division of responsibilities, so "processing" here might just mean passing the expected values to another class that deals with the business logic and is unit-tested separately
  • filters any records (if that's required)
  • handles errors in deserialization or processing
  • handles errors with the connection to Kafka: on subscribing, polling or committing
  • manipulates the offsets correctly: whether, when and how often it commits

Note: If your consumer is manipulating the offsets in a non-standard way, you can keep using the same instance between all your unit tests and adjust the offsets with updateBeginningOffsets() and updateEndOffsets().

The kafka-clients library also includes a MockProducer class which implements the same Producer<K, V> interface as the KafkaProducer. So, similar to the consumer testing above, your production classes should be designed in a way that allows you to pass the mock in.

To create an instance of the MockProducer you need to match the type of the key and value of your records and "tell" it whether to automatically complete the send requests successfully (autoComplete = true) or whether you want to complete them explicitly by calling completeNext() or errorNext().

val mockProducer = MockProducer(true, StringSerializer(), StringSerializer())

To test the normal flow of work of your producer you'll want to line up some work for it to do, use autocomplete, and then check what has been sent using the history() method on the MockProducer. It returns a list of all the ProducerRecords sent since the last time you called clear() on the mock.

To test how you handle exceptions you can set autocomplete to false and call errorNext(), which fails the next incomplete send call with whatever RuntimeException you pass in.
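Both flows can be sketched like this. It is only an illustration under assumptions: the topic name, keys, and values are invented, and the mock is driven directly rather than through a production class. Note that the failure surfaces to the caller through the returned Future (and through the send callback, if one was passed).

```kotlin
import java.util.concurrent.ExecutionException
import org.apache.kafka.clients.producer.MockProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

// Happy path: autoComplete = true, then inspect what was sent via history().
fun sentValuesWithAutoComplete(): List<String> {
    val producer = MockProducer(true, StringSerializer(), StringSerializer())
    producer.send(ProducerRecord("my-topic", "key-1", "hello")) // hypothetical topic
    producer.send(ProducerRecord("my-topic", "key-2", "world"))
    return producer.history().map { it.value() }
}

// Failure path: autoComplete = false, then fail the pending send explicitly.
fun failureSeenByCaller(): String? {
    val producer = MockProducer(false, StringSerializer(), StringSerializer())
    val future = producer.send(ProducerRecord("my-topic", "key-1", "hello"))
    // Completes the oldest incomplete send with the given exception.
    producer.errorNext(RuntimeException("simulated send failure"))
    return try {
        future.get()
        null
    } catch (e: ExecutionException) {
        // The exception passed to errorNext() is the cause.
        e.cause?.message
    }
}

fun main() {
    println(sentValuesWithAutoComplete())
    println(failureSeenByCaller())
}
```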

Here, too, the testing capabilities are limited to unit testing, so what you're going to be verifying is whether your producer:

  • can serialize the records it has to
  • handles serialization errors
  • handles errors related to the connection to Kafka, i.e. on send()
  • applies any filtering properly, i.e. the number of records actually sent matches the number you expect
  • sends the records in the expected format: any enrichment or reformatting is correctly applied

For streams, the test and production classes are split into separate libraries, so you need to add the org.apache.kafka:kafka-streams:X.X.X dependency to use the streams, and then the org.apache.kafka:kafka-streams-test-utils:X.X.X one to make use of the convenient test classes.

Here, things work the other way around: instead of creating a mock and passing it to the class you test, you create an instance of the TopologyTestDriver by passing in your topology and properties. As a result, making your application unit-testable here means having a way to create your topology and pass it to the test driver.

val driver = TopologyTestDriver(myTopology, myProperties)

Once you have a driver instance, you need to explicitly create all the topics for your topology:

val myInputTopic = driver.createInputTopic(
    "my-input-topic", // topic name (placeholder)
    Serdes.String().serializer(), // key type
    Serdes.String().serializer() // value type
)
val myOutputTopic = driver.createOutputTopic(
    "my-output-topic", // topic name (placeholder)
    Serdes.String().deserializer(), // key type
    Serdes.String().deserializer() // value type
)
// ... create as many output topics as your topology has
val myDlqTopic = driver.createOutputTopic(
    "my-dlq-topic", // topic name (placeholder)
    Serdes.String().deserializer(), // key type
    Serdes.String().deserializer() // value type
)

Having all the TestInputTopics and TestOutputTopics set up, you're ready to get testing! Exciting!

myInputTopic.pipeInput(key, validValue)
val actualRecord = myOutputTopic.readRecord()
assertEquals(expectedRecord, actualRecord, "Oh no, records do not match")

You can also work with multiple input values at once:

  • pipeValueList(List<V>): if your test class only cares about the values
  • pipeKeyValueList(List<KeyValue<K, V>>): if your test class only cares about the key and value
  • pipeRecordList(List<TestRecord<K, V>>): if your test class also uses any headers or timestamps

Similarly, for the output:

  • readValuesToList(): if you need to verify only the values of the output
  • readKeyValuesToList() or readKeyValuesToMap(): if you need to verify only the key and value of the output
  • readRecordsToList(): if you need to check the headers and timestamps of the output
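Put together, a driver-based test could look something like the sketch below. The topology (drop blank values, upper-case the rest), the topic names "input" and "output", and the property values are all assumptions invented for this example, not anything your application needs to match.

```kotlin
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced

// Hypothetical topology: drop blank values, upper-case everything else.
fun buildTopology(): Topology {
    val builder = StreamsBuilder()
    builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
        .filter { _, value -> value.isNotBlank() }
        .mapValues { value -> value.uppercase() }
        .to("output", Produced.with(Serdes.String(), Serdes.String()))
    return builder.build()
}

fun runThroughDriver(input: List<KeyValue<String, String>>): List<KeyValue<String, String>> {
    val props = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test")
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234") // never contacted by the driver
    }
    // use {} closes the driver (and cleans up its state) when the block finishes.
    return TopologyTestDriver(buildTopology(), props).use { driver ->
        val inputTopic = driver.createInputTopic(
            "input", Serdes.String().serializer(), Serdes.String().serializer())
        val outputTopic = driver.createOutputTopic(
            "output", Serdes.String().deserializer(), Serdes.String().deserializer())
        inputTopic.pipeKeyValueList(input)
        outputTopic.readKeyValuesToList()
    }
}

fun main() {
    val output = runThroughDriver(
        listOf(KeyValue("k1", "hello"), KeyValue("k2", " "), KeyValue("k3", "kafka")))
    println(output)
}
```

Keeping the topology construction in its own function (buildTopology here) is what lets the same code be handed to the driver in tests and to KafkaStreams in production.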

Since we're still talking about unit tests here, the checks are pretty basic. You verify that your stream:

  • can serialize and deserialize the records it needs to
  • handles any exception in the desired way
  • processes the records as expected: the output format matches the expected one, treating the stream as a black box
  • does any filtering as expected: the number of output records matches the expected number, no matter how many were input

If you are using a Processor, you can use a MockProcessorContext to initialize your implementation of that interface. With it, you can check whether records get forwarded to the right topic and whether offsets are committed.

val mockContext = MockProcessorContext<String, String>()
val processor = MyProcessor() // implementing Processor
processor.init(mockContext)
// ...
processor.process(record)
val forwardedRecords = mockContext.forwarded()
assertEquals(1, forwardedRecords.size)
assertEquals(expectedRecord, forwardedRecords.map { it.record() }.first())
assertEquals(expectedTopic, forwardedRecords[0].childName().get())
// if you have a scheduled commit (or other action), manipulate time by
mockContext.scheduledPunctuators()[0].punctuator.punctuate(time)
// check if the scheduled task is done
assertTrue(mockContext.committed())
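For a fuller picture, here is a small sketch with the processor included. It assumes the newer processor API (the org.apache.kafka.streams.processor.api package, with MockProcessorContext coming from kafka-streams-test-utils); UppercaseProcessor and the key, value, and timestamp are hypothetical and exist only for illustration.

```kotlin
import org.apache.kafka.streams.processor.api.MockProcessorContext
import org.apache.kafka.streams.processor.api.Processor
import org.apache.kafka.streams.processor.api.ProcessorContext
import org.apache.kafka.streams.processor.api.Record

// Hypothetical processor: forwards every record with its value upper-cased.
class UppercaseProcessor : Processor<String, String, String, String> {
    private lateinit var context: ProcessorContext<String, String>

    override fun init(context: ProcessorContext<String, String>) {
        this.context = context
    }

    override fun process(record: Record<String, String>) {
        context.forward(record.withValue(record.value().uppercase()))
    }
}

fun forwardedValues(input: String): List<String> {
    val context = MockProcessorContext<String, String>()
    val processor = UppercaseProcessor()
    // The mock context stands in for the one Kafka Streams would normally provide.
    processor.init(context)
    processor.process(Record("key", input, 0L))
    // forwarded() captures everything the processor passed downstream.
    return context.forwarded().map { it.record().value() }
}

fun main() {
    println(forwardedValues("hello"))
}
```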

For integration tests there are several ways to go, so I'll list some that you can explore to see which works for you:

KafkaStreams’ EmbeddedKafkaCluster

Within the org.apache.kafka:kafka-streams-test-utils library, there are a few more useful classes, one of which is the EmbeddedKafkaCluster. It starts up an in-memory Kafka cluster with 1 ZooKeeper instance and a configurable number of brokers.

class MyKafkaStreamIntegrationTest {
    val cluster = EmbeddedSingleNodeKafkaCluster()

    fun setup() {
        // ... other setup
    }

    // ... start up app and test
}

Spring’s EmbeddedKafka

Using the org.springframework.kafka:spring-kafka-test library you get access to an "embedded", in-memory instance of Kafka running at localhost:9092. To use it you just need to annotate your test class with @EmbeddedKafka:

@DirtiesContext // if you have more than one test class using Kafka
@EmbeddedKafka(partitions=1, brokerProperties = ..
class MySpringIntegrationTest
// startup your app and assert


There's a small chance that the in-memory instances differ from the actual implementation of Kafka and ZooKeeper, so there's also the option to run Kafka in a Docker container using the Testcontainers KafkaContainer.

val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
// configure

Charithe’s Kafka-JUnit

This is a not-so-standard library, but it does the job and provides some convenient methods that you might find useful, so it's worth mentioning here. It has JUnit 4 and JUnit 5 implementations:

// JUnit 4
public static KafkaJunitRule kafkaRule = new KafkaJunitRule(EphemeralKafkaBroker.create());
// JUnit 5
@KafkaJunitExtensionConfig(startupMode = StartupMode.WAIT_FOR_STARTUP)

Hope you found some useful bits here.

Thanks for reading!
