Monitoring Kafka Applications — Implementing Healthchecks and Tracking Lag | by Ivelina Yordanova | Apr, 2022


Microservices usually use a model of sending or replying to heartbeats/health checks as a way of providing status information to reporting, scheduling, or orchestrating services. These are not only important during the normal life cycle of an application but often also during a new version roll-out.

We’ll look at how the health check can be implemented using the different libraries and abstractions, but first, let’s agree on what we’ll consider when checking the health of an application.

For a Kafka application the health check should purposefully fail in two cases:

  • There is an issue with the communication with Kafka — this can be a momentary network blip, or an unusually long processing time that caused a failed heartbeat so the consumer got kicked out or failed to commit. These are the kinds of errors where it makes sense to trigger a restart (which is what an orchestration system would do whenever an instance is considered unhealthy) because they will most likely resolve on restart.
  • There’s a critical issue with the app that cannot be ignored and must be resolved before any further processing. What I mean is: if the app is processing streamed data and it cannot skip or miss a single record. In that case, if there’s a bug in the app, or in another app it interacts with and/or depends on, that renders all incoming records unprocessable, the app should not commit any offsets and move ahead. The only solution is to stop, and one easy way to do that is to report being unhealthy, get restarted, and hopefully have someone fix the bug quickly.

Failures to process individual records caused by problems with those records themselves — whether that’s their format, structure, or anything else — should not be considered in the health check. These minor issues should be handled by sending the records to a DLQ (Dead Letter Queue), logging the problem, and alerting on it. What to do with the records in the DLQ is out of scope for this story.

The implementations of KafkaConsumer don’t really provide a “state” out of the box, so the application needs to track that across the whole lifecycle — from subscribing to/being assigned a topic and partition, through polling, potentially pausing, resuming, and closing. An exception can be thrown at each of those stages and at the transitions between them. The good and the bad of the situation is that you as a developer have full control over those key moments, so it’s up to you to handle them right — catch any Kafka-related exception, log it, alert on it, and update the cached state used as a health indicator.

The key thing to note here is that the KafkaConsumer will probably be running in one thread while the request checking the health will be handled in another, so make sure you handle the cached state properly.
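One way to make that cached state safe to read across threads is an `AtomicReference`. A minimal, self-contained sketch in Kotlin — the state names and the `HealthHolder` class are illustrative, not from the original post:

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Illustrative lifecycle states; a real app would track its full lifecycle
enum class ConsumerState { STARTING, CONSUMING, ERROR }

class HealthHolder {
    // AtomicReference guarantees the health endpoint's thread
    // always sees the latest state written by the consumer thread
    private val state = AtomicReference(ConsumerState.STARTING)

    fun transitionTo(newState: ConsumerState) = state.set(newState)

    fun isHealthy(): Boolean = state.get() != ConsumerState.ERROR
}

fun main() {
    val holder = HealthHolder()
    holder.transitionTo(ConsumerState.CONSUMING)
    println(holder.isHealthy()) // true
    holder.transitionTo(ConsumerState.ERROR) // e.g. after an unrecoverable KafkaException
    println(holder.isHealthy()) // false
}
```

A `@Volatile var` would also do for a single flag; `AtomicReference` is used here because it extends naturally to richer state objects.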

In one of my previous posts, I described in detail a solution dealing with multiple threads and states in an app like this. Essentially, the app needs to keep a State enum and have one of its values be ERROR.

enum class State {
    // other lifecycle states omitted here; ERROR is the one that matters for health
    RUNNING, ERROR;

    fun isHealthy(): Boolean = this != ERROR
}

Then in your health check endpoint, all you need is to check state.isHealthy(). With Spring, that could look something like this:

class KafkaHealthIndicator : HealthIndicator {
    override fun health(): Health =
        if (state.isHealthy()) Health.up().build()
        else Health.down().build()
}

Internally, KafkaStreams uses a regular KafkaProducer and KafkaConsumer, but the abstraction adds a couple of neat features (well, not only a couple, but a couple that are useful in this context) — it provides a method to get the current state, and what’s more, calling it is thread-safe.

There are seven possible values for the KafkaStreams.State:

  • CREATED — at the start of the life cycle
  • RUNNING — ready to consume, or consuming
  • REBALANCING — the consumer group is rebalancing
  • PENDING_SHUTDOWN — transition state from any of the above to NOT_RUNNING
  • NOT_RUNNING — stopped through the normal life cycle by calling close()
  • PENDING_ERROR — transition state to ERROR
  • ERROR — the stream cannot recover on its own

The last two should be reported as “down” or “unhealthy.”
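In code, the mapping from those seven states to a health verdict is a one-liner. The enum below is an illustrative mirror of the real `KafkaStreams.State` (which lives in `org.apache.kafka.streams.KafkaStreams`, where you would call `streams.state()`), so the sketch stays self-contained:

```kotlin
// Mirror of KafkaStreams.State, for illustration only
enum class StreamsState {
    CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR
}

// Only the last two states mean the stream cannot recover on its own
fun StreamsState.isHealthy(): Boolean =
    this != StreamsState.PENDING_ERROR && this != StreamsState.ERROR

fun main() {
    println(StreamsState.REBALANCING.isHealthy()) // true: rebalancing is part of normal life
    println(StreamsState.ERROR.isHealthy())       // false
}
```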

The Spring implementation is even further detached from the low-level concepts and provides a fancier way of tracking the state. You can register a StateListener when you create your stream. The interface is practically a consumer, receiving the new and the old state of the stream on each transition.

If your app needs to respond to another service polling for the state, then you can’t use the listener directly — you still need to cache the health and use that variable to reply to the health check request.

// create the 'stream'
stream.setStateListener { newState, _ ->
    if (newState == KafkaStreams.State.PENDING_ERROR) {
        healthy = false
    }
}

However, if you are supposed to be pushing the state at scheduled intervals, then you can use the listener implementation directly.

As you might have guessed, the enum for the state is the same KafkaStreams.State mentioned in the previous section.

Kafka plays a huge role in a data-driven company, but it alone is not enough. Usually, there are multiple internal and external “non-Kafka” streams that need to be integrated with Kafka, and here comes Kafka Connect. It provides a fairly rich ecosystem of ready-to-use connectors for data sinks like S3, Snowflake, and Mongo, and even for CDC (change data capture) from SQL databases.

Each connector has a set of tasks copying the data from the source, and with those running in parallel, it can happen that some of them fail.

There is no out-of-the-box health check endpoint provided at the moment, but there is a way to extend your deployment and add it yourself. To do that you need to:

  • create a small Java project and add the org.apache.kafka:connect-api:XXX dependency.
  • extend the ConnectRestExtension and implement the register method:

@Override
public void register(ConnectRestExtensionContext restPluginContext) {
    restPluginContext.configurable()
        .register(new HealthcheckController(new HealthcheckService(restPluginContext.clusterState())));
}

  • define an endpoint in your controller:
  • implement the logic for the actual health check in the service:
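The controller and service snippets were embedded as gists in the original post and did not survive extraction. As a hedged reconstruction: the controller would be a JAX-RS resource (`@Path`/`@GET`) returning 200 or 503, and the service would inspect the task states obtained from `ConnectClusterState`. The check itself boils down to the function below — task states are modeled as plain strings so the sketch runs on its own; in a real extension they would come from the `connect-api` cluster-state interface:

```kotlin
// taskStatesByConnector: connector name -> state string per task
// ("RUNNING", "FAILED", ...); in a real extension this map is built
// from the ConnectClusterState handed to the REST extension.
fun isClusterHealthy(taskStatesByConnector: Map<String, List<String>>): Boolean =
    // unhealthy as soon as any task of any connector has failed
    taskStatesByConnector.values.none { tasks -> tasks.any { it == "FAILED" } }

fun main() {
    println(isClusterHealthy(mapOf("s3-sink" to listOf("RUNNING", "RUNNING")))) // true
    println(isClusterHealthy(mapOf("s3-sink" to listOf("RUNNING", "FAILED")))) // false
}
```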

The ConnectorState is as simple as it gets:

public enum ConnectorState {
    HEALTHY, UNHEALTHY;

    public boolean isHealthy() {
        return this == HEALTHY;
    }
}

  • build and put the jar into the plugin folder (deployment-specific)
  • add this to your configuration to activate the extension:
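The configuration in question is the Connect worker’s `rest.extension.classes` property — the class name below is hypothetical; use your extension’s fully qualified name:

```properties
# connect-distributed.properties (or connect-standalone.properties)
rest.extension.classes=com.example.health.HealthcheckExtension
```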

This is the most extreme, simplistic, and frankly not ideal implementation, where you report kafka-connect as unhealthy even if a single task has failed. It is a good starting point when you begin developing and experimenting with a connector. A better way of handling this, however, once you have multiple connectors with multiple tasks each, is to attempt to recover the individual tasks and connectors first.

Kafka Connect exposes endpoints for restarts of:

  • a connector by name: /connectors/<name>/restart
  • a task by connector name and task id: /connectors/<name>/tasks/<id>/restart

One way to do that is to add a third value to the ConnectorState enum, something like UNHEALTHY_TASKS, and have the service class that accesses all that information return a wrapper class instead, with the name of the connector and the ids of the unhealthy tasks:
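Such a wrapper was embedded as a gist in the original post; a sketch of what it might look like in Kotlin, with illustrative names and the suggested third enum value:

```kotlin
enum class ConnectorState { HEALTHY, UNHEALTHY, UNHEALTHY_TASKS }

// Per-connector health report: which tasks (by id) are failed
data class ConnectorHealth(
    val connector: String,
    val state: ConnectorState,
    val unhealthyTaskIds: List<Int> = emptyList()
)

fun main() {
    val report = ConnectorHealth("s3-sink", ConnectorState.UNHEALTHY_TASKS, listOf(2, 5))
    println(report)
}
```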

Then, either the extension itself or another service (not only the orchestrator or scheduler, which typically would) could use the health check to trigger the restarts and alerts. If a connector or a task is unhealthy, you can spin off a thread in the extension to hit the restart endpoint and retry N times before alerting.
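A minimal retry loop for that self-healing thread might look like this — `restart` stands in for an HTTP call to the `/connectors/<name>/restart` endpoint and `alert` for your alerting hook; both are injected here so the sketch is self-contained:

```kotlin
// Try a restart up to maxAttempts times; alert only if none succeeded.
fun restartWithRetries(
    maxAttempts: Int,
    restart: () -> Boolean,   // e.g. POST /connectors/<name>/restart, true on 2xx
    alert: (String) -> Unit
): Boolean {
    repeat(maxAttempts) {
        if (restart()) return true
    }
    alert("restart failed after $maxAttempts attempts")
    return false
}

fun main() {
    var calls = 0
    // simulated restart that fails twice and succeeds on the third attempt
    val ok = restartWithRetries(3, { ++calls >= 3 }, { msg -> println(msg) })
    println("$ok after $calls calls")
}
```

In production you would also back off between attempts rather than hammering the REST API in a tight loop.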

Responding with “unhealthy” in this scenario will then only happen when all connectors are down and something horrific has happened. You will probably need to intervene in some way if all N attempts at self-healing have failed and you receive an alert.

If you are using Spring’s HealthIndicator, make sure whatever you are reporting this health to can actually “read” the response, because you can add all the details in the world in there and report any state, and the response code will still be 200. If the orchestration or other system relies solely on you replying OK for healthy and 5XX for unhealthy, then make sure to add this config:

management:
  endpoint:
    health:
      status:
        http-mapping:
          UP: 200
          DOWN: 503  # or whatever fits

Consumer lag is practically the difference between the last committed offset of a consumer group and the last available-for-read offset. If the rate of production of data far exceeds the rate at which it is consumed, or the consumer is having an issue processing the incoming messages, the consumer group will lag.

This can be used as a performance indicator. If you have agreed on an SLO (Service Level Objective) for the speed at which data should arrive from source to destination and it is not being met, then a quick look at the consumer lag alert or dashboard will tell you which app in the pipeline is the guilty party.

Another potential use case is if you have a sudden influx of data that the applications are not designed to handle in a timely manner. This could be down to the seasonality of the business, or to it being affected by a one-off event. In those cases, if not planned for, you will notice a spike in the lag graph and might need to manually change the configuration and scale up the consumers in the affected groups.

Additionally, the lag can be a symptom of a bug in the processing that only becomes observable once the app is put under extra stress. This can be quite dangerous, since the app is actively processing, however slowly, but depending on the type of bug it might not be doing it right.

Also, the app might even be “dead” but falsely reporting being OK — hence why I was talking about the health check. It is not the most exciting task to work on, but no one wants zombie apps in production.

If you are using a cloud provider, they most likely expose an endpoint for you to get that metric and tie it into a visualization and alerting tool of your choice.

Confluent, for example, provides a variety of metrics through a REST API. You can get information about the consumer lag at different levels of granularity — per group only, per group and topic, or even per group, topic, and partition. An example query can be found on their website.

You can look up other Confluent-specific ways to track the lag in their documentation.

If you have an in-house deployment of Kafka and need to develop your own metrics reporting service, then you can do that programmatically through the AdminClient API.

Here is the most compressed example, in Kotlin, with no care for exception handling or the return format, just to demo the idea:
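The example itself was embedded as a gist in the original post and is missing here; below is a hedged reconstruction. In a real app the two offset maps would come from `AdminClient#listConsumerGroupOffsets` (committed offsets) and `AdminClient#listOffsets` with `OffsetSpec.latest()` (log-end offsets); they are stubbed as plain maps keyed by "topic-partition" strings so the lag arithmetic is runnable on its own:

```kotlin
// Lag per partition = log-end offset - last committed offset.
// committed:  adminClient.listConsumerGroupOffsets(group)
//                 .partitionsToOffsetAndMetadata().get()
// endOffsets: adminClient.listOffsets(partitions.associateWith { OffsetSpec.latest() })
//                 .all().get()
fun consumerLag(committed: Map<String, Long>, endOffsets: Map<String, Long>): Map<String, Long> =
    endOffsets.mapValues { (partition, end) -> end - (committed[partition] ?: 0L) }

fun main() {
    val committed = mapOf("events-0" to 100L, "events-1" to 250L)
    val endOffsets = mapOf("events-0" to 120L, "events-1" to 250L)
    println(consumerLag(committed, endOffsets)) // {events-0=20, events-1=0}
}
```

Summing the per-partition values gives the total group lag, which is usually what you would export as a metric.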

This is more of a local, intermittent-use kind of solution, but it is worth mentioning:

kafka-consumer-groups.sh --describe \
    --bootstrap-server localhost:9092 \
    --group my_group

It will return the information in the following format, one row per partition:

GROUP  TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
There are more metrics relevant to the overall health of a data pipeline, but these two indicators should probably be the first ones you set up.

Hope you found something useful here. Thanks for reading!
