Category: Technology

JavaScript: Extracting Web Content You Cannot Copy

I often need to copy “stuff” from a website that is structured in such a way that simply copying and pasting the table data is impossible. Screenshots work, but I usually want the table of data in Excel so I can add notations and such. In these cases, running JavaScript from the browser’s developer console lets you access the underlying text elements.

Right-click on one of the text elements and select “Inspect”.

Now copy the element’s XPath.

Read the value. We don’t generally want just this one element, but the path down to the “tbody” tag looks like a reasonable place to find the values within the table.

/html/body/div[1]/div/div/div[2]/div[2]/div[2]/div/div[3]/div/div/div[3]/div/div/div/table/tbody/tr[4]/td[2]/div/span[2]

Use JavaScript to grab all of the TD elements under the tbody:

// Define the XPath expression to select all <td> elements within the specific <tbody>
const xpathExpression = "/html/body/div[1]/div/div/div[2]/div[2]/div[2]/div/div[3]/div/div/div[3]/div/div/div/table/tbody//td";

// Use document.evaluate to get all matching <td> nodes
const nodesSnapshot = document.evaluate(xpathExpression, document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null);

// Log the number of nodes found (for debugging purposes)
console.log("Total <td> elements found:", nodesSnapshot.snapshotLength);

// Iterate over the nodes and log their text content
for (let i = 0; i < nodesSnapshot.snapshotLength; i++) {
    let node = nodesSnapshot.snapshotItem(i);
    if (node) {
        const textContent = node.textContent.trim();
        if (textContent) { // Only log non-empty content
            console.log(textContent);
        }
    }
}

Voila! I redacted some data below, but it’s just a list of values, one per line.
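
Since the end goal is usually Excel, the one-value-per-line output still needs to be regrouped into rows. Here is a minimal sketch in Python, assuming the console output has been pasted into a string and that you know how many columns the table has. The sample values and the column count are made up for illustration. Keep in mind that the JavaScript above skips empty cells, so a table with blank cells will not line up without adjusting that check.

```python
import csv
import io


def values_to_rows(console_text, columns):
    """Group one-value-per-line console output into rows of `columns` cells."""
    values = [line.strip() for line in console_text.splitlines() if line.strip()]
    return [values[i:i + columns] for i in range(0, len(values), columns)]


def rows_to_csv(rows):
    """Render the rows as CSV text that Excel can open."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerows(rows)
    return buffer.getvalue()


# Hypothetical sample: a three-column table copied from the console
sample = "Host A\n10.0.0.1\nOnline\nHost B\n10.0.0.2\nOffline\n"
print(rows_to_csv(values_to_rows(sample, columns=3)))
```

Save the printed text to a .csv file and Excel will open it as a regular table.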

Kafka Metadata Mismatch

I’ll preface this with a caveat: when the topic ID values in Zookeeper and in the Kafka partition metadata don’t match up, there’s something wrong beyond “hey, make this string the same as that string and try again”. Look for communication issues, disk issues, etc. that might lead to bad data. And, lacking any cause, the general recommendation is to drop and recreate the topic.

But! “Hey, I am going to delete all the data in this Kafka topic. Everyone good with that?” is seldom answered with a resounding “of course, we don’t actually want to keep the data we specifically configured a system to keep”. So you need to keep the topic around even though something has clearly gone sideways. Fortunately, you can manually update either the Zookeeper topic ID or the one on disk. I find it easier to update the one on disk in Kafka because this change can be reverted. Stop all of the Kafka servers. The following script runs on each Kafka server: it identifies all of the partition folders for the topic within the Kafka data directory, backs up each partition.metadata file, and changes the wrong topic ID to the right one. Then start the Kafka servers again, and the partitions should be functional. For now.

#!/bin/bash

# Function to check if Kafka is stopped
check_kafka_stopped() {
    if systemctl is-active --quiet kafka; then
        echo "Error: Kafka service is still running. Please stop Kafka before proceeding."
        exit 1
    else
        echo "Kafka service is stopped. Proceeding with backup and update."
    fi
}

# Define the base directory where Kafka stores its data
DATA_DIR="/path/to/data-kafka"

# Define the topic prefix to search for
TOPIC_PREFIX="MY_TOPIC_NAME-"

# Old and new topic IDs
OLD_TOPIC_ID="2xSmlPMBRv2_ihuWgrvQNA"
NEW_TOPIC_ID="57kBnenRTo21_VhEhWFOWg"

# Date string for backup file naming
DATE_STR=$(date +%Y%m%d)

# Check if Kafka is stopped
check_kafka_stopped

# Find all directories matching the topic pattern and process each one
for dir in "$DATA_DIR"/"${TOPIC_PREFIX}"*; do
    # Construct the path to the partition.metadata file
    metadata_file="$dir/partition.metadata"

    # Check if the metadata file exists
    if [[ -f "$metadata_file" ]]; then
        # Backup the existing partition.metadata file
        backup_file="$dir/partmetadata.$DATE_STR"
        echo "Backing up $metadata_file to $backup_file"
        cp "$metadata_file" "$backup_file"

        # Use sed to replace the line containing the old topic_id with the new one
        echo "Updating topic_id in $metadata_file"
        sed -i "s/^topic_id: $OLD_TOPIC_ID/topic_id: $NEW_TOPIC_ID/" "$metadata_file"
    else
        echo "No partition.metadata file found in $dir"
    fi
done

echo "Backup and topic ID update complete."

How do you get the Kafka metadata and Zookeeper topic IDs? In Zookeeper, ask it:

kafkaserver::fixTopicID # bin/zookeeper-shell.sh $(hostname):2181
get /brokers/topics/MY_TOPIC_NAME
{"removing_replicas":{},"partitions":{"2":[251,250],"5":[249,250],"27":[248,247],"12":[248,251],"8":[247,248],"15":[251,250],"21":[247,250],"18":[249,248],"24":[250,248],"7":[251,247],"1":[250,249],"17":[248,247],"23":[249,247],"26":[247,251],"4":[248,247],"11":[247,250],"14":[250,248],"20":[251,249],"29":[250,249],"6":[250,251],"28":[249,248],"9":[248,249],"0":[249,248],"22":[248,251],"16":[247,251],"19":[250,249],"3":[247,251],"10":[251,249],"25":[251,250],"13":[249,247]},"topic_id":"57kBnenRTo21_VhEhWFOWg","adding_replicas":{},"version":3}

Kafka’s copy of the topic ID is stored on disk, in the partition.metadata file within each topic partition’s folder:

kafkaserver::fixTopicID # cat /path/to/data-kafka/MY_TOPIC_NAME-29/partition.metadata
version: 0
topic_id: 2xSmlPMBRv2_ihuWgrvQNA

Once the data is processed, it would be a good idea to recreate the topic … but ensuring the two topic ID values match up will get you back online.
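
To confirm a mismatch (or confirm the fix) without eyeballing two long strings, the comparison can be scripted. This is a rough sketch in Python that works on the text of the two outputs shown above. The function names are mine, and the sample strings are trimmed copies of the outputs in this post.

```python
import json


def zookeeper_topic_id(znode_json):
    """Extract topic_id from the JSON returned by `get /brokers/topics/<topic>`."""
    return json.loads(znode_json)["topic_id"]


def metadata_topic_id(metadata_text):
    """Extract topic_id from the text of a partition.metadata file."""
    for line in metadata_text.splitlines():
        key, _, value = line.partition(":")
        if key.strip() == "topic_id":
            return value.strip()
    return None  # no topic_id line in the file


# Trimmed copies of the outputs shown above
znode = '{"partitions":{"29":[250,249]},"topic_id":"57kBnenRTo21_VhEhWFOWg","version":3}'
metadata = "version: 0\ntopic_id: 2xSmlPMBRv2_ihuWgrvQNA\n"

zk_id = zookeeper_topic_id(znode)
disk_id = metadata_topic_id(metadata)
if zk_id == disk_id:
    print("Topic IDs match")
else:
    print(f"Mismatch: Zookeeper has {zk_id}, disk has {disk_id}")
```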

Python: Getting Active Directory Subnets

Like my script that pulls the AD site information, this one lets me see which subnets are defined and which sites are assigned to those subnets. I was able to quickly confirm that the devices that had problems communicating with Active Directory don’t have a site defined. Way back in 2000, we created a “catch all” 10.0.0.0/8 subnet and assigned it to the user authentication site. New networks on a whole different addressing scheme don’t have a site assignment. It should still work, but the application in question has historically had issues with going the “Ok, list ’em all” route.

from ldap3 import Server, Connection, ALL, SUBTREE, Tls
import ssl
import getpass

# Attempt to import USERNAME and PASSWORD from config.py
try:
    from config import USERNAME, PASSWORD
except ImportError:
    USERNAME, PASSWORD = None, None

# Define constants
LDAP_SERVER = 'ad.example.com'
LDAP_PORT = 636

def get_subnets_and_sites(username, password):
    # Set up TLS configuration
    tls_configuration = Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1_2)

    # Connect to the LDAP server
    server = Server(LDAP_SERVER, port=LDAP_PORT, use_ssl=True, tls=tls_configuration, get_info=ALL)
    connection = Connection(server, user=username, password=password, authentication='SIMPLE', auto_bind=True)

    # Define the search base for subnets
    search_base = 'CN=Subnets,CN=Sites,CN=Configuration,DC=example,DC=com'  # Change this to match your domain's DN
    search_filter = '(objectClass=subnet)'  # Filter to find all subnet objects
    search_attributes = ['cn', 'siteObject']  # Retrieve the common name and site object references

    # Perform the search
    connection.search(search_base, search_filter, SUBTREE, attributes=search_attributes)

    # Extract and return subnets and their site assignments
    subnets_sites = []
    for entry in connection.entries:
        subnet_name = entry.cn.value
        site_dn = entry.siteObject.value if entry.siteObject else "No site assigned"
        subnets_sites.append((subnet_name, site_dn))

    return subnets_sites

def print_subnets_and_sites(subnets_sites):
    if subnets_sites:
        print("\nSubnets and their Site Assignments:")
        for subnet, site in subnets_sites:
            print(f"Subnet: {subnet}, Site: {site}")
    else:
        print("No subnets found in the domain.")

def main():
    # Prompt for username and password if not available in config.py
    username = USERNAME if USERNAME else input("Enter your LDAP username: ")
    password = PASSWORD if PASSWORD else getpass.getpass("Enter your LDAP password: ")

    subnets_sites = get_subnets_and_sites(username, password)
    print_subnets_and_sites(subnets_sites)

if __name__ == "__main__":
    main()
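
Once you have the list of subnets, the next question is usually “which site does this device land in?” Here is a small sketch using Python’s standard ipaddress module, assuming the (subnet, site) tuples returned by get_subnets_and_sites above. The find_site helper and the sample site DNs are hypothetical. It picks the most specific matching subnet, which is my understanding of how AD resolves overlapping subnets.

```python
import ipaddress


def find_site(ip, subnets_sites):
    """Return (subnet, site) for the most specific subnet containing ip, else None."""
    address = ipaddress.ip_address(ip)
    best = None
    for subnet_name, site in subnets_sites:
        network = ipaddress.ip_network(subnet_name, strict=False)
        if network.version != address.version:
            continue  # never compare IPv4 addresses against IPv6 subnets
        if address in network and (best is None or network.prefixlen > best[0].prefixlen):
            best = (network, site)
    return (str(best[0]), best[1]) if best else None


# Hypothetical data in the shape returned by get_subnets_and_sites()
subnets_sites = [
    ("10.0.0.0/8", "CN=UserAuth,CN=Sites,CN=Configuration,DC=example,DC=com"),
    ("10.13.0.0/16", "CN=Branch,CN=Sites,CN=Configuration,DC=example,DC=com"),
]

for ip in ("10.13.115.82", "10.200.1.1", "172.16.5.4"):
    print(ip, "->", find_site(ip, subnets_sites) or "no subnet defined")
```

An address that comes back with no subnet defined is exactly the case described above: a network on a different addressing scheme that never got a site assignment.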

Python: Get Active Directory Sites

One down side of not administering the Active Directory domain anymore is that I don’t have the quick GUI tools that show you how “stuff” is set up. Luckily, the sites are all reflected in AD objects that can be read by authenticated users:

from ldap3 import Server, Connection, ALL, SIMPLE, SUBTREE, Tls
import ssl
import getpass

# Attempt to import USERNAME and PASSWORD from config.py
try:
    from config import USERNAME, PASSWORD
except ImportError:
    USERNAME, PASSWORD = None, None

# Define constants
LDAP_SERVER = 'ad.example.com'
LDAP_PORT = 636

def get_all_sites(username, password):
    # Set up TLS configuration
    tls_configuration = Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1_2)

    # Connect to the LDAP server
    server = Server(LDAP_SERVER, port=LDAP_PORT, use_ssl=True, tls=tls_configuration, get_info=ALL)
    connection = Connection(server, user=username, password=password, authentication=SIMPLE, auto_bind=True)

    # Define the search base for sites
    search_base = 'CN=Sites,CN=Configuration,DC=example,DC=com'  # Update to match your domain's DN structure
    search_filter = '(objectClass=site)'  # Filter to find all site objects
    search_attributes = ['cn']  # We only need the common name (cn) of the sites

    # Perform the search
    connection.search(search_base, search_filter, SUBTREE, attributes=search_attributes)

    # Extract and return site names
    site_names = [entry['cn'].value for entry in connection.entries]
    return site_names

def print_site_names(site_names):
    if site_names:
        print("\nAD Sites:")
        for site in site_names:
            print(f"- {site}")
    else:
        print("No sites found in the domain.")

def main():
    # Prompt for username and password if not available in config.py
    username = USERNAME if USERNAME else input("Enter your LDAP username: ")
    password = PASSWORD if PASSWORD else getpass.getpass("Enter your LDAP password: ")

    site_names = get_all_sites(username, password)
    print_site_names(site_names)

if __name__ == "__main__":
    main()
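
The site list gets more useful when it is cross-referenced with the subnet assignments. Below is a sketch that reports sites with no subnets pointing at them, assuming site names from get_all_sites and (subnet, site DN) tuples from the subnet script. Both helper functions are hypothetical, and the DN parsing is deliberately naive: it assumes site names contain no escaped commas.

```python
def site_name_from_dn(site_dn):
    """Return the CN value from a DN such as 'CN=Name,CN=Sites,...'."""
    first_rdn = site_dn.split(",", 1)[0]
    return first_rdn.partition("=")[2]


def sites_without_subnets(site_names, subnets_sites):
    """Return the sorted site names that no subnet is assigned to."""
    assigned = {
        site_name_from_dn(dn).lower()
        for _, dn in subnets_sites
        if dn and dn != "No site assigned"
    }
    return sorted(name for name in site_names if name.lower() not in assigned)


# Hypothetical sample data
site_names = ["UserAuth", "Branch", "DR"]
subnets_sites = [
    ("10.0.0.0/8", "CN=UserAuth,CN=Sites,CN=Configuration,DC=example,DC=com"),
    ("192.168.50.0/24", "No site assigned"),
]
print(sites_without_subnets(site_names, subnets_sites))
```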

ElasticSearch 7.7.0 with Log4j 2.24.1

Interestingly, I am still able to manually upgrade the log4j components of ElasticSearch 7.7.0 to keep off the “naughty list” of vulnerabilities at work. I am successfully using 2.24.1 with ElasticSearch 7.7.0, OpenDistro, and the S3 backup plug-in.

Same script as before — just downloaded newer JARs, placed them into the /tmp folder on each server, and set the l4jver variable to the current release.

export l4jver=2.24.1
systemctl stop elasticsearch 

# Remove old jars
rm  --interactive=never /opt/elk/elasticsearch/lib/log4j-api-*.jar
rm  --interactive=never /opt/elk/elasticsearch/lib/log4j-core-*.jar
rm  --interactive=never /opt/elk/elasticsearch/modules/x-pack-identity-provider/log4j-slf4j-impl-*.jar
rm  --interactive=never /opt/elk/elasticsearch/modules/x-pack-security/log4j-slf4j-impl-*.jar
rm  --interactive=never /opt/elk/elasticsearch/plugins/opendistro_security/log4j-slf4j-impl-*.jar
rm  --interactive=never /opt/elk/elasticsearch/modules/x-pack-core/log4j-1.2-api-*.jar 
rm  --interactive=never /opt/elk/elasticsearch/plugins/repository-s3/log4j-1.2-api-*.jar 

# Copy in upgraded jars
cp /tmp/log4j-api-$l4jver*.jar /opt/elk/elasticsearch/lib/
cp /tmp/log4j-core-$l4jver*.jar /opt/elk/elasticsearch/lib/
cp /tmp/log4j-slf4j-impl-$l4jver*.jar /opt/elk/elasticsearch/modules/x-pack-identity-provider/
cp /tmp/log4j-slf4j-impl-$l4jver*.jar /opt/elk/elasticsearch/modules/x-pack-security/
cp /tmp/log4j-slf4j-impl-$l4jver*.jar /opt/elk/elasticsearch/plugins/opendistro_security/
cp /tmp/log4j-1.2-api-$l4jver*.jar /opt/elk/elasticsearch/modules/x-pack-core/
cp /tmp/log4j-1.2-api-$l4jver*.jar /opt/elk/elasticsearch/plugins/repository-s3/

# Fix permissions
chown elkadmin:elkadmin /opt/elk/elasticsearch/lib/log4j*
chown elkadmin:elkadmin /opt/elk/elasticsearch/modules/x-pack-core/log4j*
chown elkadmin:elkadmin /opt/elk/elasticsearch/modules/x-pack-identity-provider/log4j*
chown elkadmin:elkadmin /opt/elk/elasticsearch/modules/x-pack-security/log4j*
chown elkadmin:elkadmin /opt/elk/elasticsearch/plugins/repository-s3/log4j*
chown elkadmin:elkadmin /opt/elk/elasticsearch/plugins/opendistro_security/log4j*

systemctl start elasticsearch 
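
After the swap, it is worth confirming that no old JAR is still lurking in a module or plugin folder. A minimal sketch in Python, assuming the JAR file names follow the artifact-version.jar pattern used above. The function names are mine, and the demo runs against a throwaway directory with made-up file names rather than a real install.

```python
import re
import tempfile
from pathlib import Path

# Matches names such as log4j-api-2.24.1.jar or log4j-1.2-api-2.24.1.jar
JAR_PATTERN = re.compile(r"^(log4j-[a-z0-9.-]+?)-(\d+(?:\.\d+)+)\.jar$")


def find_log4j_jars(root):
    """Return sorted (relative path, artifact, version) for log4j jars under root."""
    found = []
    for jar in Path(root).rglob("log4j-*.jar"):
        match = JAR_PATTERN.match(jar.name)
        if match:
            found.append((jar.relative_to(root).as_posix(), match.group(1), match.group(2)))
    return sorted(found)


def outdated(jars, expected_version):
    """Return the entries whose version differs from expected_version."""
    return [entry for entry in jars if entry[2] != expected_version]


if __name__ == "__main__":
    # Demo only: point root at the real Elasticsearch directory instead
    with tempfile.TemporaryDirectory() as root:
        lib = Path(root, "lib")
        lib.mkdir()
        Path(lib, "log4j-api-2.24.1.jar").touch()
        Path(lib, "log4j-core-2.11.1.jar").touch()
        for entry in outdated(find_log4j_jars(root), "2.24.1"):
            print("Still old:", entry)
```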

SNMPWalk

I’ve been doing a lot of testing with SNMP this week, and it is helpful to have an ad hoc SNMP client that can retrieve data before you go about trying to retrieve and parse data in your own code. I’m a lot more confident telling someone they gave me a bad community string if someone else’s “known working” program fails! Enter snmpwalk.

Some of our devices return data out of order, so I need the -Cc option (turn off the check for increasing OID numbers). The following command walks the 1.3.6.1.2.1.2.2.1.2 (ifDescr) tree for host 10.13.115.82 using the community string C0mmun1tyStr1ngH3r3:

snmpwalk -v 2c -c C0mmun1tyStr1ngH3r3 -Cc "10.13.115.82" .1.3.6.1.2.1.2.2.1.2
IF-MIB::ifDescr.1 = STRING: eth 6/0
IF-MIB::ifDescr.2 = STRING: eth 7/0
IF-MIB::ifDescr.1000072 = STRING: XGige 6/0
IF-MIB::ifDescr.1000073 = STRING: XGige 6/1
IF-MIB::ifDescr.1000074 = STRING: XGige 6/2
IF-MIB::ifDescr.1000075 = STRING: XGige 6/3
IF-MIB::ifDescr.1000076 = STRING: XGige 6/4
IF-MIB::ifDescr.1000077 = STRING: XGige 6/5
IF-MIB::ifDescr.1000078 = STRING: XGige 6/6
IF-MIB::ifDescr.1000079 = STRING: XGige 6/7
IF-MIB::ifDescr.1000080 = STRING: Gige 6/0
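
Having known-good output also gives you something to test your own parsing against. Here is a small sketch in Python that turns output in the format above into a dictionary of interface index to description. The parse_ifdescr name is mine, and the pattern only handles ifDescr lines with STRING values, as shown in this walk.

```python
import re

LINE_PATTERN = re.compile(r"^IF-MIB::ifDescr\.(\d+) = STRING: (.*)$")


def parse_ifdescr(output):
    """Map interface index -> description from snmpwalk ifDescr output."""
    interfaces = {}
    for line in output.splitlines():
        match = LINE_PATTERN.match(line.strip())
        if match:
            interfaces[int(match.group(1))] = match.group(2).strip()
    return interfaces


# A few lines copied from the walk above
sample = """IF-MIB::ifDescr.1 = STRING: eth 6/0
IF-MIB::ifDescr.2 = STRING: eth 7/0
IF-MIB::ifDescr.1000072 = STRING: XGige 6/0
"""

for index, description in parse_ifdescr(sample).items():
    print(index, description)
```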

PowerShell to Uninstall an Application

I was curious if, instead of getting prompted for the local admin account for each application I want to remove, I could run PowerShell “as a different user” then use it to uninstall an application or list of applications. In this case, all of the .NET 6 stuff. Answer: absolutely.

# List all installed applications containing string "NET"
# Get-WmiObject -Class Win32_Product | Where-Object { $_.Name -like '*NET*' } | Select-Object -Property Name

# Define a static list of application names to uninstall
$appsToUninstall = @(
    'Microsoft .NET Runtime - 6.0.36 (x86)',
    'Microsoft .NET Host FX Resolver - 6.0.36 (x64)',
    'Microsoft .NET Host - 6.0.36 (x64)',
    'Microsoft .NET Host FX Resolver - 6.0.36 (x86)',
    'Microsoft .NET Host - 6.0.36 (x86)',
    'Microsoft .NET Runtime - 6.0.36 (x64)',
    'Microsoft.NET.Workload.Emscripten.net6.Manifest (x64)'
)

# Loop through each application name in the static list
foreach ($appName in $appsToUninstall) {
    # Find the application object by name
    $app = Get-WmiObject -Class Win32_Product | Where-Object { $_.Name -eq $appName }
    
    # Check if the application was found before attempting to uninstall
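    if ($app) {
        Write-Output "Uninstalling $($app.Name)..."
        $result = $app.Uninstall()
        # A ReturnValue of 0 means the uninstall reported success
        if ($result.ReturnValue -eq 0) {
            Write-Output "$($app.Name) has been uninstalled successfully."
        }
        else {
            Write-Output "Uninstall of $($app.Name) failed with return value $($result.ReturnValue)."
        }
    }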
    else {
        Write-Output "Application $appName not found."
    }
}

Debugging Logstash

Running all of the logstash pipelines in debug mode produces an overwhelming amount of data, but you can stop logstash and manually run an individual pipeline file (-f) from the command line:

/opt/elk/logstash/bin/logstash -f /opt/elk/logstash/config/ljr_testing.conf --log.level debug

Which still produces a lot of output, but it’s all relevant to the pipeline that is experiencing a problem:

Using bundled JDK: /opt/elk/logstash/jdk
/opt/elk/logstash-8.13.1/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_int
/opt/elk/logstash-8.13.1/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_f
Sending Logstash logs to /opt/elk/logstash/logs which is now configured via log4j2.properties
[2024-10-23T10:47:36,364][INFO ][logstash.runner          ] Log4j configuration path used is: /opt/elk/logstash/config/log4j2.properties
[2024-10-23T10:47:36,369][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"8.13.1", "jruby.version"=>"jruby 9.4.5.0 (3.1.4) 2023-11-02 1abae2700f OpenJDK 64-Bit Server VM 17.0.10+7 on 17.0.10+7 +indy +jit [x86_64-linux]"}
[2024-10-23T10:47:36,371][INFO ][logstash.runner          ] JVM bootstrap flags: [-Xms1g, -Xmx1g, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djdk.io.File.enableADS=true, -Djruby.compile.invokedynamic=true, -Djruby.jit.threshold=0, -Djruby.regexp.interruptible=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED, --add-opens=java.base/java.io=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-opens=java.base/java.security=ALL-UNNAMED, --add-opens=java.base/java.io=ALL-UNNAMED, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED, -Dio.netty.allocator.maxOrder=11]
[2024-10-23T10:47:36,376][DEBUG][logstash.modules.scaffold] Found module {:module_name=>"fb_apache", :directory=>"/opt/elk/logstash/modules/fb_apache/configuration"}
[2024-10-23T10:47:36,376][DEBUG][logstash.plugins.registry] Adding plugin to the registry {:name=>"fb_apache", :type=>:modules, :class=>#<LogStash::Modules::Scaffold:0x14917e38 @directory="/opt/elk/logstash/modules/fb_apache/configuration", @module_name="fb_apache", @kibana_version_parts=["6", "0", "0"]>}
[2024-10-23T10:47:36,379][DEBUG][logstash.modules.scaffold] Found module {:module_name=>"netflow", :directory=>"/opt/elk/logstash/modules/netflow/configuration"}
[2024-10-23T10:47:36,379][DEBUG][logstash.plugins.registry] Adding plugin to the registry {:name=>"netflow", :type=>:modules, :class=>#<LogStash::Modules::Scaffold:0x5909021c @directory="/opt/elk/logstash/modules/netflow/configuration", @module_name="netflow", @kibana_version_parts=["6", "0", "0"]>}
[2024-10-23T10:47:36,408][DEBUG][logstash.runner          ] Setting global FieldReference escape style: none
[2024-10-23T10:47:36,606][DEBUG][logstash.runner          ] -------- Logstash Settings (* means modified) ---------
[2024-10-23T10:47:36,606][DEBUG][logstash.runner          ] allow_superuser: true
[2024-10-23T10:47:36,606][DEBUG][logstash.runner          ] node.name: "logstashhost.example.net"
[2024-10-23T10:47:36,606][DEBUG][logstash.runner          ] *path.config: "/opt/elk/logstash/config/ljr_testing.conf"
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] path.data: "/opt/elk/logstash/data"
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] modules.cli: #<Java::OrgLogstashUtil::ModulesSettingArray: []>
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] modules: []
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] modules_list: []
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] modules_variable_list: []
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] modules_setup: false
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] config.test_and_exit: false
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] config.reload.automatic: false
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] config.reload.interval: #<Java::OrgLogstashUtil::TimeValue:0x4e8224cf>
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] config.support_escapes: false
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] config.field_reference.escape_style: "none"
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] event_api.tags.illegal: "rename"
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] metric.collect: true
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] pipeline.id: "main"
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.system: false
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.workers: 4
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.batch.size: 125
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.batch.delay: 50
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.unsafe_shutdown: false
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.reloadable: true
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.plugin_classloaders: false
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.separate_logs: false
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.ordered: "auto"
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.ecs_compatibility: "v8"
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] path.plugins: []
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] config.debug: false
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] *log.level: "debug" (default: "info")
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] version: false
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] help: false
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] enable-local-plugin-development: false
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] log.format: "plain"
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] api.enabled: true
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] *api.http.host: "10.166.25.148" (via deprecated `http.host`; default: "127.0.0.1")
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] *http.host: "10.166.25.148"
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] *api.http.port: 9600..9600 (via deprecated `http.port`; default: 9600..9700)
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] *http.port: 9600..9600
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] api.environment: "production"
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] api.auth.type: "none"
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] api.auth.basic.password_policy.mode: "WARN"
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] api.auth.basic.password_policy.length.minimum: 8
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] api.auth.basic.password_policy.include.upper: "REQUIRED"
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] api.auth.basic.password_policy.include.lower: "REQUIRED"
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] api.auth.basic.password_policy.include.digit: "REQUIRED"
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] api.auth.basic.password_policy.include.symbol: "OPTIONAL"
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] api.ssl.enabled: false
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] api.ssl.supported_protocols: []
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] queue.type: "memory"
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] queue.drain: false
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] queue.page_capacity: 67108864
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] queue.max_bytes: 1073741824
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] queue.max_events: 0
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] queue.checkpoint.acks: 1024
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] queue.checkpoint.writes: 1024
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] queue.checkpoint.interval: 1000
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] queue.checkpoint.retry: true
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] dead_letter_queue.enable: false
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] dead_letter_queue.max_bytes: 1073741824
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] dead_letter_queue.flush_interval: 5000
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] dead_letter_queue.storage_policy: "drop_newer"
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] slowlog.threshold.warn: #<Java::OrgLogstashUtil::TimeValue:0x3436da82>
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] slowlog.threshold.info: #<Java::OrgLogstashUtil::TimeValue:0x1001ede3>
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] slowlog.threshold.debug: #<Java::OrgLogstashUtil::TimeValue:0x41820105>
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] slowlog.threshold.trace: #<Java::OrgLogstashUtil::TimeValue:0x2bb4b203>
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] keystore.classname: "org.logstash.secret.store.backend.JavaKeyStore"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] keystore.file: "/opt/elk/logstash/config/logstash.keystore"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] *monitoring.cluster_uuid: "hosq-h4ESECe63l2DvgymA"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] path.queue: "/opt/elk/logstash/data/queue"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] path.dead_letter_queue: "/opt/elk/logstash/data/dead_letter_queue"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] path.settings: "/opt/elk/logstash/config"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] path.logs: "/opt/elk/logstash/logs"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] xpack.monitoring.enabled: false
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.hosts: ["http://localhost:9200"]
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] xpack.monitoring.collection.interval: #<Java::OrgLogstashUtil::TimeValue:0x35f2b220>
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] xpack.monitoring.collection.timeout_interval: #<Java::OrgLogstashUtil::TimeValue:0x296c863c>
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.username: "logstash_system"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.ssl.verification_mode: "full"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.ssl.cipher_suites: []
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.sniffing: false
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] xpack.monitoring.collection.pipeline.details.enabled: true
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] xpack.monitoring.collection.config.enabled: true
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.enabled: false
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.elasticsearch.hosts: ["http://localhost:9200"]
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.collection.interval: #<Java::OrgLogstashUtil::TimeValue:0x6d5f9907>
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.collection.timeout_interval: #<Java::OrgLogstashUtil::TimeValue:0x69d0fac0>
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.elasticsearch.username: "logstash_system"
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.elasticsearch.ssl.verification_mode: "full"
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.elasticsearch.ssl.cipher_suites: []
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.elasticsearch.sniffing: false
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.collection.pipeline.details.enabled: true
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.collection.config.enabled: true
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] node.uuid: ""
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] xpack.geoip.downloader.endpoint: "https://geoip.elastic.co/v1/database"
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.geoip.downloader.poll.interval: #<Java::OrgLogstashUtil::TimeValue:0x52e2ea7a>
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.geoip.downloader.enabled: true
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.enabled: false
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.logstash.poll_interval: #<Java::OrgLogstashUtil::TimeValue:0x6d5a43c1>
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.pipeline.id: ["main"]
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.elasticsearch.username: "logstash_system"
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.elasticsearch.hosts: ["https://localhost:9200"]
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.elasticsearch.ssl.cipher_suites: []
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.elasticsearch.ssl.verification_mode: "full"
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.elasticsearch.sniffing: false
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] --------------- Logstash Settings -------------------
[2024-10-23T10:47:36,640][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2024-10-23T10:47:36,682][DEBUG][logstash.agent           ] Initializing API WebServer {"api.http.host"=>"10.166.25.148", "api.http.port"=>9600..9600, "api.ssl.enabled"=>false, "api.auth.type"=>"none", "api.environment"=>"production"}
[2024-10-23T10:47:36,790][DEBUG][logstash.api.service     ] [api-service] start
[2024-10-23T10:47:36,868][DEBUG][logstash.agent           ] Setting up metric collection
[2024-10-23T10:47:36,912][DEBUG][logstash.instrument.periodicpoller.os] Starting {:polling_interval=>5, :polling_timeout=>120}
[2024-10-23T10:47:37,121][DEBUG][logstash.instrument.periodicpoller.jvm] Starting {:polling_interval=>5, :polling_timeout=>120}
[2024-10-23T10:47:37,178][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Young Generation"}
[2024-10-23T10:47:37,182][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Old Generation"}
[2024-10-23T10:47:37,196][DEBUG][logstash.instrument.periodicpoller.persistentqueue] Starting {:polling_interval=>5, :polling_timeout=>120}
[2024-10-23T10:47:37,205][DEBUG][logstash.instrument.periodicpoller.deadletterqueue] Starting {:polling_interval=>5, :polling_timeout=>120}
[2024-10-23T10:47:37,208][DEBUG][logstash.instrument.periodicpoller.flowrate] Starting {:polling_interval=>5, :polling_timeout=>120}
[2024-10-23T10:47:37,519][DEBUG][logstash.agent           ] Starting agent
[2024-10-23T10:47:37,530][DEBUG][logstash.agent           ] Starting API WebServer (puma)
[2024-10-23T10:47:37,586][DEBUG][logstash.config.source.local.configpathloader] Reading config file {:config_file=>"/opt/elk/logstash/config/ljr_testing.conf"}
[2024-10-23T10:47:37,596][DEBUG][logstash.agent           ] Trying to start API WebServer {:port=>9600, :ssl_enabled=>false}
[2024-10-23T10:47:37,657][DEBUG][logstash.agent           ] Converging pipelines state {:actions_count=>1}
[2024-10-23T10:47:37,669][DEBUG][logstash.agent           ] Executing action {:action=>LogStash::PipelineAction::Create/pipeline_id:main}
[2024-10-23T10:47:37,683][DEBUG][org.logstash.secret.store.SecretStoreFactory] Attempting to exists or secret store with implementation: org.logstash.secret.store.backend.JavaKeyStore
[2024-10-23T10:47:37,758][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2024-10-23T10:47:38,172][INFO ][org.reflections.Reflections] Reflections took 86 ms to scan 1 urls, producing 132 keys and 468 values
[2024-10-23T10:47:38,191][DEBUG][org.logstash.secret.store.SecretStoreFactory] Attempting to exists or secret store with implementation: org.logstash.secret.store.backend.JavaKeyStore
[2024-10-23T10:47:38,219][DEBUG][logstash.plugins.registry] On demand adding plugin to the registry {:name=>"generator", :type=>"input", :class=>LogStash::Inputs::Generator}
[2024-10-23T10:47:38,289][DEBUG][logstash.plugins.registry] On demand adding plugin to the registry {:name=>"json", :type=>"codec", :class=>LogStash::Codecs::JSON}
[2024-10-23T10:47:38,318][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@id = "json_3f7d0e36-fb51-4b2a-993f-4287698b2b63"
[2024-10-23T10:47:38,319][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@enable_metric = true
[2024-10-23T10:47:38,319][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@charset = "UTF-8"
[2024-10-23T10:47:38,346][DEBUG][logstash.inputs.generator] config LogStash::Inputs::Generator/@count = 1
[2024-10-23T10:47:38,350][DEBUG][logstash.inputs.generator] config LogStash::Inputs::Generator/@codec = <LogStash::Codecs::JSON id=>"json_3f7d0e36-fb51-4b2a-993f-4287698b2b63", enable_metric=>true, charset=>"UTF-8">
[2024-10-23T10:47:38,351][DEBUG][logstash.inputs.generator] config LogStash::Inputs::Generator/@id = "ca89bb64fce3017f652b7d9e15198c67ecfbcea7435cac7e2316ed9f567a0820"
[2024-10-23T10:47:38,351][DEBUG][logstash.inputs.generator] config LogStash::Inputs::Generator/@message = "{\"test\": \"data\"}"
[2024-10-23T10:47:38,351][DEBUG][logstash.inputs.generator] config LogStash::Inputs::Generator/@enable_metric = true
[2024-10-23T10:47:38,351][DEBUG][logstash.inputs.generator] config LogStash::Inputs::Generator/@add_field = {}
[2024-10-23T10:47:38,351][DEBUG][logstash.inputs.generator] config LogStash::Inputs::Generator/@threads = 1
[2024-10-23T10:47:38,392][DEBUG][logstash.plugins.registry] On demand adding plugin to the registry {:name=>"http", :type=>"output", :class=>LogStash::Outputs::Http}
[2024-10-23T10:47:38,405][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@id = "json_523f376a-5ed8-44dc-a17c-ff9f1daad51a"
[2024-10-23T10:47:38,406][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@enable_metric = true
[2024-10-23T10:47:38,406][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@charset = "UTF-8"
[2024-10-23T10:47:38,413][ERROR][logstash.outputs.http    ] Unknown setting 'ssl_verification_mode' for http
[2024-10-23T10:47:38,420][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"Java::JavaLang::IllegalStateException", :message=>"Unable to configure plugins: (ConfigurationError) Something is wrong with your configuration.", :backtrace=>["org.logstash.config.ir.CompiledPipeline.<init>(CompiledPipeline.java:120)", "org.logstash.execution.AbstractPipelineExt.initialize(AbstractPipelineExt.java:186)", "org.logstash.execution.AbstractPipelineExt$INVOKER$i$initialize.call(AbstractPipelineExt$INVOKER$i$initialize.gen)", "org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:847)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuper(IRRuntimeHelpers.java:1319)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuperSplatArgs(IRRuntimeHelpers.java:1292)", "org.jruby.ir.targets.indy.InstanceSuperInvokeSite.invoke(InstanceSuperInvokeSite.java:30)", "opt.elk.logstash_minus_8_dot_13_dot_1.logstash_minus_core.lib.logstash.java_pipeline.RUBY$method$initialize$0(/opt/elk/logstash-8.13.1/logstash-core/lib/logstash/java_pipeline.rb:48)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:139)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:112)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:446)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:92)", "org.jruby.RubyClass.newInstance(RubyClass.java:931)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(RubyClass$INVOKER$i$newInstance.gen)", "org.jruby.ir.targets.indy.InvokeSite.performIndirectCall(InvokeSite.java:735)", "org.jruby.ir.targets.indy.InvokeSite.invoke(InvokeSite.java:657)", "opt.elk.logstash_minus_8_dot_13_dot_1.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0(/opt/elk/logstash-8.13.1/logstash-core/lib/logstash/pipeline_action/create.rb:49)", 
"opt.elk.logstash_minus_8_dot_13_dot_1.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0$__VARARGS__(/opt/elk/logstash-8.13.1/logstash-core/lib/logstash/pipeline_action/create.rb:48)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:139)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:112)", "org.jruby.ir.targets.indy.InvokeSite.performIndirectCall(InvokeSite.java:735)", "org.jruby.ir.targets.indy.InvokeSite.invoke(InvokeSite.java:657)", "opt.elk.logstash_minus_8_dot_13_dot_1.logstash_minus_core.lib.logstash.agent.RUBY$block$converge_state$2(/opt/elk/logstash-8.13.1/logstash-core/lib/logstash/agent.rb:386)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:141)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:64)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.Block.call(Block.java:144)", "org.jruby.RubyProc.call(RubyProc.java:352)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:111)", "java.base/java.lang.Thread.run(Thread.java:840)"]}
[2024-10-23T10:47:38,448][DEBUG][logstash.agent           ] Shutting down all pipelines {:pipelines_count=>0}
[2024-10-23T10:47:38,449][DEBUG][logstash.agent           ] Converging pipelines state {:actions_count=>0}
[2024-10-23T10:47:38,452][DEBUG][logstash.instrument.periodicpoller.os] Stopping
[2024-10-23T10:47:38,457][DEBUG][logstash.instrument.periodicpoller.jvm] Stopping
[2024-10-23T10:47:38,458][DEBUG][logstash.instrument.periodicpoller.persistentqueue] Stopping
[2024-10-23T10:47:38,458][DEBUG][logstash.instrument.periodicpoller.deadletterqueue] Stopping
[2024-10-23T10:47:38,458][DEBUG][logstash.instrument.periodicpoller.flowrate] Stopping
[2024-10-23T10:47:38,483][DEBUG][logstash.agent           ] API WebServer has stopped running
[2024-10-23T10:47:38,484][INFO ][logstash.runner          ] Logstash shut down.
[2024-10-23T10:47:38,493][FATAL][org.logstash.Logstash    ] Logstash stopped processing because of an error: (SystemExit) exit
org.jruby.exceptions.SystemExit: (SystemExit) exit
        at org.jruby.RubyKernel.exit(org/jruby/RubyKernel.java:808) ~[jruby.jar:?]
        at org.jruby.RubyKernel.exit(org/jruby/RubyKernel.java:767) ~[jruby.jar:?]
        at opt.elk.logstash.lib.bootstrap.environment.<main>(/opt/elk/logstash/lib/bootstrap/environment.rb:90) ~[?:?]
logstashhost:config #

Voila — the error!

[2024-10-23T10:47:38,413][ERROR][logstash.outputs.http    ] Unknown setting 'ssl_verification_mode' for http

It turns out the ssl_verification_mode setting doesn’t exist in the older version of Logstash that’s running in prod. Just as well: it appears to be working and verifying the cert chain on the destination without it.
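Scrolling through that much debug output to find one line gets old. Here is a minimal sketch, not anything Logstash ships, that keeps only the ERROR and FATAL lines. The class and method names are my own, and the pattern assumes the "[timestamp][LEVEL][logger]" prefix seen in the log above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: filters Logstash-style log lines by level.
class LogstashLogFilter {

    // Assumes lines start with "[timestamp][LEVEL][logger...]" as in the log above;
    // the level may be padded with a trailing space, e.g. "[INFO ]".
    private static final Pattern PREFIX =
            Pattern.compile("^\\[[^\\]]+\\]\\[(\\w+)\\s*\\]");

    /** Returns only the lines logged at ERROR or FATAL level, in their original order. */
    static List<String> errorsOnly(List<String> lines) {
        List<String> hits = new ArrayList<>();
        for (String line : lines) {
            Matcher m = PREFIX.matcher(line);
            if (m.find()) {
                String level = m.group(1);
                if (level.equals("ERROR") || level.equals("FATAL")) {
                    hits.add(line);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "[2024-10-23T10:47:38,406][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@charset = \"UTF-8\"",
                "[2024-10-23T10:47:38,413][ERROR][logstash.outputs.http    ] Unknown setting 'ssl_verification_mode' for http",
                "[2024-10-23T10:47:38,484][INFO ][logstash.runner          ] Logstash shut down.");
        errorsOnly(sample).forEach(System.out::println);
    }
}
```

Lines that do not match the prefix, such as the Java stack trace that follows a FATAL entry, are skipped, so treat the output as a pointer to where to look rather than the complete picture.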

Kafka Consumer and Producer Example – Java

Producer

package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.InputStream;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class SimpleKafkaProducer {

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Please provide Kafka server and port. Usage: SimpleKafkaProducer <kafka-server> <port>");
            System.exit(1);
        }

        String kafkaHost = args[0];
        String kafkaPort = args[1];
        String kafkaServer = kafkaHost + ":" + kafkaPort;
        String topicName = "LJRJavaTest";
        String truststorePassword = "truststore-password";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Load truststore from resources
        try (InputStream truststoreStream = SimpleKafkaProducer.class.getResourceAsStream("/kafka.client.truststore.jks")) {
            if (truststoreStream == null) {
                throw new RuntimeException("Truststore not found in resources");
            }

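            // Create a temporary file to hold the truststore (Kafka needs a file path, not a stream)
            java.nio.file.Path tempTruststore = java.nio.file.Files.createTempFile("kafka.client.truststore", ".jks");
            // Clean up the copy when the JVM exits so temp files do not pile up
            tempTruststore.toFile().deleteOnExit();
            java.nio.file.Files.copy(truststoreStream, tempTruststore, java.nio.file.StandardCopyOption.REPLACE_EXISTING);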

            props.put("security.protocol", "SSL");
            props.put("ssl.truststore.location", tempTruststore.toString());
            props.put("ssl.truststore.password", truststorePassword);

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);

            // Define the date-time formatter
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd 'at' HH:mm:ss");

            try {
                for (int i = 0; i < 10; i++) {
                    String key = "Key-" + i;
                    
                    // Get the current timestamp
                    String timestamp = LocalDateTime.now().format(formatter);
                    
                    // Include the timestamp, Kafka host, and port in the message value
                    String value = String.format("Message-%d (Test from %s on server %s:%s)", i, timestamp, kafkaHost, kafkaPort);
                    
                    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
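                    // send() is asynchronous; the callback surfaces failures that would otherwise go unnoticed
                    producer.send(record, (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();
                        }
                    });
                    System.out.println("Sent message: (" + key + ", " + value + ")");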
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Consumer

package com.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.InputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Please provide Kafka server and port. Usage: SimpleKafkaConsumer <kafka-server> <port>");
            System.exit(1);
        }

        String kafkaServer = args[0] + ":" + args[1];
        String topicName = "LJRJavaTest";
        String groupId = "test-consumer-group";
        String truststorePassword = "truststore-password";

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Load truststore from resources
        try (InputStream truststoreStream = SimpleKafkaConsumer.class.getResourceAsStream("/kafka.client.truststore.jks")) {
            if (truststoreStream == null) {
                throw new RuntimeException("Truststore not found in resources");
            }

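            // Create a temporary file to hold the truststore (Kafka needs a file path, not a stream)
            java.nio.file.Path tempTruststore = java.nio.file.Files.createTempFile("kafka.client.truststore", ".jks");
            // Clean up the copy when the JVM exits so temp files do not pile up
            tempTruststore.toFile().deleteOnExit();
            java.nio.file.Files.copy(truststoreStream, tempTruststore, java.nio.file.StandardCopyOption.REPLACE_EXISTING);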

            props.put("security.protocol", "SSL");
            props.put("ssl.truststore.location", tempTruststore.toString());
            props.put("ssl.truststore.password", truststorePassword);

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(topicName));

            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("Consumed message: (key: %s, value: %s, offset: %d, partition: %d)%n",
                                record.key(), record.value(), record.offset(), record.partition());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
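
The producer and consumer both repeat the same "copy the truststore out of the jar into a temp file" step. A minimal sketch of pulling that into one shared helper follows. The TruststoreResource class and copyToTempFile method are names I made up for illustration, and the helper wraps I/O failures in an unchecked exception so callers stay short.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical shared helper for the producer and consumer above.
class TruststoreResource {

    /** Copies a stream (for example a classpath resource) to a temp file and returns the path. */
    static Path copyToTempFile(InputStream in, String prefix, String suffix) {
        if (in == null) {
            throw new IllegalArgumentException("Truststore stream is null (resource not found?)");
        }
        try {
            Path temp = Files.createTempFile(prefix, suffix);
            // Remove the copy when the JVM exits
            temp.toFile().deleteOnExit();
            Files.copy(in, temp, StandardCopyOption.REPLACE_EXISTING);
            return temp;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Same resource name the producer and consumer use
        try (InputStream in = TruststoreResource.class.getResourceAsStream("/kafka.client.truststore.jks")) {
            if (in == null) {
                System.err.println("Truststore not found in resources");
                return;
            }
            Path truststore = copyToTempFile(in, "kafka.client.truststore", ".jks");
            System.out.println("Truststore extracted to " + truststore);
        }
    }
}
```

With something like this in place, each client would only need to pass the returned path's toString() to ssl.truststore.location.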

Keystore for Trusts

The cert on my Kafka server is signed by a CA that has both a root and an intermediate signing certificate. To trust the chain, I need to populate rootCA.crt and intermediateCA.crt files with the correct base64-encoded certificates and import both files into a new trust store:

keytool -importcert -file rootCA.crt -keystore kafka.client.truststore.jks -alias rootCA -storepass truststore-password

keytool -importcert -file intermediateCA.crt -keystore kafka.client.truststore.jks -alias intermediateCA -storepass truststore-password

Then list certs in the trust store to verify the import was successful:

keytool -list -keystore kafka.client.truststore.jks -storepass truststore-password

Then place the JKS trust store with the CA certs in the project’s resources folder:

C:.
│   pom.xml
│
├───src
│   └───main
│       ├───java
│       │   └───com
│       │       └───example
│       │               SimpleKafkaConsumer.java
│       │               SimpleKafkaProducer.java
│       │
│       └───resources
│               kafka.client.truststore.jks
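
The keytool -list check can also be done from Java before the clients start, which fails faster than a TLS handshake error. This is a rough sketch using the standard java.security.KeyStore API. The TruststoreCheck class and its methods are hypothetical names of mine, and the demo uses an empty in-memory store so it runs without the real file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check for the trust store built with keytool above.
class TruststoreCheck {

    /** Returns the expected aliases that are NOT present in the JKS store read from the stream. */
    static List<String> missingAliases(InputStream in, char[] password, String... expected) {
        try {
            KeyStore ks = KeyStore.getInstance("JKS");
            ks.load(in, password);
            List<String> missing = new ArrayList<>();
            for (String alias : expected) {
                if (!ks.containsAlias(alias)) {
                    missing.add(alias);
                }
            }
            return missing;
        } catch (GeneralSecurityException | IOException e) {
            throw new IllegalStateException("Could not read truststore", e);
        }
    }

    /** Builds an empty in-memory JKS store, only so the demo below has something to read. */
    static byte[] emptyStore(char[] password) {
        try {
            KeyStore ks = KeyStore.getInstance("JKS");
            ks.load(null, password);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            ks.store(out, password);
            return out.toByteArray();
        } catch (GeneralSecurityException | IOException e) {
            throw new IllegalStateException("Could not build demo truststore", e);
        }
    }

    public static void main(String[] args) {
        char[] password = "truststore-password".toCharArray();
        // Swap this stream for getResourceAsStream("/kafka.client.truststore.jks") to check the real file
        InputStream in = new ByteArrayInputStream(emptyStore(password));
        List<String> missing = missingAliases(in, password, "rootCA", "intermediateCA");
        System.out.println("Missing aliases: " + missing);
    }
}
```

Against the real trust store, an empty "missing" list means both CA certs were imported under the aliases used in the keytool commands.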

Build and Run

Create a pom.xml for the build:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafka-producer-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Kafka client dependency -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiler plugin to ensure Java 8 compatibility -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Exec Maven Plugin to run the Java class -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <mainClass>com.example.SimpleKafkaProducer</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Build using `mvn clean package`

Run the producer using `mvn exec:java -Dexec.mainClass="com.example.SimpleKafkaProducer" -Dexec.args="kafkahost.example.net 9095"`

Run the consumer using `mvn exec:java -Dexec.mainClass="com.example.SimpleKafkaConsumer" -Dexec.args="kafkahost.example.net 9095"`

K8s, resolv.conf, and ndots

I had a very strange problem when firewalld was used with nftables as the back end – rules configured properly in firewalld didn’t exist in the nftables rulesets so … didn’t exist. The most obvious failure in the k8s cluster was DNS resolution – requests to any nodes where nftables was the back end just timed out. In diagnosing the “dns queries time out” issue, I was watching the logs from the coredns pods. And I saw a lot of NXDOMAIN errors. Not because I had a hostname mistyped or anything – each pod was appending every domain in the resolv.conf search order before trying the actual hostname.

The quick solution was to update our hostnames to include the trailing dot for the root zone. It is not “redishost.example.com” but rather “redishost.example.com.” with the final dot as part of the name.

But that didn’t explain why – I’ve got plenty of Linux boxes where there are some search domains in resolv.conf. Never once seen redishost.example.com.example.com come across the query log. There is a configuration that I’ve rarely used that is designed to speed up getting to the search list. You can configure ndots – the default is one, but you can set whatever positive integer you would like. Surely, they wouldn’t set ndots to something crazy high … right??

Oh, look –

Defaulted container "kafka-streams-app" out of: kafka-streams-app, filebeat
bash-4.4# cat /etc/resolv.conf
search kstreams.svc.cluster.local svc.cluster.local cluster.local mgmt.example.net dsys.example.net dnoc.example.net admin.example.net example.com
nameserver 10.6.0.5
options ndots:5

Yup, it’s right there in the source — and it’s been there for seven years:

What does this mean? Well, ndots is really just a threshold for the number of dots in a hostname. If there are fewer than ndots dots, the resolver will try appending the search domains first and then try what you typed as a last resort. With the default of one, that basically means only a string with no dots will get the search domains appended first. I guess if you go out and register a gTLD for your company – my hostname is literally just example. – then you’ll have a little inefficiency as the search domains are tried. But that’s a real edge case. With the k8s default of five, anything with fewer than five dots gets all of those search domains appended first.
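
To make the ordering concrete, here is a small sketch that models the lookup order described above. It is a simplified model of glibc-style resolver behavior, not the resolver's actual code. The NdotsSearchOrder class is a name I invented, and the search list in main is copied from the resolv.conf shown earlier.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of how ndots and the search list decide which names get queried.
class NdotsSearchOrder {

    /** Returns the fully qualified names that would be tried, in order. */
    static List<String> candidates(String name, List<String> searchDomains, int ndots) {
        List<String> result = new ArrayList<>();
        if (name.endsWith(".")) {
            // Trailing dot: already fully qualified, the search list is skipped
            result.add(name);
            return result;
        }
        long dots = name.chars().filter(c -> c == '.').count();
        if (dots >= ndots) {
            // Enough dots: try the name as typed first
            result.add(name + ".");
        }
        for (String domain : searchDomains) {
            result.add(name + "." + domain + ".");
        }
        if (dots < ndots) {
            // Too few dots: the name as typed is the last resort
            result.add(name + ".");
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> search = Arrays.asList(
                "kstreams.svc.cluster.local", "svc.cluster.local", "cluster.local",
                "mgmt.example.net", "dsys.example.net", "dnoc.example.net",
                "admin.example.net", "example.com");
        System.out.println("ndots:5");
        candidates("redishost.example.com", search, 5).forEach(n -> System.out.println("  " + n));
        System.out.println("ndots:1");
        candidates("redishost.example.com", search, 1).forEach(n -> System.out.println("  " + n));
    }
}
```

With ndots set to 5, the real name comes out last in the list. With ndots set to 1, or with a trailing dot on the name, it comes out first, so the lookup succeeds before any search domain is tried. Each name in the list is queried once per record type, which is why the failures arrive in A and AAAA pairs.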

So when I need redishost.example.com, I see the following resolutions fail first because there are no such hostnames:

[INFO] 64.24.29.155:57014 - # "A IN redishost.example.com.svc.cluster.local. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:57028 - # "AAAA IN redishost.example.com.svc.cluster.local. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:56096 - # "A IN redishost.example.com.cluster.local. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:56193 - # "AAAA IN redishost.example.com.cluster.local. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:55001 - # "A IN redishost.example.com.mgmt.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:55194 - # "AAAA IN redishost.example.com.mgmt.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:54078 - # "A IN redishost.example.com.dsys.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:54127 - # "AAAA IN redishost.example.com.dsys.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:52061 - # "A IN redishost.example.com.dnoc.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:52182 - # "AAAA IN redishost.example.com.dnoc.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:51018 - # "A IN redishost.example.com.admin.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:51104 - # "AAAA IN redishost.example.com.admin.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:50052 - # "A IN redishost.example.com.example.com. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:50189 - # "AAAA IN redishost.example.com.example.com. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s

Wonderful — IPv6 is enabled and it’s trying AAAA records too. Finally it resolves redishost.example.com!

Luckily, there is a quick solution. Update the deployment YAML to include a custom ndots value in the pod spec’s dnsConfig options – I like 1. I could see where someone might want two: for a name like something.else that needs svc.cluster.local appended, they might not want to waste time looking up something.else on its own first. I don’t need that, but I can see why something higher than one might be desirable in k8s. Not sure I buy it’s awesome enough to be the default, though!

Redeployed and instantly cut the DNS traffic by about 90% — and reduced application latency as each DNS call no longer has to have fourteen failures before the final success.