Tag: kafka

Kafka Manager SSL Issue

We renewed the certificate on our Kafka Manager (now called CMAK, but we haven’t upgraded yet so it’s still ‘manager’), but the site wouldn’t come up. It did, however, dump a bunch of java ick into the log file

Jan 16 14:01:52 kafkamanager kafka-manager: [^[[31merror^[[0m] p.c.s.NettyServer$PlayPipelineFactory - cannot load SSL context
Jan 16 14:01:52 kafkamanager kafka-manager: java.lang.reflect.InvocationTargetException: null
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createScalaSSLEngineProvider(ServerSSLEngine.scala:96) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createSSLEngineProvider(ServerSSLEngine.scala:32) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.liftedTree1$1(NettyServer.scala:113) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.sslEngineProvider$lzycompute(NettyServer.scala:112) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.sslEngineProvider(NettyServer.scala:111) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.getPipeline(NettyServer.scala:90) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: Caused by: java.lang.Exception: Error loading HTTPS keystore from /path/to/kafkamgr.example.net.jks
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.DefaultSSLEngineProvider.createSSLContext(DefaultSSLEngineProvider.scala:47) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.DefaultSSLEngineProvider.<init>(DefaultSSLEngineProvider.scala:21) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createScalaSSLEngineProvider(ServerSSLEngine.scala:96) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createSSLEngineProvider(ServerSSLEngine.scala:32) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.liftedTree1$1(NettyServer.scala:113) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.sslEngineProvider$lzycompute(NettyServer.scala:112) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: Caused by: java.security.UnrecoverableKeyException: Cannot recover key
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.KeyProtector.recover(KeyProtector.java:315) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.JavaKeyStore.engineGetKey(JavaKeyStore.java:141) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.JavaKeyStore$JKS.engineGetKey(JavaKeyStore.java:56) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.KeyStoreDelegator.engineGetKey(KeyStoreDelegator.java:96) ~[na:1.8.0_251]

Elsewhere in the log file, we got output that looks like not-decrypted stuff …

Jan 16 14:01:52 kafkamanager kafka-manager: java.lang.IllegalArgumentException: invalid version format:  ̄G^H▒~A�▒~Zᆴ▒~@▒~A:U▒~HP▒~W5▒~W▒D¬ᄡ^K/↓▒
￧^S▒L
Jan 16 14:01:52 kafkamananger kafka-manager: "^S^A^S^C^S^B▒~@+▒~@/▒~Lᄅ▒~Lᄄ▒~@,▒~@0▒~@

Which led me to hypothesize that either the keystore password wasn’t right (it was, I could use keytool to view the jks file) or the key password wasn’t right. It wasn’t — there isn’t actually a way to configure the key password in Kafka Manager, just a parameter to configure the keystore password. You’ve got to re-use that password for the key password.

To change the key password in a JKS file, use keytool, enter the keystore and key password when prompted, then enter the new key password when prompted.

keytool --keypasswd -alias kafkamanager.example.net -keystore ljr.jks

Voila — once both the key and keystore matched the password configured in play.server.https.keyStore.password … the Kafka Manager service started up and worked properly.

 

Automatically Adding “Extra” JMX Ports to Firewalld

A few months ago, I had dug into a mystery at work — even though JMX was configured to use port 9999, port 9999 was open from the client to the server, and the client was configured to use port 9999 … our Kafka manager tool could only report statistics from the local Kafka server. It failed to retrieve data for the remote ones — saying it was unable to connect. Long story short, JMX uses “it’s” port and two other randomly selected (and not readily configurable) ports. To automate getting JMX working when Kafka is restarted, I built this shell script. It identifies which ports are in use by Java, and it transiently adds them to the firewall rules (since the ports change on each service start, transient firewall rules made sense here). My plan is to link the script to the Kafka unit file as an ExecStartPost directive.

# Get ports linked to java
mapfile -t array_of_ports < <( ss -6 -l -t -p -n | grep java | cut -d ":" -f 4 | grep -v "10.166" | sed -e 's/\ *$//g')

declare -p array_of_ports

for i in "${array_of_ports[@]}"
do
   : 
   if (( i > 20000 )); then
      echo "/bin/firewall-cmd --zone=public --add-port=$i/tcp"
      output=`/bin/firewall-cmd --zone=public --add-port=$i/tcp`
      echo $output
   fi
done

mapfile -t array_of_ports < <( ss -4 -l -t -p -n | grep java | cut -d ":" -f 2 | cut -d " " -f 1 | sed -e 's/\ *$//g')

declare -p array_of_ports

for i in "${array_of_ports[@]}"
do
   : 
   if (( i > 20000 )); then
      echo "/bin/firewall-cmd --zone=public --add-port=$i/tcp"
      output=`/bin/firewall-cmd --zone=public --add-port=$i/tcp`
      echo $output
   fi
done

Kafka Producer – Sending a Message with a Key

I needed to test sending messages into a topic where the cleanup policy is compact (keep the most recent data for each key) … which means I needed a quick way to send a message with a key to Kafka. Fortunately, the kafka-consume-producer script supports key parsing. You just need to include a few –property parameters when running the script.

./kafka-console-producer.sh --bootstrap-server kafka.example.com:9092 --topic ljrtesting --property "parse.key=true" --property "key.separator=:"

When you send messages, they are in the format Key<deliminator>Message — so “LJRKey:1” will send a message with the key of “LJRKey” and the message content of “1”.

Kafka: Reset __consumer_offsets Topic

I set up a few topics for a new process, but the client was continually unable to subscribe getting the following messages:

[2023-01-06 12:54:58,965] INFO [Worker clientId=connect-1, groupId=connect-cluster-group-dev] Group coordinator kafkahost.example.com:9092 (id: 2147483415 rack: null) is unavailable or invalid due to cause: error response NOT_COORDINATOR.isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:904)
[2023-01-06 12:54:58,966] INFO [Worker clientId=connect-1, groupId=connect-cluster-group-dev] JoinGroup failed: This is not the correct coordinator. Marking coordinator unknown. Sent generation was Generation{generationId=-1, memberId='', protocol='null'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:623)
[2023-01-06 12:54:58,966] INFO [Worker clientId=connect-1, groupId=connect-cluster-group-dev] Rebalance failed. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:472)
org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.

The output should contain your topic and an offset:

kafkasandbox:bin # ./kafka-console-consumer.sh  --bootstrap-server kafkaserver.example.com:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --topic __consumer_offsets
[APP_RUN,appa_DB_error,5]::OffsetAndMetadata(offset=912219, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016876, expireTimestamp=None)
[APP_RUN,appb_DB_error,5]::OffsetAndMetadata(offset=424539, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016938, expireTimestamp=None)
[APP_RUN,appb_DB_error,3]::OffsetAndMetadata(offset=359340, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016938, expireTimestamp=None)
[APP_RUN,appc_DB_error,2]::OffsetAndMetadata(offset=986361, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378017005, expireTimestamp=None)

I recreated the topics with the replication factor equal to the number of servers. I restarted all of the Kafka and ZooKeeper services. We tried from other clients. We tried searching the internet for some pointer as to what has gone awry. The only thing I could find is that the __consumer_offsets topic wasn’t right … you should be able to read the offset data:

/kafka-console-consumer.sh --bootstrap-server kafkahost.example.net:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --topic __consumer_offsets

Now, resetting the entire consumer offsets partition is a massively bad idea in a production environment. We’ve had to advance offsets before, but never had to reset the entire partition.

The documentation I found online used a ZooKeeper command that no longer exists — ‘rmr’ — but deleteall (basically a subtree deletion) did the trick.

# Reset consumer offsets
# Note -- this is a *really* bad idea in production as it can lead to missed messages. You should record the current offsets, clear the topic, then set the offset for the client groups
systemctl stop kafka # on all servers
mv /kafka/ws_npm_kafka/data-kafka/__consumer_offsets-* /tmp/
./zookeeper-shell.sh localhost:2181
deleteall /brokers/topics/__consumer_offsets
# Once this has been done on all servers, restart kafka
systemctl start kafka

Kafka: Finding the Controller in Zookeeper

When restarting all of the Kafka servers (e.g. a periodic patch and reboot), it is better if you avoid rolling the controller between the nodes on every reboot. To accomplish this goal, find out which server is acting as the controller and restart it last — you’ll have the controller move one time using this method, but only once no matter how many servers are in your deployment.

# Find controller by opening the zookeeper shell and querying for controller
./zookeeper-shell.sh localhost:2181
get /controller

Kafka – Messages Not Appearing in Topic

I created a few new Kafka topics for a project today — but, in testing, messages sent to the topic weren’t there. I normally echo some string into “kafka-console-producer.sh” to test messages. Evidently, STDERR wasn’t getting rendered back to my screen this way. I ran the producer script to get the “>” prompt and tried again — voila, a useful error:

[2022-10-31 15:36:23,471] ERROR Error when sending message to topic MyTopic with key: null, value: 4 bytes with error: (org.apache.kafka.clients.pro.internals.ErrorLoggingCallback)
org.apache.kafka.common.InvalidRecordException: Compacted topic cannot accept message without key in topic partition MyTopic-0.

Ohhh — that makes sense! They’ve got an existing process on a different Kafka server, and I just mirrored the configuration without researching what the configuration meant. They use “compact” as their cleanup policy — so messages don’t really age out of the topic. They age out when a newer message with that key gets posted. It’s a neat algorithm that I remember encountering when I first started reading the Kafka documentation … but it’s not something I had a reason to use. The other data we have transiting our Kafka cluster is time-series data where we want all of the info for trending. Having just the most recent, say, CPU utilization on my server isn’t terribly useful. But it makes sense — if I instruct the topic to clean up old data but retain the most recent message for each key … I need to be giving it a key!

Adding a parameter to parse the string into a key/value pair and provide the separator led to data being published to the clients:

echo “test:EchoTest” | /kafka/bin/kafka-console-producer.sh –bootstrap-server $(hostname):9092 –topic MyTopic –property “parse.key=true” –property “key.separator=:”

Unable to Use JMX Remotely for Kafka Stats

I noticed, today, that our Kafka Manager interface only shows details from one server — the one where we run Kafka Manager. We’ve done everything that we need to do in order to get this working — the port shows as open with nmap, the command to run Kafka includes all of the settings. I’ve even tried setting the JMX hostname, but still there is just one server reporting data

Then I happened across an article online that detailed how JMX actually uses three ports — the configured port 9999 and two other randomly selected and non-configurable ports. I used netstat to list all of the ports in use by the Java PID running my Kafka server and, voila, there were two odd-ball high ports (30000’s and 40000’s). I added those additional ports to the firewall rules and … I’ve got data for all of the Kafka servers!

This is obviously a short-term solution as the two randomly selected ports will be different when I restart the service next time. I’d prefer to leave the firewall in place (i.e. not just open all ports >1024 between the Kafka Manager host and all of the Kafka servers) so might put together a script to identify the “oddball” ports associated to the Java pid and add them to transient firewalld rules. But the last server restart was back in 2021 … so I might just manually add them after the upgrade next week and worry about something ‘better’ next year!

Upgrading Kafka from 2.5.0 to 3.2.3

Bidirectional backwards compatibility was introduced in 2017 – which means my experience where you needed to upgrade the broker first and then the clients is no longer true. Rejoice!

Sandbox Setup

Two CentOS docker containers were provisioned as follows:

docker run -dit --name=kafka1 -p 9092:9092 centos:latest
docker run -dit --name=kafka2 -p 9093:9092 -p9000:9000 centos:latest

# Shell into each container and do the following:

sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-*
sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-*

# Get Ips and hosts into /etc/hosts

172.17.0.2 40c2222cfea0
172.17.0.3 2923addbcb6d

# Update installed packages & install required tools

dnf update
yum install -y passwd vim net-tools wget git unzip
# Add a kafka user, make a kafka folder, and give the kafka user ownership of the kafka folder
useradd kafka
passwd kafka
usermod -aG wheel kafka

mkdir /kafka

chown kafka:kafka /kafka

# Install Kafka

su – kafka
cd /kafka
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar vxzf kafka_2.12-2.5.0.tgz
rm kafka_2.12-2.5.0.tgz
ln -s /kafka/kafka_2.12-2.5.0 /kafka/kafka

# Configure zookeeper

vi /kafka/kafka/config/zookeeper.properties
dataDir=/kafka/zookeeperdata
server.1=172.17.0.2:2888:3888

# Start Zookeeper on the first server

screen -S zookeeper
/kafka/kafka/bin/zookeeper-server-start.sh /kafka/kafka/config/zookeeper.properties

# Configure the cluster

vi /kafka/kafka/config/server.properties

broker.id=1 # unique number per cluster node
listeners=PLAINTEXT://:9092
zookeeper.connect=172.17.0.2:2181

# Start Kafka

screen -S kafka
/kafka/kafka/bin/kafka-server-start.sh /kafka/kafka/config/server.properties

# Edit producer.properties on a server

vi /kafka/kafka/config/producer.properties
bootstrap.servers=172.17.0.2:9092,172.17.0.3:9092

# Create test topic

/kafka/kafka/bin/kafka-topics.sh --create --zookeeper 172.17.0.2:2181 --replication-factor 2 --partitions 1 --topic ljrTest

# Post messages to the topic

/kafka/kafka/bin/kafka-console-producer.sh --broker-list 172.17.0.2:9092 --producer.config /kafka/kafka/config/producer.properties --topic ljrTest

# Retrieve messages from topic

/kafka/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.2:9092 --topic ljrTest --from-beginning
/kafka/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.3:9092 --topic ljrTest --from-beginning

Voila, a functional Kafka sandbox cluster.

Now we’ll install the cluster manager

cd /kafka
git clone --depth 1 --branch 3.0.0.6 https://github.com/yahoo/CMAK.git
cd CMAK
vi conf/application.conf
cmak.zkhosts="40c2222cfea0:2181"

# CMAK requires java > 1.8 … so getting 11 set up
cd /usr/lib/jvm
wget https://cdn.azul.com/zulu/bin/zulu11.58.23-ca-jdk11.0.16.1-linux_x64.zip
unzip zulu11.58.23-ca-jdk11.0.16.1-linux_x64.zip
mv zulu11.58.23-ca-jdk11.0.16.1-linux_x64 zulu-11
PATH=/usr/lib/jvm/zulu-11/bin:$PATH

./sbt -java-home /usr/lib/jvm/zulu-11 clean dist

cp /kafka/CMAK/target/universal/cmak-3.0.0.6.zip /kafka

cd /kafka
unzip cmak-3.0.0.6.zip
cd cmak-3.0.0.6
screen -S CMAK
bin/cmak -java-home /usr/lib/jvm/zulu-11 -Dconfig.file=/kafka/cmak-3.0.0.6/conf/application.conf -Dhttp.port=9000

Access it at http://cmak_host:9000

Sandbox Upgrade Process

# Back up the Kafka installation (excluding log files)

tar cvfzp /kafka/kafka-2.5.0.tar.gz --exclude logs /kafka/ws_npm_kafka/kafka_2.12-2.5.0

# Get newest Kafka version installed
# From another host where you can download the file, transfer it to the kafka server

scp kafka_2.12-3.2.3.tgz list@kafka1:/tmp/

# Back on the Kafka server — copy the tgz file into the Kafka directory

mv /tmp/kafka_2.12-3.2.3.tgz /kafka/kafka

# Verify Kafka data is stored outside of the install directory:

[kafka@40c2222cfea0 config]$ grep log.dir server.properties
log.dirs=/tmp/kafka-logs

# Verify zookeeper data is stored outside of the install directory:

[kafka@40c2222cfea0 config]$ grep dataDir zookeeper.properties
dataDir=/kafka/zookeeperdata

# Get the new version of Kafka – start with the zookeeper(s) then do the other nodes

cd /kafka
wget https://downloads.apache.org/kafka/3.2.3/kafka_2.12-3.2.3.tgz
tar vxfz /kafka/kafka_2.12-3.2.3.tgz

# Copy config from old iteration to new

cp /kafka/kafka_2.12-2.5.0/config/* /kafka/kafka_2.12-3.2.3/config/

# Edit server.properties and add a configuration line to force the inter-broker protocol version to the currently running Kafka version
# This ensures your cluster is using the “old” version to communicate and you can, if needed, revert to the previous version

vi /kafka/kafka/config/server.properties
inter.broker.protocol.version=2.5.0

# Restart each Kafka server – waiting until it has come online before restarting the next one – with the new binaries
# Stop kafka

systemctl stop kafka

# Move symlink to new folder

unlink /kafka/kafka
ln -s /kafka/kafka_2.12-3.2.3 /kafka/kafka

# start kafka

systemctl start kafka

# Or, to watch it run,

/kafka/kafka/bin/kafka-server-start.sh /kafka/kafka/config/server.properties

# Finally, ensure you’ve still got ‘stuff’

/kafka/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.3:9092 --topic ljrTest --from-beginning

# And verify the version has updated

[kafka@40c2222cfea0 bin]$ ./kafka-topics.sh --version
3.2.3 (Commit:50029d3ed8ba576f)

# Until this point, we can just roll back to the old folder & revert to the previous version of Kafka … that’s out backout plan.

# Once everything has been confirmed to be working, bump the inter-broker protocol version to the new version & restart Kafka

vi /kafka/kafka/config/server.properties
inter.broker.protocol.version=3.2

Kafka Troubleshooting (for those who enjoy reading network traces)

I finally had a revelation that allowed me to definitively prove that I am not doing anything strange that is causing duplicated messages to appear in the Kafka stream — it’s a clear text protocol! That means you can use Wireshark, tcpdump, etc to capture everything that goes over the wire. This shows that the GUID I generated for the duplicated message only appears one time in the network trace. Whatever funky stuff is going on that makes the client see it twice? Not me 😊

I used tcpdump because the batch server doesn’t have tshark (and it’s not my server, so I’m not going to go requesting additional binaries if there’s something sufficient for my need already available). Ran tcpdump -w /srv/data/ljr.cap port 9092 to grab everything that transits port 9092 while my script executed. Once the batch completed, I stopped tcpdump and transferred the file over to my workstation to view the capture in Wireshark. Searched the packet bytes for my duplicated GUID … and there’s only one.

Confluent Kafka Queue Length

The documentation for the Python Confluent Kafka module includes a len function on the producer. I wanted to use the function because we’re getting a number of duplicated messages on the client, and I was trying to isolate what might be causing the problem. Unfortunately, calling producer.len() failed indicating there’s no len() method. I used dir(producer) to show that, no, there isn’t a len() method.

I realized today that the documentation is telling me that I can call the built-in len() function on a producer to get the queue length.

Code:

print(f"Before produce there are {len(producer)} messages awaiting delivery")
producer.produce(topic, key=bytes(str(int(cs.timestamp) ), 'utf8'), value=cs.SerializeToString() )
print(f"After produce there are {len(producer)} messages awaiting delivery")
producer.poll(0) # Per https://github.com/confluentinc/confluent-kafka-python/issues/16 for queue full error
print(f"After poll0 there are {len(producer)} messages awaiting delivery")

Output:

Before produce there are 160 messages awaiting delivery
After produce there are 161 messages awaiting delivery
After poll0 there are 155 messages awaiting delivery