Month: January 2023

Kafka Producer – Sending a Message with a Key

I needed to test sending messages into a topic where the cleanup policy is compact (keep the most recent data for each key) … which means I needed a quick way to send a message with a key to Kafka. Fortunately, the kafka-consume-producer script supports key parsing. You just need to include a few –property parameters when running the script.

./kafka-console-producer.sh --bootstrap-server kafka.example.com:9092 --topic ljrtesting --property "parse.key=true" --property "key.separator=:"

When you send messages, they are in the format Key<deliminator>Message — so “LJRKey:1” will send a message with the key of “LJRKey” and the message content of “1”.

Kafka: Reset __consumer_offsets Topic

I set up a few topics for a new process, but the client was continually unable to subscribe getting the following messages:

[2023-01-06 12:54:58,965] INFO [Worker clientId=connect-1, groupId=connect-cluster-group-dev] Group coordinator kafkahost.example.com:9092 (id: 2147483415 rack: null) is unavailable or invalid due to cause: error response NOT_COORDINATOR.isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:904)
[2023-01-06 12:54:58,966] INFO [Worker clientId=connect-1, groupId=connect-cluster-group-dev] JoinGroup failed: This is not the correct coordinator. Marking coordinator unknown. Sent generation was Generation{generationId=-1, memberId='', protocol='null'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:623)
[2023-01-06 12:54:58,966] INFO [Worker clientId=connect-1, groupId=connect-cluster-group-dev] Rebalance failed. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:472)
org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.

The output should contain your topic and an offset:

kafkasandbox:bin # ./kafka-console-consumer.sh  --bootstrap-server kafkaserver.example.com:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --topic __consumer_offsets
[APP_RUN,appa_DB_error,5]::OffsetAndMetadata(offset=912219, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016876, expireTimestamp=None)
[APP_RUN,appb_DB_error,5]::OffsetAndMetadata(offset=424539, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016938, expireTimestamp=None)
[APP_RUN,appb_DB_error,3]::OffsetAndMetadata(offset=359340, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378016938, expireTimestamp=None)
[APP_RUN,appc_DB_error,2]::OffsetAndMetadata(offset=986361, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1673378017005, expireTimestamp=None)

I recreated the topics with the replication factor equal to the number of servers. I restarted all of the Kafka and ZooKeeper services. We tried from other clients. We tried searching the internet for some pointer as to what has gone awry. The only thing I could find is that the __consumer_offsets topic wasn’t right … you should be able to read the offset data:

/kafka-console-consumer.sh --bootstrap-server kafkahost.example.net:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --topic __consumer_offsets

Now, resetting the entire consumer offsets partition is a massively bad idea in a production environment. We’ve had to advance offsets before, but never had to reset the entire partition.

The documentation I found online used a ZooKeeper command that no longer exists — ‘rmr’ — but deleteall (basically a subtree deletion) did the trick.

# Reset consumer offsets
# Note -- this is a *really* bad idea in production as it can lead to missed messages. You should record the current offsets, clear the topic, then set the offset for the client groups
systemctl stop kafka # on all servers
mv /kafka/ws_npm_kafka/data-kafka/__consumer_offsets-* /tmp/
./zookeeper-shell.sh localhost:2181
deleteall /brokers/topics/__consumer_offsets
# Once this has been done on all servers, restart kafka
systemctl start kafka

Kafka: Finding the Controller in Zookeeper

When restarting all of the Kafka servers (e.g. a periodic patch and reboot), it is better if you avoid rolling the controller between the nodes on every reboot. To accomplish this goal, find out which server is acting as the controller and restart it last — you’ll have the controller move one time using this method, but only once no matter how many servers are in your deployment.

# Connect to zookeeper
./zookeeper-shell.sh localhost:2181
# Find controller by opening the zookeeper shell and querying for controller
get /controller
{"version":1,"brokerid":250,"timestamp":"1676694139851"}

# Get details on broker ID reported as controller
get /brokers/ids/250
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","SASL_PLAINTEXT":"SASL_PLAINTEXT"},"endpoints":["PLAINTEXT://kafkahost.example.net:9093","SASL_PLAINTEXT://kafkahost.example.net:9092"],"jmx_port":9999,"host":"kafkahost.example.net","timestamp":"1676348503278","port":9093,"version":4}

Anya’s 10th Birthday Cake: Strawberry Whipped Cream

Ingredients:

  • 1 teaspoon unflavored gelatin powder
  • 4 teaspoons water
  • 1 cup heavy whipping cream
  • 1 Tablespoon maple syrup
  • 1/4 cup strawberry reduction

Method:

  • Place water in a heat-proof container and sprinkle gelatin on top. Allow to sit for at least five minutes to hydrate. Place container into a pot of water (water should not come too close to the top of the container — we just need a little bit in the pot to make a water bath). Simmer until gelatin is clear and liquid. Remove from heat and allow to slightly cool.
  • Add maple syrup to whipped cream and beat until soft peaks form.
  • Slowly drizzle gelatin into whipped cream (beating as you pour).
  • Slowly add in strawberry reduction and continue to beat until firm peaks form.

Anya’s 10th Birthday Cake: Strawberry Cake with Strawberry Whipped Cream

Strawberry Cake with Strawberry Whipped Cream

Recipe by LisaCourse: DessertCuisine: AmericanDifficulty: Medium
Servings

12

servings
Prep time

1

hour 
Cooking time

40

minutes

Ingredients

  • 14 oz all-purpose Flour

  • 2 tsp baking powder

  • 1 tsp baking soda

  • 1 teaspoon baking soda

  • 1/2 teaspoon salt

  • 226 g unsalted butter, room temperature

  • 284 g sugar

  • 1 Tablespoon lemon juice

  • 170 g egg whites, room temperature

  • 125 g strawberry reduction, room temperature

  • 170 g heavy whipping cream

Method

  • Preheat oven to 350F. Flour three 8″ cake pans and set aside.
  • Whisk together the cream, strawberry reduction, and lemon juice.
  • In a separate bowl, whisk together the flour, baking powder, baking soda, and salt.
  • In the bowl of a stand mixer, place the room temperature butter and beat until smooth and shiny.
  • Gradually add in the sugar, beating until the mixture is fluffy and light colored.
  • Add the egg whites, one at a time, and beat at a medium speed.
  • On a low speed, add about 1/3 of the dry ingredients followed by 1/3 of the wet ingredients, mixing until mostly incorporated into the batter. Repeat until all ingredients have been added.
  • Divide the batter evenly between the three pans.
  • Bake at 350 for 30-40 minutes (a cake tester should come out clean).
  • Place pans on a wire rack to cool for ten minutes, then turn the cakes out onto the rack and allow to cool.
  • To assemble, place a cake on the cake plate. Cover with strawberry reduction, and top with a second cake. Cover with strawberry reduction, then top with the third cake. Coat liberally with strawberry whipped cream and serve.

Commentary

  • This year, Anya chose a strawberry cake. We looked through a lot of recipes online and merged a couple into what we hoped would be a very strawberry flavored cake. It was — the strawberry whipped cream was specially tasty. I think it could have used more strawberry reduction between the layers, though the cake was already very moist.