Formatting The Apache Kafka Console Consumer Output | by Ivelina Yordanova | Feb, 2022

Eat all of the messages

Picture by Kevin Ku on Unsplash
kafka-console-consumer 
--bootstrap-server localhost:9092
--topic my_topic
--property print.offset=true
--property print.partition=true
--property print.headers=true
--property print.timestamp=true
--property print.key=true
kafka-console-consumer 
--bootstrap-server localhost:9092
--topic my_topic
kafka-console-consumer 
--bootstrap-server localhost:9092
--topic my_topic
--property print.offset=true
--property print.partition=true
--property print.headers=true
--property print.timestamp=true
--property print.key=true
--formatter my.customized.KafkaMetricsFormatter
libraryDependencies += "org.apache.kafka" %% "kafka" % "3.1.0"
import Console._override def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = 

def deserialize(deserializer: Possibility[Deserializer[_]], sourceBytes: Array[Byte], matter: String) =
val nonNullBytes = Possibility(sourceBytes).getOrElse(nullLiteral)
val convertedBytes = deserializer
.map(d => utfBytes(d.deserialize(matter, consumerRecord.headers, nonNullBytes).toString))
.getOrElse(nonNullBytes)
convertedBytes

import consumerRecord._

if (printTimestamp && timestampType != TimestampType.NO_TIMESTAMP_TYPE)
output.write(utfBytes(s"$CYAN└── $timestampType:$RESET $timestamp"))
output.write(lineSeparator)

if (printPartition)
output.write(utfBytes(s"$CYAN└── Partition:$RESET $partition().toString"))
output.write(lineSeparator)

if (printOffset)
output.write(utfBytes(s"$CYAN└── Offset:$RESET $offset().toString"))
output.write(lineSeparator)

if (printHeaders)
if (!headers().toArray.isEmpty)
output.write(utfBytes(s"$CYAN└── Headers:$RESET "))

val headersIt = headers().iterator
whereas (headersIt.hasNext)
val header = headersIt.subsequent()
output.write(utfBytes(s"nt$header.key(): "))
output.write(deserialize(headersDeserializer, header.worth(), matter))
if (headersIt.hasNext)
output.write(headersSeparator)


output.write(lineSeparator)

if (printKey)
output.write(utfBytes(s"$CYAN└── Key:$RESET "))
output.write(deserialize(keyDeserializer, key, matter))
output.write(lineSeparator)

if (printValue)
output.write(deserialize(valueDeserializer, worth, matter))
output.write(lineSeparator)
output.write(utfBytes(s"---------------"))
output.write(lineSeparator)

personal def utfBytes(str: String) = str.getBytes(StandardCharsets.UTF_8)

sbt meeting
--formatter my.customized.KafkaMessageFormatter

More Posts