Python Script: Checking Certificate Expiry Dates

A long time ago, before the company I work for paid for an external SSL certificate management platform that does nice things like clue you in when a cert is expiring tomorrow night, we had an outage due to an expired certificate. One. I put together a Perl script that parsed output from the openssl client and proactively alerted us to any pending expiration events.

That script went away quite some time ago — although I still have a copy running at home that ensures our SMTP and web server certs are current. This morning, though, our K8s environment fell over due to expired certificates. Digging into it, the certs are in the management platform (my first guess was self-signed certificates that wouldn't have been included in the pending expiry notices), but the expiry notices were not delivered to those of us who actually manage the servers. Luckily it was our dev K8s environment, and we now know the prod one will be expiring in a week or so. But I figured it was a good impetus to resurrect the old script. Unfortunately, none of the modules I used for date calculation were installed on our script server. That seemed like a hint that I should rewrite the script in Python. So … here is a quick Python script that gets certificates from hosts and calculates how long until each certificate expires. Add an "if" statement and a notification function, and we shouldn't come in to failed environments needing certificate renewals.

from cryptography import x509
from cryptography.hazmat.backends import default_backend
import socket
import ssl
from datetime import datetime

# Dictionary of host:port combinations to check for expiry
dictHostsToCheck = {
    "tableau.example.com": 443,      # Tableau
    "kibana.example.com": 5601,      # ELK Kibana
    "elkmaster.example.com": 9200,   # ELK Master
    "kafka.example.com": 9093,       # Kafka server
}

for strHostName, iPort in dictHostsToCheck.items():
    datetimeNow = datetime.utcnow()

    # Create a default context, then disable chain and hostname verification
    # so we always retrieve the certificate, even if it is invalid or expired
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    with socket.create_connection((strHostName, iPort)) as sock:
        with context.wrap_socket(sock, server_hostname=strHostName) as ssock:
            # Grab the peer certificate in DER form, convert it to PEM, and parse it
            objDERCert = ssock.getpeercert(True)
            objPEMCert = ssl.DER_cert_to_PEM_cert(objDERCert)
            objCertificate = x509.load_pem_x509_certificate(objPEMCert.encode(), backend=default_backend())

            print(f"{strHostName}\t{iPort}\t{objCertificate.not_valid_after}\t{(objCertificate.not_valid_after - datetimeNow).days} days")


Quick Python Bingo Caller

I keep re-writing the same quick script to implement a bingo “caller” so Anya and I can play a game … figured I’d save it somewhere and save a few minutes next time! We use more words than squares so not every word is on both boards, but you can shorten the list to 24 and just put the words in different squares on each board.

import random

# Initialize the word list -- we use more words than the 24 squares,
# so not every word appears on every board
wordList = ["Hypothesis", "Observation", "Theory", "Variable", "Cat",
    "Fun", "Science", "Happy", "Dog", "Thyme",
    "Rosemary", "Sage", "Time", "Run", "Pot",
    "TV", "Rogue", "Smile", "Black", "Rock",
    "Ash", "Kitten", "Love", "Bingo (but not BINGO like somebody won!)",
    "Mom", "Dad", "Anya", "Wood", "Trail", "Tail", "Star"]

# Shuffle the word list so each game is called in a different order
random.shuffle(wordList)

# Print one word at a time; press Enter to call the next word
for strWord in wordList:
    print(strWord)
    input()

ELK Monitoring

We have a number of logstash servers gathering data from various filebeat sources. We've recently experienced a problem where the pipeline stops getting data from some of those sources. Not all of them — and restarting a non-functional filebeat source sends data for ten minutes or so. We were able to rectify the immediate problem by restarting our logstash services (IT troubleshooting step #1 — we restarted all of the filebeats and, when that didn't help, moved on to restarting the logstashes).

But we need to have a way to ensure this isn’t happening — losing days of log data from some sources is really bad. So I put together a Python script to verify there’s something coming in from each of the filebeat sources.

pip install elasticsearch==7.13.4

#!/usr/bin/env python3
#-*- coding: utf-8 -*-
# Suppress the warnings about not verifying SSL certificates
import requests
requests.packages.urllib3.disable_warnings()

from elasticsearch import Elasticsearch
import time

# Modules for email alerting
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


# Config variables
strSenderAddress = "devnull@example.com"
strRecipientAddress = "me@example.com"
strSMTPHostname = "mail.example.com"
iSMTPPort = 25

listSplunkRelayHosts = ['host293', 'host590', 'host591', 'host022', 'host014', 'host135']
iAgeThreshold = 3600 # Alert if last document is more than an hour old (3600 seconds)

strAlert = None

# Create the Elasticsearch client once, outside the per-host loop
elastic_client = Elasticsearch("https://elasticsearchhost.example.com:9200", http_auth=('rouser','r0pAs5w0rD'), verify_certs=False)

for strRelayHost in listSplunkRelayHosts:
	iCurrentUnixTimestamp = time.time()

	query_body = {
		"sort": {
			"@timestamp": {
				"order": "desc"
			}
		},
		"query": {
			"bool": {
				"must": {
					"term": {
						"host.hostname": strRelayHost
					}
				},
				"must_not": {
					"term": {
						"source": "/var/log/messages"
					}
				}
			}
		}
	}

	result = elastic_client.search(index="network_syslog*", body=query_body,size=1)
	all_hits = result['hits']['hits']

	# The "sort" value is the document @timestamp in milliseconds; convert the age to seconds
	iDocumentAge = None
	for doc in all_hits:
		iDocumentAge = ((iCurrentUnixTimestamp * 1000) - doc.get('sort')[0]) / 1000.0

	if iDocumentAge is not None:
		if iDocumentAge > iAgeThreshold:
			if strAlert is None:
				strAlert = f"<tr><td>{strRelayHost}</td><td>{iDocumentAge}</td></tr>"
			else:
				strAlert = f"{strAlert}\n<tr><td>{strRelayHost}</td><td>{iDocumentAge}</td></tr>\n"
			print(f"PROBLEM - For {strRelayHost}, document age is {iDocumentAge} second(s)")
		else:
			print(f"GOOD - For {strRelayHost}, document age is {iDocumentAge} second(s)")
	else:
		print(f"PROBLEM - For {strRelayHost}, no recent record found")
		# Hosts with no records at all should land in the alert email too
		if strAlert is None:
			strAlert = f"<tr><td>{strRelayHost}</td><td>no recent record</td></tr>"
		else:
			strAlert = f"{strAlert}\n<tr><td>{strRelayHost}</td><td>no recent record</td></tr>\n"


if strAlert is not None:
	msg = MIMEMultipart('alternative')
	msg['Subject'] = "ELK Filebeat Alert"
	msg['From'] = strSenderAddress
	msg['To'] = strRecipientAddress

	strHTMLMessage = f"<html><body><table><tr><th>Server</th><th>Document Age</th></tr>{strAlert}</table></body></html>"
	strTextMessage = strAlert

	part1 = MIMEText(strTextMessage, 'plain')
	part2 = MIMEText(strHTMLMessage, 'html')

	msg.attach(part1)
	msg.attach(part2)

	s = smtplib.SMTP(strSMTPHostname)
	s.sendmail(strSenderAddress, strRecipientAddress, msg.as_string())
	s.quit()

Certbot — Plugin Not Found

I got a certificate expiry warning this morning — an oddity because I’ve had a cron task renewing our certificates for quite some time. Running the cron’d command manually … well, that would do it! The plug-in for my DNS registrar isn’t found.

Checking the registered plugins, well … it’s not there.

Except it's there — running "pip install certbot-dns-porkbun" (and even trying pip3 just to make sure) tells me it's already installed. Looking around for the files, this turns out to be one of those things with an obvious right way to solve it and a quick way to solve it. For some reason, /usr/local/lib is not being searched for packages even though it's included in my PYTHONPATH. The right thing to do is figure out why that's happening. The quick solution? Symlink the packages into where they need to be:

ln -s /usr/local/lib/python3.10/site-packages/certbot_dns_porkbun /usr/lib/python3.10/site-packages/
ln -s /usr/local/lib/python3.10/site-packages/pkb_client /usr/lib/python3.10/site-packages/
ln -s /usr/local/lib/python3.10/site-packages/filelock /usr/lib/python3.10/site-packages/
ln -s /usr/local/lib/python3.7/site-packages/tldextract /usr/lib/python3.10/site-packages/
ln -s /usr/local/lib/python3.10/site-packages/requests_file /usr/lib/python3.10/site-packages/

ln -s /usr/local/lib/python3.10/site-packages/certbot_dns_porkbun-0.2.1.dist-info /usr/lib/python3.10/site-packages/
ln -s /usr/local/lib/python3.10/site-packages/filelock-3.6.0.dist-info /usr/lib/python3.10/site-packages/
ln -s /usr/local/lib/python3.10/site-packages/pkb_client-1.2.dist-info /usr/lib/python3.10/site-packages/
ln -s /usr/local/lib/python3.7/site-packages/tldextract-3.0.2.dist-info/ /usr/lib/python3.10/site-packages/
ln -s /usr/local/lib/python3.10/site-packages/requests_file-1.5.1.dist-info /usr/lib/python3.10/site-packages/

Voila, the plug-in exists again (and my cron task successfully renews the certificate).
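For the "right way" investigation, the first step would be comparing what the interpreter actually searches against PYTHONPATH. A quick check along these lines (run it with the same interpreter the certbot script's shebang points at; the snippet is mine, not part of the original fix):

import os
import sys

# Which interpreter is this, and where does it look for packages?
print(sys.executable)
print("\n".join(sys.path))

# PYTHONPATH as this process sees it -- cron jobs often run with a trimmed environment
print(os.environ.get("PYTHONPATH"))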

Apache HTTPD Log File Analysis — Hits by IP Address

When we are decommissioning a website (or web server), I always watch the log files to ensure there aren't a lot of people still accessing it. Sometimes there are, and it's worth tracking them down individually to clue them in to the site's imminent demise. Usually there aren't, and it's just confirmation that our decommissioning efforts won't be impactful.

This Python script looks for IP addresses in the log files and outputs each IP and its access count per log file. It's not great if a bunch of IP addresses show up in the recorded URI strings, but it's good enough for 99% of our log data.

import os
import re
from collections import Counter

def parseApacheHTTPDLog(strLogFile):
    # Match dotted-quad IP addresses anywhere in the log file
    regexIPAddress = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'

    with open(strLogFile) as f:
        objLog = f.read()
        listIPAddresses = re.findall(regexIPAddress, objLog)
        counterAccessByIP = Counter(listIPAddresses)
        for strIP, iAccessCount in counterAccessByIP.items():
            print(f"{strLogFile}\t{strIP}\t{iAccessCount}")

if __name__ == '__main__':
    strLogDirectory = '/var/log/httpd/'
    for strFileName in os.listdir(strLogDirectory):
        if "access_log" in strFileName:
        #if "hostname.example.com" in strFileName and "access_log" in strFileName:
            parseApacheHTTPDLog(f"{strLogDirectory}{strFileName}")
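If URI-embedded IP addresses ever do skew the counts, one option (my tweak, not part of the script above) is to anchor the match to the start of each line, since the client address is the first field of Apache's common and combined log formats:

regexClientIP = r'^\d{1,3}(?:\.\d{1,3}){3}'
# re.MULTILINE makes ^ match at the start of every line, so only the
# leading client-address field of each log entry is counted
listIPAddresses = re.findall(regexClientIP, objLog, re.MULTILINE)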

Postgresql Through an SSH Tunnel in Python

Our production Postgresql servers have a fairly restrictive IP access control list — which means you cannot VPN in and query the server. We've been using DBeaver with an SSH tunnel to connect, but it's a bit time-consuming to run a query across all of the servers for monitoring and troubleshooting. To work around the restriction, I built a Python script that uses an SSH tunnel to relay communications to the Postgresql servers.

import psycopg2
from sshtunnel import SSHTunnelForwarder

from config import strSSHRelayHost, iSSHRelayPort, strSSHRelayUser, strSSHAuthKeyFile, dictHost
# In the config.py, dictHost should contain the following information
# dictHost = {"host":"dbserver.example.com","port":5432,"database": "dbname", "username":"dbuser", "password":"S3cr3tPhr@5e"}

# Example query -- listing out locks 
sqlQuery = "WITH RECURSIVE l AS (  SELECT pid, locktype, mode, granted, ROW(locktype,database,relation,page,tuple,virtualxid,transactionid,classid,objid,objsubid) obj FROM pg_locks ), pairs AS ( SELECT w.pid waiter, l.pid locker, l.obj, l.mode FROM l w JOIN l ON l.obj IS NOT DISTINCT FROM w.obj AND l.locktype=w.locktype AND NOT l.pid=w.pid AND l.granted  WHERE NOT w.granted ), tree AS ( SELECT l.locker pid, l.locker root, NULL::record obj, NULL AS mode, 0 lvl, locker::text path, array_agg(l.locker) OVER () all_pids FROM ( SELECT DISTINCT locker FROM pairs l WHERE NOT EXISTS (SELECT 1 FROM pairs WHERE waiter=l.locker) ) l  UNION ALL  SELECT w.waiter pid, tree.root, w.obj, w.mode, tree.lvl+1, tree.path||'.'||w.waiter, all_pids || array_agg(w.waiter) OVER () FROM tree JOIN pairs w ON tree.pid=w.locker AND NOT w.waiter = ANY ( all_pids )) SELECT (clock_timestamp() - a.xact_start)::interval(3) AS ts_age, replace(a.state, 'idle in transaction', 'idletx') state, (clock_timestamp() - state_change)::interval(3) AS change_age, a.datname,tree.pid,a.usename,a.client_addr,lvl, (SELECT count(*) FROM tree p WHERE p.path ~ ('^'||tree.path) AND NOT p.path=tree.path) blocked, repeat(' .', lvl)||' '||left(regexp_replace(query, 's+', ' ', 'g'),100) query FROM tree JOIN pg_stat_activity a USING (pid) ORDER BY path"

with SSHTunnelForwarder( (strSSHRelayHost, iSSHRelayPort), ssh_username=strSSHRelayUser, ssh_private_key=strSSHAuthKeyFile, local_bind_address=("localhost",55432), remote_bind_address=(dictHost.get('host'), dictHost.get('port'))) as server:
# Alternately, you can use password authentication
#with SSHTunnelForwarder( (strSSHRelayHost, iSSHRelayPort), ssh_username=strSSHRelayUser, ssh_password=strSSHRelayUserPass, local_bind_address=("localhost",55432), remote_bind_address=(dictHost.get('host'), dictHost.get('port'))) as server:
    if server is not None:
        # The "with" context manager has already started the tunnel; just sanity-check it
        server._check_is_started()
        #print("Tunnel server connected")
        params = {'database': dictHost.get('database'),'user': dictHost.get('username'),'password': dictHost.get('password'), 'host': server.local_bind_host, 'port': server.local_bind_port}
        conn = psycopg2.connect(**params)
        cursor = conn.cursor()
        cursor.execute(sqlQuery)
        column_names = [desc[0] for desc in cursor.description]
        print(column_names)
        rows = cursor.fetchall()
        for row in rows:
            print(row)
        cursor.close()
        if conn is not None:
            conn.close()
        server.stop()
    else:
        print("Unable to establish SSH tunnel")

Python List Joining

I’ve seen lists joined with a delimiter like this before:

strDelimiter = "\n"
strDelimiter.join(listOfStuff)

But it seems silly to allocate a variable just for the delimiter … not a big deal from a resource perspective, and a named delimiter variable is probably more comprehensible in the future … but I've always wondered if you couldn't just call join on a string literal. Turns out you can —

msgContent.attach(MIMEText("\n".join(listOfStuff), 'plain'))
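As a standalone example (the list contents here are just sample data; msgContent in the line above comes from an email script that isn't shown):

listOfStuff = ["first line", "second line", "third line"]
# join called directly on the string literal, no delimiter variable needed
print("\n".join(listOfStuff))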


Porkbun DDNS API

I've been working on a script that updates our host names in Porkbun, but the script had a problem with the zone apex (example.com) A records. Updating host.example.com worked fine, but example.com became example.com.example.com.

Now, in a Bind zone, you just fully qualify the record by appending the implied root dot (i.e. instead of "example.com", you use "example.com."), but Porkbun didn't understand a fully qualified record. You cannot say the name is null (or ""). You cannot say the name is "example.com" or "example.com." either.

In what I hope is my final iteration of the script, I now identify cases where the name matches the zone and don’t include the name parameter in the JSON data. Otherwise I include the ‘name’ as the short hostname (i.e. the fully qualified hostname minus the zone name). This appears to be working properly, so (fingers crossed, knock on wood, and all that) our ‘stuff’ won’t go offline next time our IP address changes.
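A minimal sketch of that name-handling logic (the function and variable names are mine, and the payload fields are assumptions based on Porkbun's v3 JSON API rather than anything verified against their docs):

def build_record_payload(strFQDN, strZone, strIPAddress):
    # apikey / secretapikey omitted; the "type"/"content"/"name" field names are assumptions
    dictPayload = {"type": "A", "content": strIPAddress}
    if strFQDN != strZone:
        # Short hostname: the FQDN minus the zone name and the joining dot
        dictPayload["name"] = strFQDN[: -(len(strZone) + 1)]
    # When the record is the zone apex, leave the name parameter out entirely
    return dictPayload

print(build_record_payload("host.example.com", "example.com", "203.0.113.10"))
print(build_record_payload("example.com", "example.com", "203.0.113.10"))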

Scraping Google Calendar Data, take 2

I had written a script that uses the Google Calendar API to pull records from the Township's calendar. Unfortunately, the pickle / token / whatever has started expiring every week, which means manual intervention is required for my automated process to run. For a private calendar, authenticating makes sense: I need to prove who I am to read my private appointments. I could get the token to last for a year, but then I'd have to go through the process of becoming a real / approved application, which is a lot of effort for something I'm using to read my own data. All of which made me wonder: why do I need to authenticate at all to read events on a public calendar!?

Turns out I don’t. I just need to use the iCal feed for the calendar. Using requests to pull data from a URL and then parsing out the iCal data is simple enough. So now I have a script that pulls the iCal file to populate my Exchange calendar. Since it’s unauthenticated, I shouldn’t have to do anything to get it working again next week 🙂
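The script itself isn't shown here, but a minimal sketch of the unauthenticated approach might look like this (the URL is a placeholder, and the icalendar package is my assumption for the parsing step):

import requests
from icalendar import Calendar  # pip install icalendar

# Placeholder URL -- the real address comes from the calendar's public iCal feed
strICalURL = "https://calendar.google.com/calendar/ical/CALENDAR_ID/public/basic.ics"

response = requests.get(strICalURL)
response.raise_for_status()

# Walk the VEVENT components and print each event's summary and start time
calendar = Calendar.from_ical(response.text)
for event in calendar.walk("VEVENT"):
    print(event.get("SUMMARY"), event.decoded("DTSTART"))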

Python Selenium Headed vs. Headless

We are automating a file download — it works fine when running headed, but headless execution doesn't manage to log in. Proxying the requests through Fiddler shows that several JavaScript pages download unexpected content.

I've added a user-agent to the request, but I've noticed that the ChromeDriver also sets sec-ch-* headers … I expect the empty sec-ch-ua value causes the web server to refuse our request. I don't see any issues in the ChromeDriver repo for the sec-ch-* headers … and I don't really want to walk back versions until I find one that doesn't try setting this header value. Firefox's GeckoDriver, though, doesn't set them … so I moved the script over to use Firefox instead of Chrome and am able to download the file.
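The switch itself is small. A minimal sketch of the headless Firefox setup (the login URL is a placeholder, and overriding the user-agent to match the headed capture below is optional belt-and-suspenders):

from selenium import webdriver
from selenium.webdriver.firefox.options import Options

options = Options()
options.add_argument("-headless")  # headless Firefox; GeckoDriver sends no sec-ch-* headers

# Optionally mirror the headed browser's user-agent string
options.set_preference("general.useragent.override",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36")

driver = webdriver.Firefox(options=options)
try:
    driver.get("https://example.com/web/guest/login")  # placeholder login page
    # ... authenticate and download the file ...
finally:
    driver.quit()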

Headed run:

GET /o/telx-theme/css/A.bootstrap.css+slick,,_slick.css,Mcc.JKqfH-juDS.css.pagespeed.cf.ZO22sEGAvO.css HTTP/1.1
Host: example.com
Connection: keep-alive
sec-ch-ua: "Chromium";v="92", " Not A;Brand";v="99", "Google Chrome";v="92"
sec-ch-ua-mobile: ?0
User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36
Accept: text/css,*/*;q=0.1
Sec-Fetch-Site: same-origin
Sec-Fetch-Mode: no-cors
Sec-Fetch-Dest: style
Referer: https://example.com/web/guest/login
Accept-Encoding: gzip, deflate, br
Accept-Language: en-US,en;q=0.9
Cookie: JSESSIONID=0330C2C988F31010790779A126EA6F55.node1; COOKIE_SUPPORT=true; GUEST_LANGUAGE_ID=en_US; AWSELB=039B496118DDEAD697B2B51C93852940763289C324F9E7C7223F953330AF5506573D13C4D5599541FD3CADB645303C1CAEB6D26992826965DA6C8BEDBDE9C297AE26CD76ED; AWSELBCORS=039B496118DDEAD697B2B51C93852940763289C324F9E7C7223F953330AF5506573D13C4D5599541FD3CADB645303C1CAEB6D26992826965DA6C8BEDBDE9C297AE26CD76ED; TS0194d418=01092b79076749232d762d2a6c232e015d103453fbeda3826bd3d20e1d937f5a90cabe03655c97a79198969eea539e4c2e7fc426216092c78ccda85763d52300ce05672704e45b4fc25516d2c24279656db7b0242f7c8b9c8bfed35b7608afb0c54bbc33d489f431059d048094c1e707a20d28031885ca6c61f81613ac299044f0c2b9ba36


Headless run:

GET /o/telx-theme/css/A.bootstrap.css+slick,,_slick.css,Mcc.JKqfH-juDS.css.pagespeed.cf.ZO22sEGAvO.css HTTP/1.1
Host: example.com
Connection: keep-alive
sec-ch-ua:
sec-ch-ua-mobile: ?0
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36
Accept: text/css,*/*;q=0.1
Sec-Fetch-Site: same-origin
Sec-Fetch-Mode: no-cors
Sec-Fetch-Dest: style
Referer: https://example.com/web/guest/login
Accept-Encoding: gzip, deflate, br
Accept-Language: en-US
Cookie: JSESSIONID=F4293ECE33B134CC368C0E62D6923B48.node1; COOKIE_SUPPORT=true; GUEST_LANGUAGE_ID=en_US; AWSELB=039B496118DDEAD697B2B51C93852940763289C324A5AB24AE470C70960B5319A93C181302D27B4C9425A4AA05795334C4404D491FBCC8E6A9B809746A802EAC2EC8C2FBFA; AWSELBCORS=039B496118DDEAD697B2B51C93852940763289C324A5AB24AE470C70960B5319A93C181302D27B4C9425A4AA05795334C4404D491FBCC8E6A9B809746A802EAC2EC8C2FBFA; TS0194d418=01ba3b12a4ef612e3839114024b5082fd19d56b17293c914ff867740ad37ae362e385934695ad3fc275074bfd1ee24c7d1591b146ad39d153a8758aecc8eb44d374dc1c689e540deca9566f723df65e9f5ad26551e25bacd5df14e4e6104a91a0ecdb59a65176bd5a0ebed284847e0e6618a05ed1d9db6b544e195d8e1f41164e7199a6596