{"id":11208,"date":"2024-10-22T10:19:15","date_gmt":"2024-10-22T15:19:15","guid":{"rendered":"https:\/\/www.rushworth.us\/lisa\/?p=11208"},"modified":"2024-10-22T10:19:16","modified_gmt":"2024-10-22T15:19:16","slug":"kafka-consumer-and-producer-example-java","status":"publish","type":"post","link":"https:\/\/www.rushworth.us\/lisa\/?p=11208","title":{"rendered":"Kafka Consumer and Producer Example \u2013 Java"},"content":{"rendered":"\n<h2 class=\"wp-block-heading\">Producer<\/h2>\n\n\n<div class=\"wp-block-syntaxhighlighter-code \"><pre class=\"brush: java; title: ; notranslate\" title=\"\">\npackage com.example;\n\nimport org.apache.kafka.clients.producer.KafkaProducer;\nimport org.apache.kafka.clients.producer.ProducerRecord;\nimport org.apache.kafka.clients.producer.ProducerConfig;\nimport org.apache.kafka.common.serialization.StringSerializer;\n\nimport java.io.InputStream;\nimport java.time.LocalDateTime;\nimport java.time.format.DateTimeFormatter;\nimport java.util.Properties;\n\npublic class SimpleKafkaProducer {\n\n    public static void main(String&#x5B;] args) {\n        if (args.length &lt; 2) {\n            System.err.println(&quot;Please provide Kafka server and port. Usage: SimpleKafkaProducer &lt;kafka-server&gt; &lt;port&gt;&quot;);\n            System.exit(1);\n        }\n\n        String kafkaHost = args&#x5B;0];\n        String kafkaPort = args&#x5B;1];\n        String kafkaServer = kafkaHost + &quot;:&quot; + kafkaPort;\n        String topicName = &quot;LJRJavaTest&quot;;\n        String truststorePassword = &quot;truststore-password&quot;;\n\n        Properties props = new Properties();\n        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);\n        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());\n        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());\n\n        \/\/ Load truststore from resources\n        try (InputStream truststoreStream = SimpleKafkaProducer.class.getResourceAsStream(&quot;\/kafka.client.truststore.jks&quot;)) {\n            if (truststoreStream == null) {\n                throw new RuntimeException(&quot;Truststore not found in resources&quot;);\n            }\n\n            \/\/ Create a temporary file to hold the truststore\n            java.nio.file.Path tempTruststore = java.nio.file.Files.createTempFile(&quot;kafka.client.truststore&quot;, &quot;.jks&quot;);\n            java.nio.file.Files.copy(truststoreStream, tempTruststore, java.nio.file.StandardCopyOption.REPLACE_EXISTING);\n\n            props.put(&quot;security.protocol&quot;, &quot;SSL&quot;);\n            props.put(&quot;ssl.truststore.location&quot;, tempTruststore.toString());\n            props.put(&quot;ssl.truststore.password&quot;, truststorePassword);\n\n            KafkaProducer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(props);\n\n            \/\/ Define the date-time formatter\n            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(&quot;yyyyMMdd &#039;at&#039; HH:mm:ss&quot;);\n\n            try {\n                for (int i = 0; i &lt; 10; i++) {\n                    String key = &quot;Key-&quot; + i;\n                    \n                    \/\/ Get the current timestamp\n                    String timestamp = LocalDateTime.now().format(formatter);\n                    \n                    \/\/ Include the timestamp, Kafka host, and port in the message value\n                    String value = String.format(&quot;Message-%d (Test from %s on server %s:%s)&quot;, i, timestamp, kafkaHost, kafkaPort);\n                    \n                    ProducerRecord&lt;String, String&gt; record = new ProducerRecord&lt;&gt;(topicName, key, value);\n                    producer.send(record);\n                    System.out.println(&quot;Sent message: (&quot; + key + &quot;, &quot; + value + &quot;)&quot;);\n                }\n            } catch (Exception e) {\n                e.printStackTrace();\n            } finally {\n                producer.close();\n            }\n        } catch (Exception e) {\n            e.printStackTrace();\n        }\n    }\n}\n<\/pre><\/div>\n\n\n<h2 class=\"wp-block-heading\">Consumer<\/h2>\n\n\n<div class=\"wp-block-syntaxhighlighter-code \"><pre class=\"brush: java; title: ; notranslate\" title=\"\">\npackage com.example;\n\nimport org.apache.kafka.clients.consumer.ConsumerConfig;\nimport org.apache.kafka.clients.consumer.ConsumerRecords;\nimport org.apache.kafka.clients.consumer.KafkaConsumer;\nimport org.apache.kafka.clients.consumer.ConsumerRecord;\nimport org.apache.kafka.common.serialization.StringDeserializer;\n\nimport java.io.InputStream;\nimport java.time.Duration;\nimport java.util.Collections;\nimport java.util.Properties;\n\npublic class SimpleKafkaConsumer {\n\n    public static void main(String&#x5B;] args) {\n        if (args.length &lt; 2) {\n            System.err.println(&quot;Please provide Kafka server and port. Usage: SimpleKafkaConsumer &lt;kafka-server&gt; &lt;port&gt;&quot;);\n            System.exit(1);\n        }\n\n        String kafkaServer = args&#x5B;0] + &quot;:&quot; + args&#x5B;1];\n        String topicName = &quot;LJRJavaTest&quot;;\n        String groupId = &quot;test-consumer-group&quot;;\n        String truststorePassword = &quot;truststore-password&quot;;\n\n        Properties props = new Properties();\n        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);\n        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);\n        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());\n        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());\n        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &quot;earliest&quot;);\n\n        \/\/ Load truststore from resources\n        try (InputStream truststoreStream = SimpleKafkaConsumer.class.getResourceAsStream(&quot;\/kafka.client.truststore.jks&quot;)) {\n            if (truststoreStream == null) {\n                throw new RuntimeException(&quot;Truststore not found in resources&quot;);\n            }\n\n            \/\/ Create a temporary file to hold the truststore\n            java.nio.file.Path tempTruststore = java.nio.file.Files.createTempFile(&quot;kafka.client.truststore&quot;, &quot;.jks&quot;);\n            java.nio.file.Files.copy(truststoreStream, tempTruststore, java.nio.file.StandardCopyOption.REPLACE_EXISTING);\n\n            props.put(&quot;security.protocol&quot;, &quot;SSL&quot;);\n            props.put(&quot;ssl.truststore.location&quot;, tempTruststore.toString());\n            props.put(&quot;ssl.truststore.password&quot;, truststorePassword);\n\n            KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);\n            consumer.subscribe(Collections.singletonList(topicName));\n\n            try {\n                while (true) {\n                    ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(100));\n                    for (ConsumerRecord&lt;String, String&gt; record : records) {\n                        System.out.printf(&quot;Consumed message: (key: %s, value: %s, offset: %d, partition: %d)%n&quot;,\n                                record.key(), record.value(), record.offset(), record.partition());\n                    }\n                }\n            } catch (Exception e) {\n                e.printStackTrace();\n            } finally {\n                consumer.close();\n            }\n        } catch (Exception e) {\n            e.printStackTrace();\n        }\n    }\n}\n<\/pre><\/div>\n\n\n<h2 class=\"wp-block-heading\">Keystore for Trusts<\/h2>\n\n\n\n<p>The cert on my Kafka server is signed by a CA that has both a root and intermediary signing certificate. To trust the chain, I need to populate rootCA.crt and intermediateCA.crt files with the correct base64 encoded public key and import both files into a new trust store:<\/p>\n\n\n<div class=\"wp-block-syntaxhighlighter-code \"><pre class=\"brush: bash; title: ; notranslate\" title=\"\">\nkeytool -importcert -file rootCA.crt -keystore kafka.client.truststore.jks -alias rootCA -storepass truststore-password\n\nkeytool -importcert -file intermediateCA.crt -keystore kafka.client.truststore.jks -alias intermediateCA -storepass truststore-password\n<\/pre><\/div>\n\n\n<figure class=\"wp-block-image size-full\"><a href=\"https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/keytool-cacertsimporting.jpg\"><img loading=\"lazy\" decoding=\"async\" width=\"624\" height=\"341\" src=\"https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/keytool-cacertsimporting.jpg\" alt=\"\" class=\"wp-image-11210\" srcset=\"https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/keytool-cacertsimporting.jpg 624w, https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/keytool-cacertsimporting-300x164.jpg 300w\" sizes=\"auto, (max-width: 624px) 100vw, 624px\" \/><\/a><\/figure>\n\n\n\n<p>Then list certs in the trust store to verify the import was successful:<\/p>\n\n\n<div class=\"wp-block-syntaxhighlighter-code \"><pre class=\"brush: bash; title: ; notranslate\" title=\"\">\nkeytool -list -keystore kafka.client.truststore.jks -storepass truststore-password\n<\/pre><\/div>\n\n\n<figure class=\"wp-block-image size-full\"><a href=\"https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/keytool-cacertsimported.jpg\"><img loading=\"lazy\" decoding=\"async\" width=\"624\" height=\"166\" src=\"https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/keytool-cacertsimported.jpg\" alt=\"\" class=\"wp-image-11209\" srcset=\"https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/keytool-cacertsimported.jpg 624w, https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/keytool-cacertsimported-300x80.jpg 300w\" sizes=\"auto, (max-width: 624px) 100vw, 624px\" \/><\/a><\/figure>\n\n\n\n<p>And place the jks keystore with the CA certs in project:<\/p>\n\n\n<div class=\"wp-block-syntaxhighlighter-code \"><pre class=\"brush: bash; title: ; notranslate\" title=\"\">\nC:.\n\u2502   pom.xml\n\u2502\n\u251c\u2500\u2500\u2500src\n\u2502   \u2514\u2500\u2500\u2500main\n\u2502       \u251c\u2500\u2500\u2500java\n\u2502       \u2502   \u2514\u2500\u2500\u2500com\n\u2502       \u2502       \u2514\u2500\u2500\u2500example\n\u2502       \u2502               SimpleKafkaProducer.java\n\u2502       \u2502\n\u2502       \u2514\u2500\u2500\u2500resources\n\u2502               kafka.client.truststore.jks\n\n<\/pre><\/div>\n\n\n<h2 class=\"wp-block-heading\">Build and Run<\/h2>\n\n\n\n<p>Create a pom.xml for the build<\/p>\n\n\n<div class=\"wp-block-syntaxhighlighter-code \"><pre class=\"brush: xml; title: ; notranslate\" title=\"\">\n&lt;project xmlns=&quot;http:\/\/maven.apache.org\/POM\/4.0.0&quot;\n         xmlns:xsi=&quot;http:\/\/www.w3.org\/2001\/XMLSchema-instance&quot;\n         xsi:schemaLocation=&quot;http:\/\/maven.apache.org\/POM\/4.0.0 http:\/\/maven.apache.org\/xsd\/maven-4.0.0.xsd&quot;&gt;\n    &lt;modelVersion&gt;4.0.0&lt;\/modelVersion&gt;\n\n    &lt;groupId&gt;com.example&lt;\/groupId&gt;\n    &lt;artifactId&gt;kafka-producer-example&lt;\/artifactId&gt;\n    &lt;version&gt;1.0-SNAPSHOT&lt;\/version&gt;\n    \n    &lt;properties&gt;\n        &lt;maven.compiler.source&gt;1.8&lt;\/maven.compiler.source&gt;\n        &lt;maven.compiler.target&gt;1.8&lt;\/maven.compiler.target&gt;\n    &lt;\/properties&gt;\n\n    &lt;dependencies&gt;\n        &lt;!-- Kafka client dependency --&gt;\n        &lt;dependency&gt;\n            &lt;groupId&gt;org.apache.kafka&lt;\/groupId&gt;\n            &lt;artifactId&gt;kafka-clients&lt;\/artifactId&gt;\n            &lt;version&gt;3.0.0&lt;\/version&gt;\n        &lt;\/dependency&gt;\n    &lt;\/dependencies&gt;\n\n    &lt;build&gt;\n        &lt;plugins&gt;\n            &lt;!-- Compiler plugin to ensure Java 8 compatibility --&gt;\n            &lt;plugin&gt;\n                &lt;groupId&gt;org.apache.maven.plugins&lt;\/groupId&gt;\n                &lt;artifactId&gt;maven-compiler-plugin&lt;\/artifactId&gt;\n                &lt;version&gt;3.8.1&lt;\/version&gt;\n                &lt;configuration&gt;\n                    &lt;source&gt;1.8&lt;\/source&gt;\n                    &lt;target&gt;1.8&lt;\/target&gt;\n                &lt;\/configuration&gt;\n            &lt;\/plugin&gt;\n            &lt;!-- Exec Maven Plugin to run the Java class --&gt;\n            &lt;plugin&gt;\n                &lt;groupId&gt;org.codehaus.mojo&lt;\/groupId&gt;\n                &lt;artifactId&gt;exec-maven-plugin&lt;\/artifactId&gt;\n                &lt;version&gt;3.0.0&lt;\/version&gt;\n                &lt;configuration&gt;\n                    &lt;mainClass&gt;com.example.SimpleKafkaProducer&lt;\/mainClass&gt;\n                &lt;\/configuration&gt;\n            &lt;\/plugin&gt;\n        &lt;\/plugins&gt;\n    &lt;\/build&gt;\n&lt;\/project&gt;\n\n<\/pre><\/div>\n\n\n<p>Build using `mvn clean package` <\/p>\n\n\n\n<p>Run the producer using ` mvn exec:java -Dexec.mainClass=&#8221;com.example.SimpleKafkaProducer&#8221; -Dexec.args=&#8221;kafkahost.example.net 9095&#8243;`<\/p>\n\n\n\n<figure class=\"wp-block-image size-large\"><a href=\"https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/kafka-java-producer.jpg\"><img loading=\"lazy\" decoding=\"async\" width=\"1024\" height=\"561\" src=\"https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/kafka-java-producer-1024x561.jpg\" alt=\"\" class=\"wp-image-11212\" srcset=\"https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/kafka-java-producer-1024x561.jpg 1024w, https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/kafka-java-producer-300x164.jpg 300w, https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/kafka-java-producer-768x420.jpg 768w, https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/kafka-java-producer-750x411.jpg 750w, https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/kafka-java-producer.jpg 1085w\" sizes=\"auto, (max-width: 1024px) 100vw, 1024px\" \/><\/a><\/figure>\n\n\n\n<p>Run the consumer using ` mvn exec:java -Dexec.mainClass=&#8221;com.example.SimpleKafkaConsumer&#8221; -Dexec.args=&#8221;kafkahost.example.net 9095&#8243;`<\/p>\n\n\n\n<figure class=\"wp-block-image size-full\"><a href=\"https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/kafka-java-consumer.jpg\"><img loading=\"lazy\" decoding=\"async\" width=\"624\" height=\"352\" src=\"https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/kafka-java-consumer.jpg\" alt=\"\" class=\"wp-image-11211\" srcset=\"https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/kafka-java-consumer.jpg 624w, https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/kafka-java-consumer-300x169.jpg 300w, https:\/\/www.rushworth.us\/lisa\/wp-content\/uploads\/2024\/10\/kafka-java-consumer-480x270.jpg 480w\" sizes=\"auto, (max-width: 624px) 100vw, 624px\" \/><\/a><\/figure>\n","protected":false},"excerpt":{"rendered":"<p>Producer Consumer Keystore for Trusts The cert on my Kafka server is signed by a CA that has both a root and intermediary signing certificate. To trust the chain, I need to populate rootCA.crt and intermediateCA.crt files with the correct base64 encoded public key and import both files into a new trust store: Then list &hellip;<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[33,1547,1762],"tags":[578,1361,2055,2056],"class_list":["post-11208","post","type-post","status-publish","format-standard","hentry","category-coding","category-java","category-kafka","tag-java","tag-kafka","tag-kafka-consumer","tag-kafka-producer"],"_links":{"self":[{"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/posts\/11208","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=11208"}],"version-history":[{"count":1,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/posts\/11208\/revisions"}],"predecessor-version":[{"id":11213,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/posts\/11208\/revisions\/11213"}],"wp:attachment":[{"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=11208"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=11208"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=11208"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}