Anyone who has ever worked with Kafka must have had some thoughts about the console consumer output. Usually, at least two questions inevitably come up:
- How do I print X? X could be "key", "partition", "offset", "timestamp", "header"... all bits that can be quite useful but are not part of the default output. This was more of an issue in version 2.6 and older, where some of the above properties weren't available at all.
- How am I supposed to spot what I need in this mess of characters? The joy of figuring out how to print whatever you need is short-lived once you see how it looks.
If you've already googled all this, you know you can use kafkacat, and that's a perfectly viable option. However, I'm offering you a more creative and flexible alternative.
As mentioned, and for the sake of completeness, the latest versions of Kafka do offer ways to print more details for the records by passing each additional property separately:
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic my_topic \
  --property print.offset=true \
  --property print.partition=true \
  --property print.headers=true \
  --property print.timestamp=true \
  --property print.key=true
However, if you are stuck with legacy code or for some reason still work on an older version, your options are pretty limited:
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic my_topic
Neither of these is great if you need to consume more than 3–4 messages, and do so more than once. Trust me, your eyes will thank you if you switch to staring at something like this instead:
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic my_topic \
  --property print.offset=true \
  --property print.partition=true \
  --property print.headers=true \
  --property print.timestamp=true \
  --property print.key=true \
  --formatter my.customized.KafkaMetricsFormatter
I'm not saying this design is what you should go for. What I'm saying is that there's a way to format the output any way you want: add color and spacing, or why not go crazy and underline things or make them bold.
What's more, it's super easy: it'll take you 15 minutes and probably save you more in the long run... or at least make you feel better about using the console consumer. Or is it just me?
Anyway, let's see how to achieve this:
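To give a feel for the styling options, here is a minimal sketch of a helper that wraps a label in the ANSI escape codes exposed by `scala.Console`. The `Style` object and its functions are names made up for this illustration, and the sketch assumes a terminal that understands ANSI escape sequences.

```scala
// Hypothetical helper, not part of Kafka: styles a "label: value" line
// with the ANSI escape codes exposed by scala.Console.
object Style {
  import scala.Console.{BOLD, CYAN, RESET, UNDERLINED}

  // Cyan label followed by the plain value.
  def label(name: String, value: String): String =
    s"$CYAN└── $name:$RESET $value"

  // Same idea, but with the label in bold and underlined.
  def emphasized(name: String, value: String): String =
    s"$BOLD$UNDERLINED$name:$RESET $value"
}

object StyleDemo extends App {
  println(Style.label("Partition", "0"))
  println(Style.emphasized("Key", "user-42"))
}
```

Resetting the style right after the label keeps the record's own content unstyled, so whatever the message contains is printed as-is.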
First, create a Scala project and add Kafka as a dependency:
libraryDependencies += "org.apache.kafka" %% "kafka" % "3.1.0"
Second, create a new Scala class extending the DefaultMessageFormatter and override the writeTo method to do the formatting any way you like.
The code for the above example looks like this:
import java.io.PrintStream
import java.nio.charset.StandardCharsets
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.Deserializer
import Console._

// Both methods below go inside your class extending DefaultMessageFormatter.
override def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
  def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = {
    val nonNullBytes = Option(sourceBytes).getOrElse(nullLiteral)
    val convertedBytes = deserializer
      .map(d => utfBytes(d.deserialize(topic, consumerRecord.headers, nonNullBytes).toString))
      .getOrElse(nonNullBytes)
    convertedBytes
  }

  import consumerRecord._
  if (printTimestamp && timestampType != TimestampType.NO_TIMESTAMP_TYPE) {
    output.write(utfBytes(s"$CYAN└── $timestampType:$RESET $timestamp"))
    output.write(lineSeparator)
  }
  if (printPartition) {
    output.write(utfBytes(s"$CYAN└── Partition:$RESET ${partition().toString}"))
    output.write(lineSeparator)
  }
  if (printOffset) {
    output.write(utfBytes(s"$CYAN└── Offset:$RESET ${offset().toString}"))
    output.write(lineSeparator)
  }
  if (printHeaders) {
    if (!headers().toArray.isEmpty)
      output.write(utfBytes(s"$CYAN└── Headers:$RESET "))
    val headersIt = headers().iterator
    while (headersIt.hasNext) {
      val header = headersIt.next()
      output.write(utfBytes(s"\n\t${header.key()}: "))
      output.write(deserialize(headersDeserializer, header.value(), topic))
      if (headersIt.hasNext)
        output.write(headersSeparator)
    }
    output.write(lineSeparator)
  }
  if (printKey) {
    output.write(utfBytes(s"$CYAN└── Key:$RESET "))
    output.write(deserialize(keyDeserializer, key, topic))
    output.write(lineSeparator)
  }
  if (printValue) {
    output.write(deserialize(valueDeserializer, value, topic))
    output.write(lineSeparator)
    output.write(utfBytes(s"---------------"))
    output.write(lineSeparator)
  }
}

// utfBytes is private in DefaultMessageFormatter, so it is redefined here.
private def utfBytes(str: String) = str.getBytes(StandardCharsets.UTF_8)
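Before packaging anything, it can be handy to run a formatter against a hand-built record, without a broker. The sketch below assumes the `kafka` 3.1.0 dependency from the first step is on the classpath (where `DefaultMessageFormatter` lives in `kafka.tools`). `TinyFormatter` and `FormatterSmokeTest` are hypothetical names, and the formatter is a deliberately reduced stand-in for the one above.

```scala
import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.charset.StandardCharsets.UTF_8

import kafka.tools.DefaultMessageFormatter
import org.apache.kafka.clients.consumer.ConsumerRecord

// Hypothetical, stripped-down formatter: prints partition, offset and value.
class TinyFormatter extends DefaultMessageFormatter {
  override def writeTo(record: ConsumerRecord[Array[Byte], Array[Byte]],
                       output: PrintStream): Unit = {
    // A record value may be null (e.g. a tombstone), so guard before decoding.
    val value = Option(record.value).map(bytes => new String(bytes, UTF_8)).getOrElse("null")
    output.println(s"[${record.partition}@${record.offset}] $value")
  }
}

object FormatterSmokeTest extends App {
  // Constructor arguments: topic, partition, offset, key, value.
  val record = new ConsumerRecord[Array[Byte], Array[Byte]](
    "my_topic", 0, 42L, "k".getBytes(UTF_8), "hello".getBytes(UTF_8))

  // Capture the output in memory, then echo it to the console.
  val buffer = new ByteArrayOutputStream()
  new TinyFormatter().writeTo(record, new PrintStream(buffer, true, "UTF-8"))
  print(buffer.toString("UTF-8"))
}
```

Writing into a `ByteArrayOutputStream` keeps the check independent of the terminal, which also makes it easy to turn into a unit test later.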
Third, create the jar by running:
sbt assembly
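Note that `assembly` is not a built-in sbt task; it comes from the sbt-assembly plugin. A minimal sketch of the build setup might look like the following. The plugin version, Scala version, project name and output path are assumptions for illustration, so adjust them to your setup.

```scala
// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")

// build.sbt
name := "kafka-console-formatter" // hypothetical project name
scalaVersion := "2.13.8"

// Optional: "provided" keeps Kafka's own classes out of the assembled jar,
// since they are already present in the Kafka installation's libs dir.
libraryDependencies += "org.apache.kafka" %% "kafka" % "3.1.0" % "provided"

// Assumption: write the jar under target/jars so it is easy to find afterwards.
assembly / assemblyOutputPath := target.value / "jars" / "kafka-console-formatter.jar"
```

Without an explicit output path, sbt-assembly writes the jar under the Scala-version-specific `target` subdirectory instead.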
Fourth, copy or move the resulting jar from <project>/target/jars/ to the libs dir of your local Kafka installation (should be something like /usr/local/Cellar/kafka/3.0.1/libexec/libs/).
Finally, use and enjoy by simply passing the fully qualified name of your formatter class to any kafka-console-consumer command:
--formatter my.customized.KafkaMessageFormatter
You can see the full code for reference here, or clone it and use it as-is or edit it to match your preferences.
Now, with this cool console output... consume all the messages!!