For KafkaConsumer, KafkaProducer, Streams and Spring-Kafka
Maybe you’ve been given a job that involves working with Kafka, or maybe you’re designing a completely new piece of software. Either way, it’s good practice to keep testing in mind, if not to go all the way and use TDD (test-driven development). My goal here is not to give you test examples; there are plenty available online and most are just the skeleton of a test anyway. What I usually find extremely helpful is summarised information in one place: lists and bullet points (that’s my jam). What’s more, if you have only recently started working with Kafka you might not even be aware of what’s out there.
So, this here is a list of the classes and libraries you can use for your unit and integration tests, depending on which libraries and classes you have already decided to use in your main implementation.
To unit test your simple consumer you can use the
MockConsumer provided by the
org.apache.kafka:kafka-clients:X.X.X library. It’s very easy and intuitive to set up all your tests with it. The one thing you need to keep in mind when designing your application (or refactoring later) is to make the classes that use or wrap the KafkaConsumer instance testable. This means having them expect a
Consumer<K, V> to be passed in, whether through a factory method, directly in the constructor, or injected as a bean.
To create an instance of the MockConsumer you just need to match the types of the key and value of your records and “tell” it where to start reading:
val mockConsumer = MockConsumer<String, String>(OffsetResetStrategy.LATEST)
MockConsumer implements the
Consumer<K, V> interface, so if you pass it into the class you’re testing, that class will interact with it like a real consumer. What’s different here is that there are additional methods you can use to set up the conditions for your tests:
addRecord() to essentially prepare records on your mock to be read before you start your test
schedulePollTask() where you can pass any Runnable to be executed on the next poll
setPollException() to check your consumer’s behaviour on exception
Since this is only meant for unit testing, what you check here is essentially whether your real consumer:
- can deserialize the records it’s meant to read
- processes the records as expected; ideally there’s a division of responsibilities, so “processing” here might just mean passing the expected values on to another class that deals with the business logic and is unit-tested separately
- filters any records (if that’s required)
- handles errors in deserialization or processing
- handles errors with the connection to Kafka, i.e. on subscribing, polling or committing
- manipulates the offsets correctly: whether, when and how often it commits
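As a minimal sketch of how these pieces fit together (the topic name and record values are made up for illustration), a MockConsumer can be primed and polled like this:

```kotlin
import java.time.Duration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.MockConsumer
import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.apache.kafka.common.TopicPartition

val partition = TopicPartition("my-topic", 0) // hypothetical topic

val mockConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST)
// the mock is assigned partitions directly instead of subscribing via a broker
mockConsumer.assign(listOf(partition))
// it also needs beginning offsets before it can serve records
mockConsumer.updateBeginningOffsets(mapOf(partition to 0L))

// prepare a record to be read, then poll just as your real code would
mockConsumer.addRecord(ConsumerRecord("my-topic", 0, 0L, "key", "value"))
val records = mockConsumer.poll(Duration.ofMillis(100))
println(records.count()) // 1
```

In a real test you would of course pass the mock into your own class and let it do the polling.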
Note: if your consumer manipulates the offsets in a non-standard way, you can still keep using the same instance across all your unit tests, but you may need to reset its state between them.
The kafka-clients library also includes a
MockProducer class, which implements the same
Producer<K, V> interface as the
KafkaProducer. So, similarly to the consumer testing above, your production classes should be designed in a way that allows you to pass the mock in.
To create an instance of the
MockProducer you need to match the types of the key and value of your records and “tell” it whether to automatically complete the
send requests successfully (autocomplete = true) or you want to complete them explicitly yourself:
val mockProducer = MockProducer(true, StringSerializer(), StringSerializer())
To test the normal flow of work of your producer, you’ll want to line up some work for it to do, use autocomplete, and then check what has been sent using the
history() method on the MockProducer. This returns a list of all the ProducerRecord-s sent since the last time you called
clear() on the mock.
To test how you handle exceptions, you can set autocomplete to false and use
errorNext(), which will throw any RuntimeException you pass into it on the next incomplete send.
Here, too, the testing capabilities are limited to unit testing, so what you’re going to verify is whether your producer:
- can serialize the records it needs to
- handles serialization errors
- handles errors related to the connection to Kafka, i.e. on send
- applies any filtering correctly, i.e. the number of records actually sent matches the number you expect
- sends the records in the expected format, with any enrichment or reformatting correctly applied
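To make that concrete, here is a minimal sketch of the happy path with autocomplete and history(); the topic name and values are made up:

```kotlin
import org.apache.kafka.clients.producer.MockProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

// autocomplete = true: every send() completes successfully right away
val mockProducer = MockProducer(true, StringSerializer(), StringSerializer())

// in a real test this instance would be passed into the class under test
mockProducer.send(ProducerRecord("my-topic", "key", "value"))

// history() returns everything sent since the last clear()
val sent = mockProducer.history()
println(sent.size)        // 1
println(sent[0].value())  // value
```

For the error path you would construct it with autocomplete = false and trigger errorNext() with the exception you want to simulate.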
For streams, the test and production classes are split into separate libraries, so you need to add the
org.apache.kafka:kafka-streams:X.X.X dependency to use the streams and then the
org.apache.kafka:kafka-streams-test-utils:X.X.X one to make use of the convenient test classes.
Here, things work the other way around: instead of creating a mock and passing it to the class you test, you create an instance of the
TopologyTestDriver by passing in your topology and properties. Consequently, making your application unit-testable here means having a way to create your topology and pass it to the test driver.
val driver = TopologyTestDriver(myTopology, myProperties)
Once you have a driver instance, you need to explicitly create all the topics for your topology (the topic names below are placeholders):
val myInputTopic = driver.createInputTopic(
    "my-input-topic",               // topic name
    Serdes.String().serializer(),   // key serializer
    Serdes.String().serializer()    // value serializer
)
val myOutputTopic = driver.createOutputTopic(
    "my-output-topic",
    Serdes.String().deserializer(), // key deserializer
    Serdes.String().deserializer()  // value deserializer
)
// ... create as many output topics as your topology has
val myDlqTopic = driver.createOutputTopic(
    "my-dlq-topic",
    Serdes.String().deserializer(),
    Serdes.String().deserializer()
)
assertFalse(myOutputTopic.isEmpty)
val actualRecord = myOutputTopic.readRecord()
assertEquals(expectedRecord, actualRecord, "Oh no, records do not match")
You can also work with multiple input values at once:
pipeValueList(List<V>) — if your test only cares about the values
pipeKeyValueList(List<KeyValue<K, V>>) — if your test only cares about keys and values
pipeRecordList(List<TestRecord<K, V>>) — if your test also uses any headers or timestamps
Similarly, for the output:
readValuesToList() — if you need to verify only the values of the output
readKeyValuesToMap() — if you need to verify only the keys and values of the output
readRecordsToList() — if you need to check the headers and timestamps of the output
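Putting these pieces together, here is a minimal sketch with a toy topology; the topology, topic names and application id are all made up, so substitute your own:

```kotlin
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced

// a toy topology that upper-cases values, standing in for your real one
val builder = StreamsBuilder()
builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
    .mapValues { v -> v.uppercase() }
    .to("output", Produced.with(Serdes.String(), Serdes.String()))

val props = Properties().apply {
    put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app")
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234") // never contacted
}

val driver = TopologyTestDriver(builder.build(), props)
val input = driver.createInputTopic(
    "input", Serdes.String().serializer(), Serdes.String().serializer())
val output = driver.createOutputTopic(
    "output", Serdes.String().deserializer(), Serdes.String().deserializer())

input.pipeKeyValueList(listOf(KeyValue("k1", "a"), KeyValue("k2", "b")))
val result = output.readKeyValuesToMap()
println(result["k1"]) // A
println(result["k2"]) // B
driver.close()
```

The driver runs the topology synchronously, so there is nothing to wait for between piping input and reading output.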
Since we’re still talking about unit tests here, the checks are quite basic: you verify that your stream:
- can serialize and deserialize the records it needs to
- handles any exception in the desired way
- processes the records as expected, i.e. the output format matches the expected one, treating the stream as a black box
- does any filtering as expected, i.e. the number of output records matches the number of expected ones no matter how many were input
If you are using a Processor to handle your records, you can use a MockProcessorContext to initialize your implementation of that interface. With it, you can check whether records get forwarded to the right topic and whether offsets are committed.
val mockContext = MockProcessorContext<String, String>()
val processor = MyProcessor() // your class implementing Processor
processor.init(mockContext)
// ... hand some records to the processor
val forwardedRecords = mockContext.forwarded()
assertEquals(expectedTopic, forwardedRecords.first().childName().get())
// if you have scheduled a commit (or other action), manipulate time via
mockContext.scheduledPunctuators().first().punctuator.punctuate(time)
// check whether the scheduled task has run
assertTrue(mockContext.committed())
There are several ways to do this, so I’ll list some you can explore and try to see which works for you:
In the org.apache.kafka:kafka-streams-test-utils library there are a few more helpful classes, one of which is the EmbeddedKafkaCluster. This will start up an in-memory Kafka cluster with one Zookeeper instance and a configurable number of brokers.
val cluster = EmbeddedKafkaCluster(1) // number of brokers
// ... other setup
// ... start up the app and test
With the org.springframework.kafka:spring-kafka-test library you get access to an “embedded”, in-memory instance of Kafka running at
localhost:9092. To use it you just need to annotate your test class with @EmbeddedKafka:
@DirtiesContext // if you have more than one test class using kafka
@EmbeddedKafka(partitions = 1, brokerProperties = ..
// start up your app and assert
There is a small chance that the in-memory instances differ from the actual implementation of Kafka and Zookeeper, so there is also the option to use a Docker container for your tests, e.g. via Testcontainers:
val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
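A short sketch of wiring it up, assuming Docker is available on the test machine; the producer configuration shown is purely illustrative:

```kotlin
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName

val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
kafka.start() // pulls the image and starts a real broker in Docker

// point your real clients at the container's mapped address
val props = Properties().apply {
    put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.bootstrapServers)
    put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
    put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
}
val producer = KafkaProducer<String, String>(props)
// ... run your tests against a real broker, then clean up
producer.close()
kafka.stop()
```

Because this is a real broker, these tests are slower than the in-memory options, so they are best kept for a small number of end-to-end checks.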
This is a not-so-standard library, but it does the job and provides some convenient methods you might find useful, so it’s worth mentioning here. It has both a JUnit 4 and a JUnit 5 implementation:
// JUnit 4
@ClassRule
public static KafkaJunitRule kafkaRule = new KafkaJunitRule(EphemeralKafkaBroker.create());
// JUnit 5
@ExtendWith(KafkaJunitExtension.class)
@KafkaJunitExtensionConfig(startupMode = StartupMode.WAIT_FOR_STARTUP)
Hope you found some useful bits here.
Thanks for reading!