Category: Technology

K8s 1.24.12 Upgrade

While upgrading our dev Kubernetes environment to 1.24.12, we encountered what seems to be a fairly common error: unknown service runtime.v1alpha2.RuntimeService

kubeserver:~ # kubeadm init
I0323 13:53:26.492921   55320 version.go:256] remote version is much newer: v1.26.3; falling back to: stable-1.24
[init] Using Kubernetes version: v1.24.12
[preflight] Running pre-flight checks
        [WARNING Firewalld]: firewalld is active, please ensure ports [6443 10250] are open or your cluster may not function correctly
error execution phase preflight: [preflight] Some fatal errors occurred:
        [ERROR FileAvailable--etc-kubernetes-manifests-kube-apiserver.yaml]: /etc/kubernetes/manifests/kube-apiserver.yaml already exists
        [ERROR FileAvailable--etc-kubernetes-manifests-kube-controller-manager.yaml]: /etc/kubernetes/manifests/kube-controller-manager.yaml already exists
        [ERROR FileAvailable--etc-kubernetes-manifests-kube-scheduler.yaml]: /etc/kubernetes/manifests/kube-scheduler.yaml already exists
        [ERROR FileAvailable--etc-kubernetes-manifests-etcd.yaml]: /etc/kubernetes/manifests/etcd.yaml already exists
        [ERROR CRI]: container runtime is not running: output: E0323 13:53:26.741684   55340 remote_runtime.go:948] "Status from runtime service failed" err="rpc error: code = Unimplemented desc = unknown service runtime.v1alpha2.RuntimeService"
time="2023-03-23T13:53:26-05:00" level=fatal msg="getting status of runtime: rpc error: code = Unimplemented desc = unknown service runtime.v1alpha2.RuntimeService"
, error: exit status 1
        [ERROR DirAvailable--var-lib-etcd]: /var/lib/etcd is not empty
[preflight] If you know what you are doing, you can make a check non-fatal with `--ignore-preflight-errors=...`
To see the stack trace of this error execute with --v=5 or higher

We found a lot of people online with the same issue who (1) removed the config.toml and tried again, (2) changed the SystemdCgroup setting in the config, or (3) uninstalled and reinstalled some/all of the components until it worked. Unfortunately, removing or modifying the config didn’t help. And removing and reinstalling everything wasn’t particularly appealing. However, we noticed that the same error was reported directly from containerd:

kubeserver:~ # crictl ps
E0323 13:53:07.061777   55228 remote_runtime.go:557] "ListContainers with filter from runtime service failed" err="rpc error: code = Unimplemented desc = unknown service runtime.v1alpha2.RuntimeService" filter="&ContainerFilter{Id:,State:&ContainerStateValue{State:CONTAINER_RUNNING,},PodSandboxId:,LabelSelector:map[string]string{},}"
FATA[0000] listing containers: rpc error: code = Unimplemented desc = unknown service runtime.v1alpha2.RuntimeService

Looking at the plugins, we saw that some were in an error state:

kubeserver:~ # ctr plugins ls
TYPE                                  ID                       PLATFORMS      STATUS
io.containerd.content.v1              content                  -              ok
io.containerd.snapshotter.v1          aufs                     linux/amd64    skip
io.containerd.snapshotter.v1          btrfs                    linux/amd64    skip
io.containerd.snapshotter.v1          devmapper                linux/amd64    error
io.containerd.snapshotter.v1          native                   linux/amd64    ok
io.containerd.snapshotter.v1          overlayfs                linux/amd64    error
io.containerd.snapshotter.v1          zfs                      linux/amd64    skip

So … it seemed reasonable to look for errors in the messages log from containerd. And, yeah, we had all sorts of errors. Including a rather scary one about reformatting the file system!

Mar 23 13:24:51 kubeserver containerd: time="2023-03-23T13:24:51.726984260-05:00" level=warning msg="failed to load plugin io.containerd.snapshotter.v1.overlayfs" error="/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs does not support d_type. If the backing filesystem is xfs, please reformat with ftype=1 to enable d_type support"

That would do it — we have a dedicated partition for the k8s stuff … and that volume is formatted the right way — xfs_info confirmed ftype=1

kubeserver:~ # xfs_info /kubernetes/
meta-data=/dev/mapper/kubernetes-kubernetes isize=512    agcount=4, agsize=131071744 blks
         =                       sectsz=512   attr=2, projid32bit=1
         =                       crc=1        finobt=0 spinodes=0
data     =                       bsize=4096   blocks=524286976, imaxpct=5
         =                       sunit=0      swidth=0 blks
naming   =version 2              bsize=4096   ascii-ci=0 ftype=1
log      =internal               bsize=4096   blocks=255999, version=2
         =                       sectsz=512   sunit=0 blks, lazy-count=1
realtime =none                   extsz=4096   blocks=0, rtextents=0
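
If you want to script that check, here is a minimal sketch that pulls the ftype value out of captured xfs_info output. The helper name xfs_ftype is made up for illustration; it is not part of any tool.

```python
import re
from typing import Optional


def xfs_ftype(xfs_info_output: str) -> Optional[int]:
    """Return the ftype value found in xfs_info output, or None if absent."""
    match = re.search(r"\bftype=(\d+)", xfs_info_output)
    return int(match.group(1)) if match else None


# The "naming" line from the xfs_info output above
sample = "naming   =version 2              bsize=4096   ascii-ci=0 ftype=1"
print(xfs_ftype(sample))
```

A value of 1 means d_type is supported, which is what the overlayfs snapshotter was complaining about.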

However, containerd doesn’t really know anything about this volume, does it? The default location that containerd wants to use (under /var/lib/containerd, per the log message) isn’t set up to support d_type. We edited /etc/containerd/config.toml so that the root setting now tells containerd to use our special partition for ‘stuff’ …

And we were able to run kubeadm init without error. Everything came up as it should have, and our k8s server was upgraded!

Tableau — Installing after Control Panel Uninstall

When attempting to install Tableau, I got an error telling me that “an older version of tableau is installed but the tsm administrative services are not running” … which, as I didn’t own the server until recently, was quite possibly true. But it was also uninstalled from the control panel, cleaned up on disk, and really not there to start.

I discovered that Tableau registers some environment variables — and the mere presence of these variables will cause the “it’s already installed, so I’m not going any farther” error. Deleting the environment variables allowed me to re-install the current Tableau version without problem.

To remove an environment variable, go to Control Panel > System > Advanced system settings — highlight the variable you want to remove (all of the TABLEAU* ones in this case) and click ‘Delete’.

Artificial Limits on Artificial Intelligence

I cannot say it surprises me that a company that considered “restart it” and “reboot it” to be suitable solutions to a whole host of memory management issues has decided that clearing the history after n interactions with their AI chatbot is the solution to not having it out looking for nuclear codes. From a functional standpoint, I can see where one or two “turns” is probably sufficient for the search engine to produce some salient information or relevant links. But, from a technological standpoint, it seems that an AI that becomes “confused” five or six exchanges into a conversation is vastly problematic; and introducing some artificial limit on the front-end to reset the history (1) eliminates a key feature of learning algorithms by preventing it from “learning” you and (2) is akin to turning your back on a tiger and considering the problem sorted.

Maple Mapping

For the last few years, we’ve talked about mapping out our maple trees — we track which ones we tap, when we tap them, and occasionally try to track how much sap the tree produced. Which is difficult when the tree is labeled as “second down from planter on driveway” or “the next, next one by neighbor”. It seemed like we should be able to use our phones to tag each location — ideally while there are still leaves on the trees so we could denote them as sugar, red, etc.

We settled on OsmAnd (https://github.com/osmandapp/OsmAnd/), an open source app that uses OpenStreetMap. There’s no convenient way for Scott and me to simultaneously edit the data set, but we can export the file on one phone and import it onto the other so we are both looking at the same points. Each tree is numbered, and there is a note with the type of tree and how many taps.

Now we know we are at tree #27 (the phone’s location will show up as a blue dot).

Python Script: Checking Certificate Expiry Dates

A long time ago, before the company I work for paid for an external SSL certificate management platform that does nice things like clue you into the fact your cert is expiring tomorrow night, we had an outage due to an expired certificate. One. I put together a perl script that parsed output from the openssl client and proactively alerted us of any pending expiration events.

That script went away quite some time ago, although I still have a copy running at home that ensures our SMTP and web server certs are current. This morning, though, our K8s environment fell over due to expired certificates. Digging into it, the certs are in the management platform (my first guess was self-signed certificates that wouldn’t have been included in the pending expiry notices), but the notices were not delivered to those of us who actually manage the servers. Luckily it was our dev k8s environment, and we now know the prod one will be expiring in a week or so. But I figured it was a good impetus to resurrect the old script. Unfortunately, none of the modules I used for date calculation were installed on our script server. Seemed like a hint that I should rewrite the script in Python. So … here is a quick Python script that gets certificates from hosts and calculates how long until the certificate expires. Add on an “if” statement and a notification function, and we shouldn’t come in to failed environments needing certificate renewals.

from cryptography import x509
from cryptography.hazmat.backends import default_backend
import socket
import ssl
from datetime import datetime, timedelta

# Dictionary of hosts:port combinations to check for expiry
dictHostsToCheck = {
"tableau.example.com": 443       # Tableau 
,"kibana.example.com": 5601      # ELK Kibana
,"elkmaster.example.com": 9200   # ELK Master
,"kafka.example.com": 9093       # Kafka server
}
for strHostName in dictHostsToCheck:
    iPort = dictHostsToCheck[strHostName]

    datetimeNow = datetime.utcnow()

    # create default context
    context = ssl.create_default_context()

    # Do not verify cert chain or hostname so we ensure we always check the certificate
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    with socket.create_connection((strHostName, iPort)) as sock:
        with context.wrap_socket(sock, server_hostname=strHostName) as ssock:
            # Fetch the peer certificate in binary (DER) form and parse it directly
            objDERCert = ssock.getpeercert(True)
            objCertificate = x509.load_der_x509_certificate(objDERCert, backend=default_backend())

            print(f"{strHostName}\t{iPort}\t{objCertificate.not_valid_after}\t{(objCertificate.not_valid_after - datetimeNow).days} days")
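
The “if” statement and notification function mentioned above could look something like this sketch. The 30-day threshold and the notify function are assumptions for illustration; swap in whatever alerting you actually use.

```python
from datetime import datetime, timedelta

WARNING_DAYS = 30  # hypothetical threshold, pick what suits your renewal process


def days_remaining(not_valid_after: datetime, now: datetime) -> int:
    """Whole days between now and the certificate's expiry date."""
    return (not_valid_after - now).days


def needs_renewal(not_valid_after: datetime, now: datetime,
                  warning_days: int = WARNING_DAYS) -> bool:
    """True when the certificate expires within the warning window."""
    return days_remaining(not_valid_after, now) <= warning_days


def notify(host: str, port: int, days: int) -> None:
    # Placeholder notification: replace with email, chat webhook, ticket, etc.
    print(f"WARNING: certificate on {host}:{port} expires in {days} days")


# Made-up dates so the example runs without a network connection
now = datetime(2023, 3, 1)
expiry = now + timedelta(days=10)
if needs_renewal(expiry, now):
    notify("kafka.example.com", 9093, days_remaining(expiry, now))
```

In the script above, the check would sit where the print statement is, using objCertificate.not_valid_after and datetimeNow.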


Visualizing GeoIP Information in Kibana

Before we can use map details in Kibana visualizations, we need to add fields with the geographic information. The first few steps are something the ELK admin staff will need to do in order to map source and/or destination IPs to geographic information.

First, update the relevant index template to map the location information into geo-point fields by loading this JSON. (But first, make sure there aren’t existing mappings; otherwise you’ll need to merge the existing JSON with the new elements for geoip_src and geoip_dst.)

{
  "_doc": {
    "_meta": {},
    "_source": {},
    "properties": {
      "geoip_dst": {
        "dynamic": true,
        "type": "object",
        "properties": {
          "ip": {
            "type": "ip"
          },
          "latitude": {
            "type": "half_float"
          },
          "location": {
            "type": "geo_point"
          },
          "longitude": {
            "type": "half_float"
          }
        }
      },
      "geoip_src": {
        "dynamic": true,
        "type": "object",
        "properties": {
          "ip": {
            "type": "ip"
          },
          "latitude": {
            "type": "half_float"
          },
          "location": {
            "type": "geo_point"
          },
          "longitude": {
            "type": "half_float"
          }
        }
      }
    }
  }
}

First, click on the index template name to view the settings. Click to the ‘mappings’ tab and copy what is in there

Munge in the two ‘properties’ in the above JSON. Edit the index template
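
If you would rather not hand-edit the JSON, the munging can be sketched in a few lines of Python. This assumes both documents have the same top-level "_doc" layout as the JSON above; the src_ip field in the example is a stand-in for whatever your template already maps.

```python
import copy


def merge_geoip_properties(existing: dict, new: dict) -> dict:
    """Return a copy of `existing` with the properties from `new` added.

    Properties that already exist in the template are left untouched.
    """
    merged = copy.deepcopy(existing)
    target = merged.setdefault("_doc", {}).setdefault("properties", {})
    for name, definition in new.get("_doc", {}).get("properties", {}).items():
        target.setdefault(name, copy.deepcopy(definition))
    return merged


# Hypothetical existing mapping copied from the 'mappings' tab
existing = {"_doc": {"properties": {"src_ip": {"type": "ip"}}}}
# Trimmed-down version of the new mapping
new = {"_doc": {"properties": {
    "geoip_src": {"type": "object",
                  "properties": {"location": {"type": "geo_point"}}}}}}

merged = merge_geoip_properties(existing, new)
print(sorted(merged["_doc"]["properties"]))
```

Dump the result with json.dumps and paste that into the template.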

Click to the “Mappings” section and use “Load JSON” to import the new mapping configuration

Paste in your JSON & click to “Load & Overwrite”

Voila – you will have geo-point items in the template.

Next, the Logstash pipeline needs to be configured to enrich log records with geoip information. There is a geoip filter available, which uses the MaxMind GeoIP database (this is refreshed automatically; currently, we do not merge in any geoip information for the private network address spaces). You just need to indicate which field(s) have the IP address and where the location information should be stored. You can have multiple geographic IP fields; in this example, we map both source and destination IP addresses.

        geoip {
                source => "src_ip"
                target => "geoip_src"
                add_field => [ "[geoip][location]", "%{[geoip][longitude]}" ]
                add_field => [ "[geoip][location]", "%{[geoip][latitude]}"  ]
        }
        geoip {
                source => "dst_ip"
                target => "geoip_dst"
                add_field => [ "[geoip][location]", "%{[geoip][longitude]}" ]
                add_field => [ "[geoip][location]", "%{[geoip][latitude]}"  ]
        }

Once Logstash is restarted, the documents viewed in Kibana will have geoip_src and geoip_dst fields:

Once relevant data is being stored, use the refresh-looking button on the index pattern(s) to refresh the field list from stored data. This will add the geo-point items into the index pattern.

Once GeoIP information is available in the index pattern, select the “Maps” visualization

Leave the road map layer there (otherwise you won’t see the countries!)

Select ‘Documents’ as the data source to link in Elasticsearch data

Select the index pattern that contains your data source (if your index pattern does not appear, then Kibana doesn’t recognize the pattern as containing geographic fields … I’ve had to delete and recreate my index pattern so the geographic fields were properly mapped).

And select the field(s) that contain geographic details:

You can name the layer

And add a tool tip that will include the country code or name

Under “Term joins”, add a new join. Click on “Join –select–” to link a field from the map to a field in your dataset.

In this case, I am joining the two-character country codes —

Normally, you can leave the “and use metric count” in place (the map is color coded by the number of requests coming from each country). If you want to add a filter, you can click the “where — add filter –” link to edit the filter.

In this example, I don’t want to filter the data, so I’ve left that at the default.

Click “Save & close” to save the changes to the map visualization. To view your map, you won’t find it under Visualizations – instead, click “Maps” along the left-hand navigation menu.

Voila – a map where the shading on a country gets darker the more requests have come from the country.

Internal Addresses

If we want to (and if we have information to map IP subnets to City/State/Zip/LatLong, etc), we can edit the database used for GeoIP mappings — https://github.com/maxmind/getting-started-with-mmdb provides a perl module that interacts with the database file. That isn’t currently done, so internal servers where traffic is sourced primarily from private address spaces won’t have particularly thrilling map data.

 

Kafka Manager SSL Issue

We renewed the certificate on our Kafka Manager (now called CMAK, but we haven’t upgraded yet so it’s still ‘manager’), but the site wouldn’t come up. It did, however, dump a bunch of Java ick into the log file:

Jan 16 14:01:52 kafkamanager kafka-manager: [^[[31merror^[[0m] p.c.s.NettyServer$PlayPipelineFactory - cannot load SSL context
Jan 16 14:01:52 kafkamanager kafka-manager: java.lang.reflect.InvocationTargetException: null
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createScalaSSLEngineProvider(ServerSSLEngine.scala:96) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createSSLEngineProvider(ServerSSLEngine.scala:32) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.liftedTree1$1(NettyServer.scala:113) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.sslEngineProvider$lzycompute(NettyServer.scala:112) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.sslEngineProvider(NettyServer.scala:111) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.getPipeline(NettyServer.scala:90) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: Caused by: java.lang.Exception: Error loading HTTPS keystore from /path/to/kafkamgr.example.net.jks
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.DefaultSSLEngineProvider.createSSLContext(DefaultSSLEngineProvider.scala:47) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.DefaultSSLEngineProvider.<init>(DefaultSSLEngineProvider.scala:21) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createScalaSSLEngineProvider(ServerSSLEngine.scala:96) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.ssl.ServerSSLEngine$.createSSLEngineProvider(ServerSSLEngine.scala:32) ~[com.typesafe.play.play-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.liftedTree1$1(NettyServer.scala:113) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: at play.core.server.NettyServer$PlayPipelineFactory.sslEngineProvider$lzycompute(NettyServer.scala:112) [com.typesafe.play.play-netty-server_2.11-2.4.6.jar:2.4.6]
Jan 16 14:01:52 kafkamanager kafka-manager: Caused by: java.security.UnrecoverableKeyException: Cannot recover key
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.KeyProtector.recover(KeyProtector.java:315) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.JavaKeyStore.engineGetKey(JavaKeyStore.java:141) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.JavaKeyStore$JKS.engineGetKey(JavaKeyStore.java:56) ~[na:1.8.0_251]
Jan 16 14:01:52 kafkamanager kafka-manager: at sun.security.provider.KeyStoreDelegator.engineGetKey(KeyStoreDelegator.java:96) ~[na:1.8.0_251]

Elsewhere in the log file, we got output that looks like not-decrypted stuff …

Jan 16 14:01:52 kafkamanager kafka-manager: java.lang.IllegalArgumentException: invalid version format:  ̄G^H▒~A�▒~Zᆴ▒~@▒~A:U▒~HP▒~W5▒~W▒D¬ᄡ^K/↓▒
￧^S▒L
Jan 16 14:01:52 kafkamananger kafka-manager: "^S^A^S^C^S^B▒~@+▒~@/▒~Lᄅ▒~Lᄄ▒~@,▒~@0▒~@

Which led me to hypothesize that either the keystore password wasn’t right (it was; I could use keytool to view the jks file) or the key password wasn’t right. The key password turned out to be the problem: there isn’t actually a way to configure the key password in Kafka Manager, just a parameter to configure the keystore password. You’ve got to re-use that password for the key password.

To change the key password in a JKS file, use keytool, enter the keystore and key password when prompted, then enter the new key password when prompted.

keytool -keypasswd -alias kafkamanager.example.net -keystore ljr.jks

Voila — once both the key and keystore matched the password configured in play.server.https.keyStore.password … the Kafka Manager service started up and worked properly.

 

Automatically Adding “Extra” JMX Ports to Firewalld

A few months ago, I dug into a mystery at work: even though JMX was configured to use port 9999, port 9999 was open from the client to the server, and the client was configured to use port 9999 … our Kafka manager tool could only report statistics from the local Kafka server. It failed to retrieve data for the remote ones, saying it was unable to connect. Long story short, JMX uses its configured port and two other randomly selected (and not readily configurable) ports. To automate getting JMX working when Kafka is restarted, I built this shell script. It identifies which ports above 20000 Java is listening on, and it transiently adds them to the firewall rules (since the ports change on each service start, transient firewall rules made sense here). My plan is to link the script to the Kafka unit file as an ExecStartPost directive.

# Get ports linked to java
mapfile -t array_of_ports < <( ss -6 -l -t -p -n | grep java | cut -d ":" -f 4 | grep -v "10.166" | sed -e 's/\ *$//g')

declare -p array_of_ports

for i in "${array_of_ports[@]}"
do
   # Only the randomly assigned high ports need to be opened
   if (( i > 20000 )); then
      echo "/bin/firewall-cmd --zone=public --add-port=$i/tcp"
      output=$(/bin/firewall-cmd --zone=public --add-port="$i"/tcp)
      echo "$output"
   fi
done

mapfile -t array_of_ports < <( ss -4 -l -t -p -n | grep java | cut -d ":" -f 2 | cut -d " " -f 1 | sed -e 's/\ *$//g')

declare -p array_of_ports

for i in "${array_of_ports[@]}"
do
   # Only the randomly assigned high ports need to be opened
   if (( i > 20000 )); then
      echo "/bin/firewall-cmd --zone=public --add-port=$i/tcp"
      output=$(/bin/firewall-cmd --zone=public --add-port="$i"/tcp)
      echo "$output"
   fi
done
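
For anyone who would rather do the port hunting in Python, here is a rough sketch of the same filtering logic. It assumes the text comes from `ss -l -t -p -n`, where the fourth column is the local address and port; the sample output is made up for illustration, and real port numbers differ on every restart.

```python
def java_ports_above(ss_output: str, minimum: int = 20000) -> list[int]:
    """Return listening ports above `minimum` that belong to a java process."""
    ports = set()
    for line in ss_output.splitlines():
        columns = line.split()
        # Skip the header, short lines, and anything not owned by java
        if len(columns) < 4 or "java" not in line:
            continue
        # The port is whatever follows the last colon of the local address
        port_text = columns[3].rsplit(":", 1)[-1]
        if port_text.isdigit() and int(port_text) > minimum:
            ports.add(int(port_text))
    return sorted(ports)


# Made-up sample output, not captured from a real server
sample = """State  Recv-Q Send-Q Local Address:Port Peer Address:Port Process
LISTEN 0      50     *:9999      *:*       users:(("java",pid=1234,fd=101))
LISTEN 0      50     *:37251     *:*       users:(("java",pid=1234,fd=102))
LISTEN 0      50     [::]:41873  [::]:*    users:(("java",pid=1234,fd=103))
LISTEN 0      128    0.0.0.0:22  0.0.0.0:* users:(("sshd",pid=900,fd=3))
"""
print(java_ports_above(sample))
```

Each returned port would then be handed to firewall-cmd with --add-port, exactly as the shell script does.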

Kafka Producer – Sending a Message with a Key

I needed to test sending messages into a topic where the cleanup policy is compact (keep the most recent data for each key) … which means I needed a quick way to send a message with a key to Kafka. Fortunately, the kafka-console-producer script supports key parsing. You just need to include a few --property parameters when running the script.

./kafka-console-producer.sh --bootstrap-server kafka.example.com:9092 --topic ljrtesting --property "parse.key=true" --property "key.separator=:"

When you send messages, they are in the format Key<delimiter>Message, so “LJRKey:1” will send a message with the key of “LJRKey” and the message content of “1”.
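
To picture what the compacted topic ends up holding, here is a small sketch that splits lines the same way and keeps only the latest value per key. It is an illustration of the idea, not Kafka code; splitting at the first separator is my understanding of how the console producer behaves, and real compaction runs in the background rather than immediately.

```python
def parse_keyed_line(line: str, separator: str = ":") -> tuple[str, str]:
    """Split a console-producer style line into (key, value) at the first separator."""
    key, found, value = line.partition(separator)
    if not found:
        raise ValueError(f"no key separator {separator!r} in {line!r}")
    return key, value


def compact(lines: list[str]) -> dict[str, str]:
    """Keep only the most recent value for each key."""
    latest = {}
    for line in lines:
        key, value = parse_keyed_line(line)
        latest[key] = value
    return latest


print(compact(["LJRKey:1", "OtherKey:a", "LJRKey:2"]))
```

After sending those three messages, a consumer reading the compacted topic from the beginning should eventually see only the “2” for LJRKey.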

Kafka: Reset __consumer_offsets Topic

I set up a few topics for a new process, but the client was continually unable to subscribe, logging the following messages:

[2023-01-06 12:54:58,965] INFO [Worker clientId=connect-1, groupId=connect-cluster-group-dev] Group coordinator kafkahost.example.com:9092 (id: 2147483415 rack: null) is unavailable or invalid due to cause: error response NOT_COORDINATOR.isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:904)
[2023-01-06 12:54:58,966] INFO [Worker clientId=connect-1, groupId=connect-cluster-group-dev] JoinGroup failed: This is not the correct coordinator. Marking coordinator unknown. Sent generation was Generation{generationId=-1, memberId='', protocol='null'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:623)
[2023-01-06 12:54:58,966] INFO [Worker clientId=connect-1, groupId=connect-cluster-group-dev] Rebalance failed. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:472)
org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.

I recreated the topics with the replication factor equal to the number of servers. I restarted all of the Kafka and ZooKeeper services. We tried from other clients. We tried searching the internet for some pointer as to what had gone awry. The only thing I could find is that the __consumer_offsets topic wasn’t right … you should be able to read the offset data:

./kafka-console-consumer.sh --bootstrap-server kafkahost.example.net:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --topic __consumer_offsets

The output should contain your topic and an offset:

kafkasandbox:bin # ./kafka-console-consumer.sh  --bootstrap-server kafkaserver.example.com:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --topic __consumer_offsets
[APP_RUN,appa_DB_error,5]::OffsetAndMetadata(offset=912219, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016876, expireTimestamp=None)
[APP_RUN,appb_DB_error,5]::OffsetAndMetadata(offset=424539, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016938, expireTimestamp=None)
[APP_RUN,appb_DB_error,3]::OffsetAndMetadata(offset=359340, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016938, expireTimestamp=None)
[APP_RUN,appc_DB_error,2]::OffsetAndMetadata(offset=986361, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378017005, expireTimestamp=None)

Now, resetting the entire __consumer_offsets topic is a massively bad idea in a production environment. We’ve had to advance offsets before, but never had to reset the entire topic.

The documentation I found online used a ZooKeeper command that no longer exists — ‘rmr’ — but deleteall (basically a subtree deletion) did the trick.

# Reset consumer offsets
# Note -- this is a *really* bad idea in production as it can lead to missed messages. You should record the current offsets, clear the topic, then set the offset for the client groups
systemctl stop kafka # on all servers
mv /kafka/ws_npm_kafka/data-kafka/__consumer_offsets-* /tmp/
./zookeeper-shell.sh localhost:2181
deleteall /brokers/topics/__consumer_offsets
# Once this has been done on all servers, restart kafka
systemctl start kafka
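
For the “record the current offsets” step, a sketch like this can turn the formatter output shown earlier into something you can save before clearing anything. It assumes each line has the [group,topic,partition]::OffsetAndMetadata(offset=N, ...) shape seen above; parse_offsets is a made-up helper name.

```python
import re

# Matches lines such as:
# [APP_RUN,appa_DB_error,5]::OffsetAndMetadata(offset=912219, leaderEpoch=...)
OFFSET_LINE = re.compile(
    r"^\[(?P<group>[^,\]]+),(?P<topic>[^,\]]+),(?P<partition>\d+)\]"
    r"::OffsetAndMetadata\(offset=(?P<offset>\d+),"
)


def parse_offsets(formatter_output: str) -> dict:
    """Map (group, topic, partition) to the last offset seen in the output."""
    offsets = {}
    for line in formatter_output.splitlines():
        match = OFFSET_LINE.match(line.strip())
        if match:
            key = (match["group"], match["topic"], int(match["partition"]))
            offsets[key] = int(match["offset"])
    return offsets


# Two lines taken from the console consumer output earlier in this post
sample = """[APP_RUN,appa_DB_error,5]::OffsetAndMetadata(offset=912219, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016876, expireTimestamp=None)
[APP_RUN,appb_DB_error,3]::OffsetAndMetadata(offset=359340, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016938, expireTimestamp=None)
"""
for key, offset in parse_offsets(sample).items():
    print(key, offset)
```

Later lines overwrite earlier ones for the same group, topic, and partition, so the dictionary ends up with the most recent committed offset for each.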