Monitoring Kafka Applications — Implementing Healthchecks and Tracking Lag
For KafkaConsumer, Streams, Spring-Kafka, Kafka-Connect
Microservices often use a model of sending or replying to heartbeats/health checks as a way of providing status information to reporting, scheduling, or orchestrating services. These checks are not only important during the normal life cycle of an application but often also during the roll-out of a new version.
We’ll have a look at how the health check can be implemented using the different libraries and abstractions, but first, let’s agree on what we’ll consider when checking the health of an application.
What Does Health Mean for Kafka Applications?
For a Kafka application the health check should purposefully fail in two cases:
- There is an issue with the communication with Kafka. This can be a temporary network blip, or an unusually long processing time that caused a missed heartbeat, got the consumer kicked out of the group, or made an offset commit fail. These are the types of errors where it makes sense to trigger a restart (that's what an orchestration system would do whenever an instance is considered unhealthy) because they will most likely resolve after it.
- There is a critical issue with the app that cannot be ignored and needs to be resolved before any further processing. Think of an app that processes streamed data and cannot afford to skip or miss a single record. If a bug in the app, or in another app it interacts with or depends on, renders all outstanding records unprocessable, the app should not commit any offsets and move ahead. The only solution is to stop, and one easy way to do that is to report being unhealthy, get restarted, and hope that someone fixes the bug quickly.
Failures to process individual records due to problems with the records themselves, whether that's their format, structure, or anything else, should not be considered in the health check. Those minor issues should be handled by sending the records to a DLQ (Dead Letter Queue), logging the problem, and alerting on it. What to do with the records in the DLQ is out of scope for this story.
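As an illustration of that pattern, forwarding a bad record could look roughly like this; the DLQ topic name, the headers, and the logger are placeholders of my choosing, not a prescribed format:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.slf4j.LoggerFactory

private val log = LoggerFactory.getLogger("DlqHandler")

// Minimal sketch: forward a record that failed processing to a DLQ topic,
// keeping enough context in headers to debug it later.
fun <K, V> sendToDlq(producer: KafkaProducer<K, V>, record: ConsumerRecord<K, V>, error: Exception) {
    val dlqRecord = ProducerRecord("my-topic.dlq", record.key(), record.value())   // DLQ topic name is a placeholder
    dlqRecord.headers().add("source.topic", record.topic().toByteArray())
    dlqRecord.headers().add("error.message", (error.message ?: "unknown").toByteArray())
    producer.send(dlqRecord)
    log.warn("Sent record {}-{}@{} to the DLQ: {}", record.topic(), record.partition(), record.offset(), error.message)
}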
Simple Kafka Consumer
The implementations of KafkaConsumer do not really provide a “state” out of the box, so the application needs to track it across the whole life cycle: from subscribing to or being assigned topics and partitions, through polling, potentially pausing, resuming, and closing. An exception can be thrown at each of these stages and at the transitions between them. The good and the bad of this situation is that you, as a developer, have full control over those key moments, so it's up to you to handle them right: catch any Kafka-related exception, log it, alert on it, and update the cached state used as a health indicator.
The key thing to note here is that the KafkaConsumer will be running in one thread while the request checking the health will be handled in another, so make sure the cached state is shared between them safely.
In one of my previous posts, I described in detail one solution for dealing with multiple threads and states in an app like this. Essentially, the app needs to keep a State, with ERROR being one of the possible values.
enum class State {
    CONSUMING,
    PAUSED,
    // ... other states
    ERROR;

    fun isHealthy(): Boolean {
        return this != ERROR
    }
}
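To make that concrete, here is a minimal sketch of such a poll loop, assuming the State enum above; the class and topic names are placeholders, and @Volatile is what makes the cached state safely visible to the health check thread:

import java.time.Duration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.KafkaException
import org.slf4j.LoggerFactory

class ConsumerWorker(private val consumer: KafkaConsumer<String, String>) {

    private val log = LoggerFactory.getLogger(ConsumerWorker::class.java)

    // Written by the consumer thread, read by the health check thread
    @Volatile
    var state: State = State.CONSUMING
        private set

    fun run() {
        try {
            consumer.subscribe(listOf("my-topic"))   // topic name is just a placeholder
            while (state != State.ERROR) {
                val records = consumer.poll(Duration.ofMillis(500))
                records.forEach { process(it) }
                consumer.commitSync()
            }
        } catch (e: KafkaException) {
            // Any Kafka-related failure flips the cached state and the health check starts failing
            log.error("Kafka failure, marking the instance as unhealthy", e)
            state = State.ERROR
        } finally {
            consumer.close()
        }
    }

    private fun process(record: ConsumerRecord<String, String>) {
        // your processing logic
    }
}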
Then, in your health check endpoint, all you need is to check state.isHealthy(). With Spring, that would look something like this:
import org.springframework.boot.actuate.health.Health
import org.springframework.boot.actuate.health.HealthIndicator

class HealthCheckIndicator : HealthIndicator {
    // `state` is the cached State updated by the consumer thread
    override fun health(): Health {
        return if (state.isHealthy()) {
            Health.up()
        } else {
            Health.down()
        }.build()
    }
}
Kafka Streams
Internally, KafkaStreams uses a normal KafkaProducer and KafkaConsumer, but the abstraction adds a couple of neat features (well, not only a couple, but a couple that are useful in this context): it provides a method to get the current state, and, what's more, calling it is thread-safe.
There are seven possible values for KafkaStreams.State:
- CREATED: at the start of the life cycle
- RUNNING: ready to consume or consuming
- REBALANCING: the consumer group is rebalancing
- PENDING_SHUTDOWN: transition state from any of the above to NOT_RUNNING
- NOT_RUNNING: stopped as a result of the normal life cycle by calling close()
- PENDING_ERROR: transition state to ERROR
- ERROR: the stream cannot recover on its own
The last two can be reported as “down” or “unhealthy.”
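In practice that check is only a couple of lines; a minimal sketch, assuming you hold a reference to the running KafkaStreams instance:

import org.apache.kafka.streams.KafkaStreams

// KafkaStreams.state() is thread-safe, so the health endpoint can call it on every request.
// A stricter alternative is state.isRunningOrRebalancing(), which also treats
// CREATED, PENDING_SHUTDOWN and NOT_RUNNING as unhealthy.
fun isStreamsHealthy(streams: KafkaStreams): Boolean {
    val state = streams.state()
    return state != KafkaStreams.State.ERROR && state != KafkaStreams.State.PENDING_ERROR
}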
Spring Kafka Streams
The Spring implementation is even further detached from the low-level concepts and provides a fancier way of tracking the state. You can register a StateListener once you have created your stream successfully. The interface is practically a consumer, receiving the new and the old state of the stream on each transition.
If your app needs to reply to another service polling for the state, you can't use the listener directly; you still need to cache the health and use that variable to reply to the health check request.
// create the 'stream' (the KafkaStreams instance) first
stream.setStateListener { newState, oldState ->
    if (newState == KafkaStreams.State.ERROR || newState == KafkaStreams.State.PENDING_ERROR) {
        healthy = false
    }
}
However, if you are supposed to be pushing the state at scheduled intervals, then you can use the listener implementation directly.
As you might have guessed, the enum for the state is the same KafkaStreams.State mentioned in the previous section.
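For that push model, something along these lines could work; the MonitoringClient interface, the 30-second interval, and the registration comment are assumptions, not part of any library:

import org.apache.kafka.streams.KafkaStreams
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component

// Hypothetical client for whatever system receives the status
interface MonitoringClient {
    fun reportHealth(status: String)
}

// Register this listener on the stream once it is created,
// e.g. via StreamsBuilderFactoryBean.setStateListener(...) or streams.setStateListener(...)
@Component
class StreamsHealthReporter(
    private val monitoringClient: MonitoringClient
) : KafkaStreams.StateListener {

    @Volatile
    private var healthy = true

    override fun onChange(newState: KafkaStreams.State, oldState: KafkaStreams.State) {
        healthy = newState != KafkaStreams.State.ERROR && newState != KafkaStreams.State.PENDING_ERROR
    }

    // Push the cached state instead of waiting to be polled (requires @EnableScheduling)
    @Scheduled(fixedRate = 30000L)
    fun report() {
        monitoringClient.reportHealth(if (healthy) "UP" else "DOWN")
    }
}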
Kafka-Connect
Kafka plays a huge role in a data-driven company, but it alone is not enough. Often, there are a variety of internal and external “non-Kafka” streams that need to be integrated with Kafka, and this is where Kafka-Connect comes in. It provides a pretty rich ecosystem of ready-to-use connectors for data stores like S3, Snowflake, and MongoDB, and even for tracking CDC (change data capture) from SQL databases.
Each connector has a set of tasks copying the data from the source, and with those running in parallel, it can happen that one or more of them fail.
There is no out-of-the-box health check endpoint provided at this time, but there is a way to extend your deployment and add one yourself. To do that, you need to:
- create a small Java project and add the org.apache.kafka:connect-api:XXX dependency
- extend the ConnectRestExtension and implement the register method:
@Override
public void register(ConnectRestExtensionContext restPluginContext) {
    restPluginContext
        .configurable()
        .register(new HealthcheckController(new HealthcheckService(restPluginContext.clusterState())));
}
- define an endpoint in your controller:
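The Connect REST layer is JAX-RS, so the controller is a plain resource class. Here is a sketch (in Kotlin, though a Java version has the same shape); the /healthcheck path is my choice, and the javax.ws.rs package may be jakarta.ws.rs depending on your Connect version:

import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.core.MediaType
import javax.ws.rs.core.Response

@Path("/healthcheck")
class HealthcheckController(private val healthcheckService: HealthcheckService) {

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    fun health(): Response {
        val state = healthcheckService.checkHealth()
        return if (state.isHealthy()) {
            Response.ok(state).build()
        } else {
            // 503 so orchestrators that only look at the status code notice
            Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(state).build()
        }
    }
}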
- implement the logic for the actual health check in the service:
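One possible shape for the service, using the ConnectClusterState the extension context exposes and treating any connector or task reported as FAILED as unhealthy; the ConnectorState it returns is the small enum shown right below:

import org.apache.kafka.connect.health.ConnectClusterState

class HealthcheckService(private val clusterState: ConnectClusterState) {

    // ConnectorState here is the HEALTHY/UNHEALTHY enum defined below,
    // not Connect's own org.apache.kafka.connect.health.ConnectorState
    fun checkHealth(): ConnectorState {
        val anyFailed = clusterState.connectors().any { name ->
            val health = clusterState.connectorHealth(name)
            health.connectorState().state() == "FAILED" ||
                health.tasksState().values.any { task -> task.state() == "FAILED" }
        }
        return if (anyFailed) ConnectorState.UNHEALTHY else ConnectorState.HEALTHY
    }
}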
The ConnectorState is as simple as it gets:
enum ConnectorState {
    HEALTHY,
    UNHEALTHY;

    public boolean isHealthy() {
        return this == HEALTHY;
    }
}
- build and put the jar into the plugin folder (deployment-specific)
- add this to your configuration to activate the extension:
rest.extension.classes=my.kafkaconnect.extension.HealthcheckRestExtension
This is the most extreme, simple, and frankly not ideal implementation, where you report kafka-connect as unhealthy even if a single task has failed. It is a good starting point when you start developing and experimenting with a connector. A better way of handling this, however, once you have multiple connectors with multiple tasks each, is to attempt to recover the individual tasks and connectors first.
Kafka-connect exposes endpoints for restarting:
- a connector by name: /connectors/<name>/restart
- a task by connector name and task id: /connectors/<name>/tasks/<id>/restart
One way to do this is to add a third value to the ConnectorState enum, something like UNHEALTHY_TASKS, and have the service class, which has access to all that information, return a wrapper class instead, with the name of the connector and the ids of the unhealthy tasks:
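A wrapper along these lines would do; the names are mine, not a prescribed shape:

// Hypothetical wrapper returned by the service instead of the bare enum
data class ConnectorHealthReport(
    val connectorName: String,
    val state: ConnectorState,              // HEALTHY, UNHEALTHY or UNHEALTHY_TASKS
    val unhealthyTaskIds: List<Int> = emptyList()
)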
Then, either the extension itself or another service (not only the orchestrator or scheduler, which normally would) could use the health check to trigger the restarts and the alerts. If a connector or a task is unhealthy, you could spin off a thread in the extension to hit the restart endpoint and retry N times before alerting.
Responding with “unhealthy” in this scenario will only happen when all connectors are down and something horrific has happened. You will probably need to intervene in some way if all N attempts for self-healing have failed and you receive an alert.
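A rough sketch of that self-healing attempt, using the JDK HTTP client; the Connect URL, the retry count, and the alerting hook are all assumptions:

import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

class TaskRestarter(
    private val connectUrl: String = "http://localhost:8083",   // assumption: the local worker
    private val maxAttempts: Int = 3
) {
    private val client = HttpClient.newHttpClient()

    // POST /connectors/<name>/tasks/<id>/restart up to maxAttempts times, then alert
    fun restartTask(connector: String, taskId: Int): Boolean {
        val request = HttpRequest.newBuilder()
            .uri(URI.create("$connectUrl/connectors/$connector/tasks/$taskId/restart"))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build()
        repeat(maxAttempts) {
            val response = client.send(request, HttpResponse.BodyHandlers.discarding())
            if (response.statusCode() in 200..299) return true
        }
        alert("Task $taskId of connector $connector did not recover after $maxAttempts restart attempts")
        return false
    }

    private fun alert(message: String) {
        // hook into your alerting system here
    }
}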
Spring Gotcha
If you are using Spring's HealthIndicator, make sure whatever you are reporting this health to can “read” the response, because you can add all the details in the world in there and report any state, and the response code will still be 200. If the orchestration or other system relies solely on you replying OK for healthy and 5XX for unhealthy, then make sure to add this config:
management:
  endpoint:
    health:
      status:
        http-mapping:
          UP: 200
          DOWN: 503 # or whatever fits
Why Track Consumer Lag
Consumer lag is practically the difference between the last committed offset of a consumer group and the last offset available for reading. If the rate at which data is produced far exceeds the rate at which it is consumed, or the consumer is having an issue processing the incoming messages, the consumer group will lag.
This can be used as a performance indicator. If you have agreed on an SLO (Service Level Objective) for the speed at which data should arrive from source to destination and it’s not being met, then a quick look at the consumer lag alert or dashboard will tell you which app in the pipeline is the guilty participant.
Another potential use case is a sudden influx of data that the applications are not designed to handle in a timely manner. This might be due to the seasonality of the business or a one-off event affecting it. In those cases, if not planned for, you'll notice a spike in the lag graph and you might need to manually change the configuration and scale out the consumers in the affected groups.
Additionally, the lag might be a symptom of a bug in the processing that can only be spotted once the app is put under more pressure. This would be quite dangerous, since the app is actively processing, however slowly, but depending on the type of bug it might not be doing that correctly.
Also, the app can even be “dead” but falsely report being OK, which is why I was talking about the health check earlier. It's not the most exciting task to work on, but no one wants zombie apps in production.
Tracking Consumer Lag — Third-Party Cloud Provider
If you are using a cloud provider, they most probably expose an endpoint for you to get that metric and tie it to a visualization and alerting tool of your choice.
Confluent, for example, provides a variety of metrics through a REST API. You can get information about the consumer lag at different levels of granularity: per group only, per group and topic, and even per group, topic, and partition. You can look up the Confluent-specific ways to track the lag here: https://docs.confluent.io/cloud/current/monitoring/monitor-lag.html
Tracking Consumer Lag — Programmatically
If you have an in-house deployment of Kafka and you need to develop your own metrics reporting service, you can do that programmatically through the AdminClient API. Here is the most compressed example in Kotlin, with no care for exception handling or the return format, just to demo the idea:
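Something along these lines, assuming a broker on localhost:9092 and taking the consumer group name as a parameter:

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.OffsetSpec

fun consumerLag(groupId: String): Map<String, Long> {
    val config = mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092")
    return AdminClient.create(config).use { client ->
        // 1. Last committed offset per partition for the consumer group
        val committed = client.listConsumerGroupOffsets(groupId)
            .partitionsToOffsetAndMetadata()
            .get()

        // 2. Latest available offset for the same partitions
        val endOffsets = client.listOffsets(committed.keys.associateWith { OffsetSpec.latest() })
            .all()
            .get()

        // 3. Lag = log end offset - last committed offset
        committed.entries.associate { (partition, offsetAndMetadata) ->
            "${partition.topic()}-${partition.partition()}" to
                (endOffsets.getValue(partition).offset() - offsetAndMetadata.offset())
        }
    }
}

// usage: consumerLag("my_group").forEach { (partition, lag) -> println("$partition lag=$lag") }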
Tracking Consumer Lag — CLI
This is more of a local, ad-hoc kind of solution, but it's worth mentioning:
kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group my_group \
--describe
This will return the information in the following format:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
There are more metrics relevant to the overall health of the data pipeline, but these two indicators are probably the first ones you should set up.
Hope you found something useful here. Thanks for reading!