Better Programming

Advice for programmers.

Testing Kafka Applications —Libraries for Unit and Integration Tests

Ivelina Yordanova
Better Programming
Published in
6 min readApr 13, 2022

--

Photo by Siora Photography on Unsplash

You are given a task that involves working with Kafka or you are designing an entirely new application? It’s a good practice to have testing in mind, if not even try to use TDD (test-driven development). My goal here is not to give you test examples, there are many available online and most are just the skeleton of a test anyway, what I find usually incredibly useful is summarised information in one place, lists, and bullet points (that’s my jam). What’s more, if you have recently started working with Kafka you might not even be aware of what’s available out there.

So, this here would be a list of the classes and libraries that you can use for your unit and integration tests depending on what libraries and classes you have already decided on using in your main implementation.

KafkaConsumer

To unit test your simple consumer you can use the MockConsumer provided by the org.apache.kafka:kafka-clients:X.X.X library. It’s very easy and intuitive to set up all your tests. The only thing you need to keep in mind when designing your application (or refactor later) is to make your classes using or wrapping the KafkaConsumer instance testable. This means having them expect a Consumer<K, V> to be passed in whether through calling a factory method, directly in the constructor, or injected as a bean.

To create an instance of the MockConsumer you need to just match the type of the key and value of your records and “tell” it where to start reading:

val mockConsumer = MockConsumer<String, String>(OffsetResetStrategy.LATEST)

Both KafkaConsumer and MockConsumer implement the Consumer<K,V> interface so once you pass that into the class you’re testing, it will interact with it like a real consumer. What’s different here is that there are additional methods you can use to prepare the conditions for your tests:

  • addRecord() to your mock to essentially prepare Records to be read before you start your test
  • schedulePollTask() where you can pass any Runnable to be executed on the subsequent poll
  • setPollException() to check your consumer’s behaviour on exception

Since this is only meant for unit testing, what you test here is basically whether your real consumer:

  • can deserialize the records it is meant to read
  • processes the records as expected — ideally there is division of responsibilities , so “processing” here might just mean passing expected values to another class that is the one dealing with the business logic and that class is unit-tested separately
  • filters any records (if that’s required)
  • handles errors in deserialization or processing
  • handles errors with the connection to Kafka — on subscribing, polling or committing
  • manipulates the offsets correctly — whether, when and how often it commits

Note: If your consumer is manipulating the offsets in a non-standard way, then you can keep using the same instance between all your unit tests but updateBeginningOffsets() and updateEndOffsets().

KafkaProducer

The kafka-clients library includes also a MockProducer class which implements the same interface Producer<K, V>as the KafkaProducer . So, similar to the consumer testing above, your production classes should be designed in a way that will allow you to pass the mock in.

To create an instance of the MockProducer you need to match the type of the key and value of your records and “tell” it whether to automatically complete the sendrequests successfully (autocomplete=true) or you want to explicitly complete them by calling completeNext() or errorNext() .

val mockProduer = MockProducer(true, StringSerializer(), StringSerializer())

To test the normal flow of work of your producer you’ll want to line up some work for your producer to do, use autocomplete and then check what has been sent using the history() method on the MockProducer. This will return a list of all ProducerRecord-s sent since the last time you calledclear() on the mock.

To test how you handle exceptions you can set autocomplete to false and errorNext() which will throw any RuntimeException you pass in it on the next incomplete send call.

Here, also, the testing capabilities are limited to unit testing, so what you are going to be verifying with it is whether your producer:

  • can serialize the records it need to
  • handles serialization errors
  • handles errors related to the connection to Kafka- i.e on send()
  • applies any filtering correctly- i.e the number of records actually sent matches the number you expect
  • sends the records in the expected format — any enrichment or reformatting is correctly applied

KafkaStreams

For streams, the test and production classes are split into separate libraries so you need to add the org.apache.kafka:kafka-streams:X.X.X dependency to use the streams and then the org.apache.kafka:kafka-streams-test-utils:X.X.X one to make use of the convenient test classes.

Here, things work the other way around — instead of creating a mock and passing it to the class you test, you create an instance of the TopologyTestDriver by passing in your topology and properties. As a result, making your application unit-testable here would mean having a way to create your topology and pass it to the test driver.

val driver = TopologyTestDriver(myTopology, myProperties)

Once you have a driver instance, you need to explicitly create all the topics for your topology:

val myInputTopic = driver.createInputTopic(
inputTopicName,
Serdes.String().serializer(), // key type
Serdes.String().serializer() // value type
)
val myOutputTopic = driver.createOutputTopic(
outputTopicName,
Serdes.String().deserializer(), // key type
Serdes.String().deserializer() // value type
)
.... // create as many output topics as your topology hasval myDlqTopic = driver.createOutputTopic(
dlqTopicName,
Serdes.String().deserializer(), // key type
Serdes.String().deserializer() // value type
)

Having all the TestInputTopics and TestOutputTopics set up, you’re ready to get testing! Exciting!

myInputTopic.pipeInput(key, validValue)
...
assertTrue(myDlqTopic.isEmpty)
assertFalse(myOutputTopic.isEmpty)
val actualRecord = myOutputTopic.readRecord()
assertEquals(expectedRecord, actualRecord, "Oh no, records don't match)

You can also work with multiple input values at once:

  • pipeValueList(List<V>) — if your test class only cares for the values
  • pipeKeyValueList(List<KeyValue<K,V>> — if your test class only cares for key and value
  • pipeRecordList(List<TestRecord<K,V>> — if your test class only also uses any headers or timestamps

Similarly, for the output:

  • readValuesToList() — if you need to verify only the values of the output
  • readKeyValuesToList() or readKeyValuesToMap() — if you need to verify only key and value of the output
  • readRecordsToList() — if you need to check the headers and timestamps of the output

Since we are still talking unit tests here, the checks are pretty basic — you verify that your stream:

  • can serialize and deserialize the records it needs to
  • handles any exception in the desired way
  • processes the records as expected — the output format matches the expect one, treat the stream as black box
  • does any filtering as expected — the number of the output records matches the number of expected ones no matter how many have been input

Stream Processor

If you are using a Processor to handle you can use a MockProcessorContext to initialize your implementation of that interface. With it, you can check whether records get forwarded to the right topic and offsets are committed.

val mockContext = MockProcessorContext<String, String>()
val processor = MyProcessor() // implementing Processor
processor.init(mockContext)
...processor.process(record)
val forwardedRecords = mockContext.forwareded()
assertEquals(1, forwardedRecords.size)
assertEquals(expectedRecord, forwardedRecords.map{it.record()}.first())
assertEqual(expectedTopic, forwardedRecords[0].childName().get())
// if you have scheduled commit (or other action) manipulate time bymockContext.scheduledPunctuators()[0].punctuator.punctuate(time)// check if scheduled task is doneassertTrue(mockContext.committed())

Integration tests

There are multiple ways to do this so I’ll list some that you can explore and test out which works for you:

KafkaStreams’ EmbeddedKafkaCluster

Within the org.apache.kafka:kafka-streams-test-utils library, there are few more helpful classes, one of which is the EmbeddedKafkaCluster. This will startup an in-memory Kafka cluster with 1 zookeeper instance and configurable number of brokers.

class MyKafkaStreamIntegrationTest{
@ClassRule
val cluster = EmbeddedSingleNodeKafkaCluster()

@BeforeAll
fun setup() {
cluster.createTopic(myTopic)
// ... other setup
}

// ... startup app and test

Spring’s EmbeddedKafka

Using the org.springframework.kafka:spring-kafka-test library you’ll get access to the “embedded”, in-memory instance of kafka running at localhost:9092. To use it you just need to annotate your test class with EmbeddedKafka

@SpringBootTest
@DirtiesContext // if have more than one test class using kafka
@EmbeddedKafka(partitions=1, brokerProperties = {..}
class MySpringIntegrationTest{
// startup your app and assert
}

TestContainer

There is a small chance that the in-memory instances differ from the actual implementation of Kafka and Zookeeper, so there is also the option to use a docker container for your KafkaContainer.

KafkaContainer kafka = KafkaContainer(DockerImageName.parse(“confluentinc/cp-kafka:6.2.1”))
// configure
kafka.start()

Charithe’s Kafka-JUnit

This is a not-so-standard library but does the job and provides some convenient methods that you might find useful so worth mentioning it here. It has a junit4 and 5 implementations:

@ClassRule
public static KafkaJunitRule kafkaRule = new KafkaJunitRule(EphemeralKafkaBroker.create());
@ExtendWith(KafkaJunitExtension.class)
@KafkaJunitExtensionConfig(startupMode = StartupMode.WAIT_FOR_STARTUP)

Hope you found some useful bits here.

Thanks for reading!

--

--

No responses yet