Category: Kafka

Counting Messages in All Kafka Topics

For some reason, I keep being handed Kafka instances that no one can identify or explain. The first step, generally, is figuring out whether an instance does anything, because a server that no one has sent a message to in a year or two … well, there’s not much point in bringing it up to standard, monitoring it, and such. My first-glance analysis has been just counting all of the messages in all of the topics to see which topics are actually used. Here is a quick bash script to accomplish this (presuming a Kafka broker is on port 9092 of the host running the script):

# List every topic on the local broker, one array element per topic
mapfile -t arrayTopics < <(./kafka-topics.sh --list --bootstrap-server "$(hostname):9092")

for i in "${arrayTopics[@]}"; do
    # The consumer reports "Processed a total of N messages" on stderr once it times out
    iMessages=$(./kafka-console-consumer.sh --bootstrap-server "$(hostname):9092" --topic "$i" --property print.timestamp=true --from-beginning --timeout-ms=10000 2>&1 | grep "Processed a total of")
    echo "$i     $iMessages"
done
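
Once the loop has run, the lines that matter are the ones with a non-zero count. Here is a minimal sketch of filtering the report, assuming each line has the "topic, then Processed a total of N messages" shape that the loop prints. The function name used_topics and the sample topic names are made up for illustration.

```shell
# Hypothetical helper: read "topic  Processed a total of N messages" lines
# on stdin and print only the topics that actually hold messages.
used_topics() {
    # Field 1 is the topic name, field 6 is the message count.
    awk '$6 + 0 > 0 { print $1, $6 }'
}

# Sample report with invented topic names, in the format the loop prints.
# The last line mimics a topic where the consumer produced no summary.
used_topics <<'EOF'
idle_topic     Processed a total of 0 messages
busy_topic     Processed a total of 1532 messages
broken_topic
EOF
```

Piping the loop's output straight into the function should work the same way as the here-document.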

Kafka Manager SSL Issue

We renewed the certificate on our Kafka Manager (now called CMAK, but we haven’t upgraded yet so it’s still ‘manager’), but the site wouldn’t come up. It did, however, dump a bunch of Java ick into the log file:

Jan 16 14:01:52 kafkamanager kafka-manager: [error] p.c.s.NettyServer$PlayPipelineFactory - cannot load SSL context
Jan 16 14:01:52 kafkamanager kafka-manager: java.lang.reflect.InvocationTargetException: null
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createScalaSSLEngineProvider(ServerSSLEngine.scala:96) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createSSLEngineProvider(ServerSSLEngine.scala:32) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.liftedTree1$1(NettyServer.scala:113) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.sslEngineProvider$lzycompute(NettyServer.scala:112) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.sslEngineProvider(NettyServer.scala:111) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.getPipeline(NettyServer.scala:90) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: Caused by: java.lang.Exception: Error loading HTTPS keystore from /path/to/kafkamgr.example.net.jks
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.DefaultSSLEngineProvider.createSSLContext(DefaultSSLEngineProvider.scala:47) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.DefaultSSLEngineProvider.<init>(DefaultSSLEngineProvider.scala:21) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createScalaSSLEngineProvider(ServerSSLEngine.scala:96) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createSSLEngineProvider(ServerSSLEngine.scala:32) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.liftedTree1$1(NettyServer.scala:113) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.sslEngineProvider$lzycompute(NettyServer.scala:112) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: Caused by: java.security.UnrecoverableKeyException: Cannot recover key
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.KeyProtector.recover(KeyProtector.java:315) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.JavaKeyStore.engineGetKey(JavaKeyStore.java:141) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.JavaKeyStore$JKS.engineGetKey(JavaKeyStore.java:56) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.KeyStoreDelegator.engineGetKey(KeyStoreDelegator.java:96) ~[na:1.8.0_251]

Elsewhere in the log file, we got output that looks like not-decrypted stuff …

Jan 16 14:01:52 kafkamanager kafka-manager: java.lang.IllegalArgumentException: invalid version format:  ̄G^H▒~A�▒~Zᆴ▒~@▒~A:U▒~HP▒~W5▒~W▒D¬ᄡ^K/↓▒
￧^S▒L
Jan 16 14:01:52 kafkamananger kafka-manager: "^S^A^S^C^S^B▒~@+▒~@/▒~Lᄅ▒~Lᄄ▒~@,▒~@0▒~@

This led me to hypothesize that either the keystore password was wrong (it wasn’t; I could use keytool to view the JKS file) or the key password was wrong. The key password turned out to be the problem: there isn’t actually a way to configure the key password in Kafka Manager, just a parameter to configure the keystore password, so the key has to re-use the keystore password.

To change the key password in a JKS file, use keytool, enter the keystore and key password when prompted, then enter the new key password when prompted.

keytool -keypasswd -alias kafkamanager.example.net -keystore ljr.jks

Voila! Once both the key password and the keystore password matched the value configured in play.server.https.keyStore.password, the Kafka Manager service started up and worked properly.
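
keytool can also take the passwords as options instead of prompting, which is handy if the renewal is scripted. This is a rough sketch rather than the exact procedure we used: the function name match_key_to_store_password and the KEYTOOL variable are made up for illustration, and the passwords in the example call are placeholders.

```shell
# Assumption: keytool is on the PATH; set KEYTOOL to point somewhere else.
KEYTOOL="${KEYTOOL:-keytool}"

# Hypothetical helper: set the key password to the keystore password.
#   $1 = keystore file, $2 = alias, $3 = keystore password, $4 = current key password
match_key_to_store_password() {
    "$KEYTOOL" -keypasswd -alias "$2" -keystore "$1" \
        -storepass "$3" -keypass "$4" -new "$3"
}

# Example call (placeholders, not real credentials):
# match_key_to_store_password ljr.jks kafkamanager.example.net 'storePass' 'oldKeyPass'
```

Passwords given on the command line show up in the process list and in shell history, so the interactive prompts are the safer choice on a shared host.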

 

Automatically Adding “Extra” JMX Ports to Firewalld

A few months ago, I dug into a mystery at work: even though JMX was configured to use port 9999, port 9999 was open from the client to the server, and the client was configured to use port 9999, our Kafka Manager tool could only report statistics from the local Kafka server. It failed to retrieve data from the remote ones, saying it was unable to connect. Long story short, JMX uses its configured port plus two other randomly selected (and not readily configurable) ports. To automate getting JMX working when Kafka is restarted, I built this shell script. It identifies which ports are in use by Java and transiently adds them to the firewall rules (since the ports change on each service start, transient firewall rules made sense here). My plan is to link the script to the Kafka unit file as an ExecStartPost directive.

# Get ports linked to java
mapfile -t array_of_ports < <( ss -6 -l -t -p -n | grep java | cut -d ":" -f 4 | grep -v "10.166" | sed -e 's/\ *$//g')

declare -p array_of_ports

for i in "${array_of_ports[@]}"
do
   # Only the randomly selected high ports need a transient rule
   if (( i > 20000 )); then
      echo "/bin/firewall-cmd --zone=public --add-port=$i/tcp"
      output=$(/bin/firewall-cmd --zone=public --add-port="$i/tcp")
      echo "$output"
   fi
done

# Repeat for the IPv4 listeners
mapfile -t array_of_ports < <( ss -4 -l -t -p -n | grep java | cut -d ":" -f 2 | cut -d " " -f 1 | sed -e 's/\ *$//g')

declare -p array_of_ports

for i in "${array_of_ports[@]}"
do
   # Only the randomly selected high ports need a transient rule
   if (( i > 20000 )); then
      echo "/bin/firewall-cmd --zone=public --add-port=$i/tcp"
      output=$(/bin/firewall-cmd --zone=public --add-port="$i/tcp")
      echo "$output"
   fi
done
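
The IPv6 and IPv4 passes do the same job, so the port extraction could probably be folded into one filter. This is a sketch under assumptions, not a drop-in replacement: it assumes the usual ss column layout where the local address and port sit in the fourth column, the function name java_high_ports is made up, and the sample input is invented.

```shell
# Hypothetical helper: read `ss -l -t -p -n` output on stdin and print each
# distinct port above 20000 that a java process is listening on.
java_high_ports() {
    awk '/java/ {
        n = split($4, parts, ":")      # Local Address:Port is the fourth column
        port = parts[n] + 0            # the port follows the last colon
        if (port > 20000) print port
    }' | sort -n -u
}

# Invented sample input covering IPv4-style and IPv6-style addresses.
java_high_ports <<'EOF'
LISTEN 0 50 0.0.0.0:9999 0.0.0.0:* users:(("java",pid=4242,fd=120))
LISTEN 0 50 0.0.0.0:38417 0.0.0.0:* users:(("java",pid=4242,fd=121))
LISTEN 0 50 [::]:44109 [::]:* users:(("java",pid=4242,fd=122))
LISTEN 0 128 0.0.0.0:22 0.0.0.0:* users:(("sshd",pid=900,fd=3))
EOF

# Real use would look something like:
# ss -l -t -p -n | java_high_ports | while read -r port; do
#     /bin/firewall-cmd --zone=public --add-port="$port/tcp"
# done
```

Splitting on the last colon is what lets one filter handle both address styles; the site-specific grep -v filter from the original script is left out here.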

Kafka Producer – Sending a Message with a Key

I needed to test sending messages into a topic where the cleanup policy is compact (keep the most recent data for each key) … which means I needed a quick way to send a message with a key to Kafka. Fortunately, the kafka-console-producer script supports key parsing. You just need to include a few --property parameters when running the script.

./kafka-console-producer.sh --bootstrap-server kafka.example.com:9092 --topic ljrtesting --property "parse.key=true" --property "key.separator=:"

When you send messages, they are in the format Key<delimiter>Message, so “LJRKey:1” will send a message with the key “LJRKey” and the message content “1”.
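
For testing a compacted topic, it helps to send several messages that share a key. A small sketch that generates them in the Key<delimiter>Message format follows; the function name keyed_messages is made up for illustration.

```shell
# Hypothetical helper: print COUNT messages that all share one KEY,
# formatted as key<separator>value for the console producer.
#   $1 = key, $2 = count, $3 = separator (defaults to ":")
keyed_messages() {
    sep="${3:-:}"
    n=1
    while [ "$n" -le "$2" ]; do
        printf '%s%s%s\n' "$1" "$sep" "$n"
        n=$((n + 1))
    done
}

# Prints LJRKey:1, LJRKey:2 and LJRKey:3 on separate lines.
keyed_messages LJRKey 3

# To send them, pipe the output into the producer, for example:
# keyed_messages LJRKey 3 | ./kafka-console-producer.sh --bootstrap-server kafka.example.com:9092 --topic ljrtesting --property "parse.key=true" --property "key.separator=:"
```

The separator passed to the helper has to match the key.separator property given to the producer.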

Kafka: Reset __consumer_offsets Topic

I set up a few topics for a new process, but the client was continually unable to subscribe and kept logging the following messages:

[2023-01-06 12:54:58,965] INFO [Worker clientId=connect-1, groupId=connect-cluster-group-dev] Group coordinator kafkahost.example.com:9092 (id: 2147483415 rack: null) is unavailable or invalid due to cause: error response NOT_COORDINATOR.isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:904)
[2023-01-06 12:54:58,966] INFO [Worker clientId=connect-1, groupId=connect-cluster-group-dev] JoinGroup failed: This is not the correct coordinator. Marking coordinator unknown. Sent generation was Generation{generationId=-1, memberId='', protocol='null'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:623)
[2023-01-06 12:54:58,966] INFO [Worker clientId=connect-1, groupId=connect-cluster-group-dev] Rebalance failed. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:472)
org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.

I recreated the topics with the replication factor equal to the number of servers. I restarted all of the Kafka and ZooKeeper services. We tried from other clients. We tried searching the internet for some pointer as to what had gone awry. The only thing I could find is that the __consumer_offsets topic wasn’t right … you should be able to read the offset data:

./kafka-console-consumer.sh --bootstrap-server kafkahost.example.net:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --topic __consumer_offsets

The output should contain your topic and an offset:

kafkasandbox:bin # ./kafka-console-consumer.sh  --bootstrap-server kafkaserver.example.com:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --topic __consumer_offsets
[APP_RUN,appa_DB_error,5]::OffsetAndMetadata(offset=912219, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016876, expireTimestamp=None)
[APP_RUN,appb_DB_error,5]::OffsetAndMetadata(offset=424539, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016938, expireTimestamp=None)
[APP_RUN,appb_DB_error,3]::OffsetAndMetadata(offset=359340, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016938, expireTimestamp=None)
[APP_RUN,appc_DB_error,2]::OffsetAndMetadata(offset=986361, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378017005, expireTimestamp=None)

Now, resetting the entire __consumer_offsets topic is a massively bad idea in a production environment. We’ve had to advance offsets before, but never had to reset the whole thing.

The documentation I found online used a ZooKeeper command that no longer exists — ‘rmr’ — but deleteall (basically a subtree deletion) did the trick.

# Reset consumer offsets
# Note -- this is a *really* bad idea in production as it can lead to missed messages. You should record the current offsets, clear the topic, then set the offset for the client groups
systemctl stop kafka # on all servers
mv /kafka/ws_npm_kafka/data-kafka/__consumer_offsets-* /tmp/
./zookeeper-shell.sh localhost:2181
deleteall /brokers/topics/__consumer_offsets
# Once this has been done on all servers, restart kafka
systemctl start kafka
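
The note in the script about recording the current offsets first can be sketched as a small filter over the formatter output. This assumes the lines have the [group,topic,partition]::OffsetAndMetadata(offset=N, ...) shape and that group names contain no commas; the function name offsets_to_records is made up for illustration.

```shell
# Hypothetical helper: turn OffsetsMessageFormatter lines into
# "group topic partition offset" records that are easy to save and diff.
offsets_to_records() {
    sed -n 's/^\[\([^,]*\),\([^,]*\),\([0-9]*\)]::OffsetAndMetadata(offset=\([0-9]*\),.*/\1 \2 \3 \4/p'
}

# Two sample lines in the formatter's output format.
offsets_to_records <<'EOF'
[APP_RUN,appa_DB_error,5]::OffsetAndMetadata(offset=912219, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016876, expireTimestamp=None)
[APP_RUN,appb_DB_error,3]::OffsetAndMetadata(offset=359340, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016938, expireTimestamp=None)
EOF
```

Saving these records to a file before stopping Kafka gives you a reference for setting the client groups' offsets again afterwards. The topic is a running log of commits, so the same group, topic, and partition can appear many times; only the last record for each one is current.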

Kafka: Finding the Controller in Zookeeper

When restarting all of the Kafka servers (e.g. a periodic patch and reboot), it is better to avoid bouncing the controller role between nodes on every reboot. To accomplish this, find out which server is acting as the controller and restart it last. The controller still moves, but only once, no matter how many servers are in your deployment.

# Connect to zookeeper
./zookeeper-shell.sh localhost:2181
# Find controller by opening the zookeeper shell and querying for controller
get /controller
{"version":1,"brokerid":250,"timestamp":"1676694139851"}

# Get details on broker ID reported as controller
get /brokers/ids/250
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","SASL_PLAINTEXT":"SASL_PLAINTEXT"},"endpoints":["PLAINTEXT://kafkahost.example.net:9093","SASL_PLAINTEXT://kafkahost.example.net:9092"],"jmx_port":9999,"host":"kafkahost.example.net","timestamp":"1676348503278","port":9093,"version":4}
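
If the restarts are scripted, the controller lookup can feed the restart order directly. This is a minimal sketch, assuming the /controller JSON looks like the output above; the function names controller_id and restart_order are made up, and the broker IDs 249 and 251 are invented for the example.

```shell
# Hypothetical helper: pull the broker ID out of the /controller JSON.
controller_id() {
    sed -n 's/.*"brokerid":\([0-9][0-9]*\).*/\1/p'
}

# Hypothetical helper: print broker IDs with the controller moved to the end.
#   $1 = controller ID, remaining arguments = all broker IDs
restart_order() {
    ctrl="$1"
    shift
    for id in "$@"; do
        [ "$id" = "$ctrl" ] || echo "$id"
    done
    echo "$ctrl"
}

ctrl=$(echo '{"version":1,"brokerid":250,"timestamp":"1676694139851"}' | controller_id)
# Broker IDs 249 and 251 are invented; 250 is the controller from the output above.
restart_order "$ctrl" 249 250 251
```

The list it prints is the order in which to patch and reboot, with the controller's broker ID last.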

Kafka – Messages Not Appearing in Topic

I created a few new Kafka topics for a project today — but, in testing, messages sent to the topic weren’t there. I normally echo some string into “kafka-console-producer.sh” to test messages. Evidently, STDERR wasn’t getting rendered back to my screen this way. I ran the producer script to get the “>” prompt and tried again — voila, a useful error:

[2022-10-31 15:36:23,471] ERROR Error when sending message to topic MyTopic with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.InvalidRecordException: Compacted topic cannot accept message without key in topic partition MyTopic-0.

Ohhh — that makes sense! They’ve got an existing process on a different Kafka server, and I just mirrored the configuration without researching what the configuration meant. They use “compact” as their cleanup policy — so messages don’t really age out of the topic. They age out when a newer message with that key gets posted. It’s a neat algorithm that I remember encountering when I first started reading the Kafka documentation … but it’s not something I had a reason to use. The other data we have transiting our Kafka cluster is time-series data where we want all of the info for trending. Having just the most recent, say, CPU utilization on my server isn’t terribly useful. But it makes sense — if I instruct the topic to clean up old data but retain the most recent message for each key … I need to be giving it a key!
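
To make the "retain the most recent message for each key" idea concrete, here is a toy simulation in shell. It is not how Kafka implements compaction, just a sketch of what a reader would see after a topic has been fully cleaned; the function name latest_per_key and the sample data are made up.

```shell
# Hypothetical helper: keep only the newest line for each key, in the order
# the surviving lines were written. Lines are key:value, split on the first ":".
latest_per_key() {
    awk '{
        pos = index($0, ":")
        if (pos == 0) next                 # no separator, so no key: skipped here
        key = substr($0, 1, pos - 1)
        keyof[NR] = key                    # remember which key each line carried
        line[NR] = $0
        last[key] = NR                     # newest line number seen for this key
    }
    END {
        for (i = 1; i <= NR; i++)
            if ((i in keyof) && last[keyof[i]] == i) print line[i]
    }'
}

# host1 reports twice; only its newer reading survives.
latest_per_key <<'EOF'
host1:cpu=10
host2:cpu=55
host1:cpu=90
EOF
```

For time-series data like the CPU example, this is exactly the behavior you do not want, which is why the delete cleanup policy suits that kind of topic better.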

Adding parameters to parse the string into a key/value pair and to specify the separator got the message published to the topic:

echo "test:EchoTest" | /kafka/bin/kafka-console-producer.sh --bootstrap-server $(hostname):9092 --topic MyTopic --property "parse.key=true" --property "key.separator=:"