This script creates very basic roles with no extra capabilities and restricts the role to viewing only the indicated source category’s data.
################################################################################
# This script reads an Excel file containing role data, then uses the Sumo Logic
# API to create roles based on the data. It checks each row for a role name and
# uses the source category to set data filters. The script requires a config.py
# file with access credentials.
################################################################################
import pandas as pd
import requests
import json
from config import access_id, access_key # Import credentials from config.py
# Path to Excel file
excel_file_path = 'NewRoles.xlsx'
# Base URL for Sumo Logic API
base_url = 'https://api.sumologic.com/api/v1'
################################################################################
# Function to create a new role using the Sumo Logic API.
#
# Args:
# role_name (str): The name of the role to create.
# role_description (str): The description of the role.
# source_category (str): The source category to restrict the role to.
#
# Returns:
# None. Prints the status of the API call.
################################################################################
def create_role(role_name, role_description, source_category):
url = f'{base_url}/roles'
# Role payload
data_filter = f'_sourceCategory={source_category}'
payload = {
'name': role_name,
'description': role_description,
'logAnalyticsDataFilter': data_filter,
'auditDataFilter': data_filter,
'securityDataFilter': data_filter
}
# Headers for the request
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
# Debugging line
print(f"Attempting to create role: '{role_name}' with description: '{role_description}' and filter: '{data_filter}'")
# Make the POST request to create a new role
response = requests.post(url, auth=(access_id, access_key), headers=headers, data=json.dumps(payload))
# Check the response
if response.status_code == 201:
print(f'Role {role_name} created successfully.')
else:
print(f'Failed to create role {role_name}. Status Code: {response.status_code}')
print('Response:', response.json())
################################################################################
# Reads an Excel file and processes each row to extract role information and
# create roles using the Sumo Logic API.
#
# Args:
# file_path (str): The path to the Excel file containing role data.
#
# Returns:
# None. Processes the file and attempts to create roles based on the data.
################################################################################
def process_excel(file_path):
# Load the spreadsheet
df = pd.read_excel(file_path, engine='openpyxl')
# Print column names to help debug and find correct ones
print("Columns found in Excel:", df.columns)
# Iterate over each row in the DataFrame
for index, row in df.iterrows():
role_name = row['Role Name'] # Correct column name for role name
source_category = row['Source Category'] # Correct column name for source category to which role is restricted
# Only create a role if the role name is not null
if pd.notnull(role_name):
role_description = f'Provides access to source category {source_category}'
create_role(role_name, role_description, source_category)
# Process the Excel file
process_excel(excel_file_path)
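If you need a spreadsheet in the expected layout for testing, the two columns the script reads (“Role Name” and “Source Category”) can be generated with pandas – a quick sketch, using made-up role names and source categories:
import pandas as pd
# Build a small sample spreadsheet with the columns the role-creation script expects
df_sample = pd.DataFrame({
    'Role Name': ['Role_AppTeamA', 'Role_AppTeamB'],
    'Source Category': ['prod/appteama/logs', 'prod/appteamb/logs']
})
df_sample.to_excel('NewRoles.xlsx', index=False, engine='openpyxl')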
I am working with a new application that doesn’t seem to like it when a person has multiple roles assigned to them … however, I first need to prove that is the problem. Luckily, your browser receives the SAML response, so you can actually see the Role entitlements that are being sent. I just need to parse them out of the big 80 MB file that a simple “go here and log on” generates!
To gather data to be parsed, open the Dev Tools for the browser tab. Click the settings gear icon and select “Persist Logs”. Reproduce the scenario – navigate to the site, log in. Then save the dev tools session as a HAR file. The following Python script will analyze the file, extract any SAML response tokens, and print them in a human-readable format.
################################################################################
# This script reads a HAR file, identifies HTTP requests and responses containing
# SAML tokens, and decodes "SAMLResponse" values.
#
# The decoded SAML assertions are printed out for inspection in a readable format.
#
# Usage:
# - Update the str_har_file_path with your HAR file
################################################################################
# Editable Variables
str_har_file_path = 'SumoLogin.har'
# Imports
import json
import base64
import urllib.parse
from xml.dom.minidom import parseString
################################################################################
# This function decodes SAML responses found within the HAR capture
# Args:
# saml_response_encoded(str): URL encoded, base-64 encoded SAML response
# Returns:
# string: decoded string
################################################################################
def decode_saml_response(saml_response_encoded):
url_decoded = urllib.parse.unquote(saml_response_encoded)
base64_decoded = base64.b64decode(url_decoded).decode('utf-8')
return base64_decoded
################################################################################
# This function finds and decodes SAML tokens from HAR entries.
#
# Args:
# entries(list): A list of HTTP request and response entries from a HAR file.
#
# Returns:
# list: List of decoded SAML assertion response strings.
################################################################################
def find_saml_tokens(entries):
saml_tokens = []
for entry in entries:
request = entry['request']
response = entry['response']
if request['method'] == 'POST':
request_body = request.get('postData', {}).get('text', '')
if 'SAMLResponse=' in request_body:
saml_response_encoded = request_body.split('SAMLResponse=')[1].split('&')[0]
saml_tokens.append(decode_saml_response(saml_response_encoded))
response_body = response.get('content', {}).get('text', '')
if response.get('content', {}).get('encoding') == 'base64':
response_body = base64.b64decode(response_body).decode('utf-8', errors='ignore')
if 'SAMLResponse=' in response_body:
saml_response_encoded = response_body.split('SAMLResponse=')[1].split('&')[0]
saml_tokens.append(decode_saml_response(saml_response_encoded))
return saml_tokens
################################################################################
# This function converts an XML string into a pretty-printed string with
# line breaks and hierarchical indentation
#
# Args:
# xml_string (str): The XML string to be pretty-printed.
#
# Returns:
# str: A pretty-printed version of the XML string.
################################################################################
def pretty_print_xml(xml_string):
dom = parseString(xml_string)
return dom.toprettyxml(indent=" ")
# Load HAR file with UTF-8 encoding
with open(str_har_file_path, 'r', encoding='utf-8') as file:
har_data = json.load(file)
entries = har_data['log']['entries']
saml_tokens = find_saml_tokens(entries)
for token in saml_tokens:
print("Decoded SAML Token:")
print(pretty_print_xml(token))
print('-' * 80)
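Since the point of this exercise is to see which Role entitlements are being sent, it can also help to pull just the attribute values out of each decoded assertion rather than eyeballing the full XML. A minimal sketch – note that the attribute name (“Role” here) is an assumption and varies by identity provider, so match it to whatever appears in your assertion:
import xml.etree.ElementTree as ET
def list_attribute_values(saml_xml, attribute_name='Role'):
    # Collect AttributeValue text for the named Attribute in a decoded SAML assertion
    ns = {'saml2': 'urn:oasis:names:tc:SAML:2.0:assertion'}
    root = ET.fromstring(saml_xml)
    values = []
    for attribute in root.findall('.//saml2:Attribute', ns):
        if attribute.get('Name') == attribute_name:
            for value in attribute.findall('saml2:AttributeValue', ns):
                values.append(value.text)
    return values
for token in saml_tokens:
    print("Role entitlements in assertion:", list_attribute_values(token))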
This script is an example of using the Sumo Logic API to retrieve collector details. This particular script looks for Linux servers and validates that each collector has the desired log sources defined. Those that do not contain all of the desired sources are flagged for further investigation.
import requests
from requests.auth import HTTPBasicAuth
import pandas as pd
from config import access_id, access_key # Import your credentials from config.py
# Base URL for Sumo Logic API
base_url = 'https://api.sumologic.com/api/v1'
def get_all_collectors():
"""Retrieve all collectors with pagination support."""
collectors = []
limit = 1000 # Adjust as needed; check API docs for max limit
offset = 0
while True:
url = f'{base_url}/collectors?limit={limit}&offset={offset}'
response = requests.get(url, auth=HTTPBasicAuth(access_id, access_key))
if response.status_code == 200:
result = response.json()
collectors.extend(result.get('collectors', []))
if len(result.get('collectors', [])) < limit:
break # Exit the loop if we received fewer than the limit, meaning it's the last page
offset += limit
else:
print('Error fetching collectors:', response.status_code, response.text)
break
return collectors
def get_sources(collector_id):
"""Retrieve sources for a specific collector."""
url = f'{base_url}/collectors/{collector_id}/sources'
response = requests.get(url, auth=HTTPBasicAuth(access_id, access_key))
if response.status_code == 200:
sources = response.json().get('sources', [])
# print(f"Log Sources for collector {collector_id}: {sources}")
return sources
else:
print(f'Error fetching sources for collector {collector_id}:', response.status_code, response.text)
return []
def check_required_logs(sources):
"""Check if the required logs are present in the sources."""
required_logs = {
'_security_events': False,
'_linux_system_events': False,
'cron_logs': False,
'dnf_rpm_logs': False
}
for source in sources:
if source['sourceType'] == 'LocalFile':
name = source.get('name', '')
for key in required_logs.keys():
if name.endswith(key):
required_logs[key] = True
# Determine missing logs
missing_logs = {log: "MISSING" if not present else "" for log, present in required_logs.items()}
return missing_logs
# Main execution
if __name__ == "__main__":
collectors = get_all_collectors()
report_data = []
for collector in collectors:
# Check if the collector's osName is 'Linux'
if collector.get('osName') == 'Linux':
collector_id = collector['id']
collector_name = collector['name']
print(f"Checking Linux Collector: ID: {collector_id}, Name: {collector_name}")
sources = get_sources(collector_id)
missing_logs = check_required_logs(sources)
if any(missing_logs.values()):
report_entry = {
"Collector Name": collector_name,
"_security_events": missing_logs['_security_events'],
"_linux_system_events": missing_logs['_linux_system_events'],
"cron_logs": missing_logs['cron_logs'],
"dnf_rpm_logs": missing_logs['dnf_rpm_logs']
}
# print(f"Missing logs for collector {collector_name}: {report_entry}")
report_data.append(report_entry)
# Create a DataFrame and write to Excel
df = pd.DataFrame(report_data, columns=[
"Collector Name", "_security_events", "_linux_system_events", "cron_logs", "dnf_rpm_logs"
])
# Generate the filename with current date and time
if not df.empty:
timestamp = pd.Timestamp.now().strftime("%Y%m%d-%H%M")
output_file = f"{timestamp}-missing_logs_report.xlsx"
df.to_excel(output_file, index=False)
print(f"\nData written to {output_file}")
else:
print("\nAll collectors have the required logs.")
This is my base script for using the Sumo Logic API to query logs and analyze data. This particular script finds hosts sending syslog data successfully through our firewall, looks up who owns the netblock (they weren’t all internal!), and checks our configuration management database (CMDB) to see if we have a host registered with the destination IP address of the syslog traffic.
import requests
from requests.auth import HTTPBasicAuth
import time
from collections import defaultdict
import cx_Oracle
import pandas as pd
import ipaddress
from datetime import datetime
from ipwhois import IPWhois
from ipwhois.exceptions import IPDefinedError
# Import credentials from a config file
from config import access_id, access_key, oracle_username, oracle_password
# Initialize Oracle Client
cx_Oracle.init_oracle_client(lib_dir=r"C:\Oracle\instantclient_21_15")
oracle_dsn = cx_Oracle.makedsn('cmdb_db.example.com', 1521, service_name='cmdb_db.example.com')
# Function to query Oracle database
def query_oracle_cmdb(strIPAddress):
with cx_Oracle.connect(user=oracle_username, password=oracle_password, dsn=oracle_dsn) as connection:
cursor = connection.cursor()
query = """
SELECT HOSTNAME, FRIENDLYNAME, STATUS, COLLECTIONTIME, RETIREDBYDISPLAYNAME,
RETIREDDATETIME, SERVERAPPSUPPORTTEAM, SERVERENVIRONMENT
FROM NBIREPORT.CHERWELL_CMDBDATA_FULL
WHERE IPADDRESS = :ipaddy
"""
cursor.execute(query, [strIPAddress])
result = cursor.fetchone()
cursor.close()
return result if result else ("",) * 8
# Function to determine IP ownership
def get_ip_ownership(ip):
# Define internal IP ranges
internal_networks = [
ipaddress.IPv4Network("10.0.0.0/8"),
ipaddress.IPv4Network("172.16.0.0/12"),
ipaddress.IPv4Network("192.168.0.0/16")
]
# Check if the IP is internal
ip_obj = ipaddress.IPv4Address(ip)
if any(ip_obj in network for network in internal_networks):
return "INTERNAL"
# For external IPs, use ipwhois to get ownership info
try:
obj = IPWhois(ip)
result = obj.lookup_rdap(depth=1)
ownership = result['network']['name']
except IPDefinedError:
ownership = "Reserved IP"
except Exception as e:
print(f"Error looking up IP {ip}: {e}")
ownership = "UNKNOWN"
return ownership
# Base URL for Sumo Logic API
base_url = 'https://api.sumologic.com/api/v1'
# Define the search query
search_query = '''
(dpt=514)
AND _sourcecategory = "observe/perimeter/firewall/logs"
| where !(act = "deny")
| where !(act = "timeout")
| where !(act = "ip-conn")
| where (proto=17 or proto=6)
| count dst, act
'''
# Function to create and manage search jobs
def run_search_job(start_time, end_time):
search_job_data = {
'query': search_query,
'from': start_time,
'to': end_time,
'timeZone': 'UTC'
}
# Create a search job
search_job_url = f'{base_url}/search/jobs'
response = requests.post(
search_job_url,
auth=HTTPBasicAuth(access_id, access_key),
json=search_job_data
)
if response.status_code != 202:
print('Error starting search job:', response.status_code, response.text)
return None
# Get the search job ID
job_id = response.json()['id']
print('Search Job ID:', job_id)
# Poll for the search job status
job_status_url = f'{search_job_url}/{job_id}'
while True:
response = requests.get(job_status_url, auth=HTTPBasicAuth(access_id, access_key))
status = response.json().get('state', None)
print('Search Job Status:', status)
if status in ['DONE GATHERING RESULTS', 'CANCELLED', 'FAILED']:
break
time.sleep(5) # Reasonable delay to prevent overwhelming the server
return job_id if status == 'DONE GATHERING RESULTS' else None
# Function to retrieve results of a search job
def retrieve_results(job_id):
dst_counts = defaultdict(int)
results_url = f'{base_url}/search/jobs/{job_id}/messages'
offset = 0
limit = 1000
while True:
params = {'offset': offset, 'limit': limit}
try:
response = requests.get(results_url, auth=HTTPBasicAuth(access_id, access_key), params=params, timeout=30)
if response.status_code == 200:
results = response.json()
messages = results.get('messages', [])
for message in messages:
message_map = message['map']
dst = message_map.get('dst')
if dst:
dst_counts[dst] += 1
if len(messages) < limit:
break
offset += limit
else:
print('Error retrieving results:', response.status_code, response.text)
break
except requests.exceptions.RequestException as e:
print(f'Error during request: {e}')
time.sleep(5)
continue
return dst_counts
# Main execution
if __name__ == "__main__":
# Prompt for the start date
start_date_input = input("Enter the start date (YYYY-MM-DD): ")
try:
start_time = datetime.strptime(start_date_input, "%Y-%m-%d").strftime("%Y-%m-%dT00:00:00")
except ValueError:
print("Invalid date format. Please enter the date in YYYY-MM-DD format.")
exit()
# Use today's date as the end date
end_time = datetime.now().strftime("%Y-%m-%dT00:00:00")
# Create a search job
job_id = run_search_job(start_time, end_time)
if job_id:
# Retrieve and process results
dst_counts = retrieve_results(job_id)
# Prepare data for Excel
data_for_excel = []
print("\nDestination IP Counts and Oracle Data:")
for dst, count in dst_counts.items():
oracle_data = query_oracle_cmdb(dst)
ownership = get_ip_ownership(dst)
# Use only Oracle data columns
combined_data = (dst, count, ownership) + oracle_data
data_for_excel.append(combined_data)
print(combined_data)
# Create a DataFrame and write to Excel
df = pd.DataFrame(data_for_excel, columns=[
"IP Address", "Occurrence Count", "Ownership",
"CMDB_Hostname", "CMDB_Friendly Name", "CMDB_Status", "CMDB_Collection Time",
"CMDB_Retired By", "CMDB_Retired Date", "CMDB_Support Team", "CMDB_Environment"
])
# Generate the filename with current date and time
timestamp = datetime.now().strftime("%Y%m%d-%H%M")
output_file = f"{timestamp}-sumo_oracle_data.xlsx"
df.to_excel(output_file, index=False)
print(f"\nData written to {output_file}")
else:
print('Search job did not complete successfully.')
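The Search Job API also supports deleting a job once you are done with it; if you want to clean up after retrieving results, something like this should work with the same credentials and base_url:
def delete_search_job(job_id):
    # Ask Sumo Logic to discard the completed search job (optional cleanup)
    response = requests.delete(f'{base_url}/search/jobs/{job_id}', auth=HTTPBasicAuth(access_id, access_key))
    if response.status_code not in (200, 204):
        print('Error deleting search job:', response.status_code, response.text)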
I put together a quick program that creates a “fancy” QR code that links to a specified URL, renders it in a specified color, and drops the desired “logo” file into the center of the code.
import qrcode
from PIL import Image
def generate_qr_code_with_custom_color_and_logo():
url = input("Please enter the URL for which you want to generate a QR code: ")
rgb_input = input("Please enter the RGB values for the QR code color (e.g. 0,0,0 for black): ")
try:
rgb_color = tuple(map(int, rgb_input.split(',')))
if len(rgb_color) != 3 or not all(0 <= n <= 255 for n in rgb_color):
raise ValueError("Invalid RGB color value.")
except Exception:
print("Error parsing RGB values. Please make sure to enter three integers separated by commas.")
return
qr = qrcode.QRCode(
version=1, # controls the size of the QR Code
error_correction=qrcode.constants.ERROR_CORRECT_H, # high error correction for image insertion
box_size=10,
border=4,
)
qr.add_data(url)
qr.make(fit=True)
# Generate the QR code with the specified RGB color
img = qr.make_image(fill_color=rgb_color, back_color="white")
# Load the logo image
logo_image_path = input("Please enter the logo for the center of this QR code: ")
try:
logo = Image.open(logo_image_path)
except FileNotFoundError:
print(f"Logo image file '{logo_image_path}' not found. Proceeding without a logo.")
img.save("qr_code_with_custom_color.png")
print("QR code has been generated and saved as 'qr_code_with_custom_color.png'.")
return
# Resize the logo image to fit in the QR code
img_width, img_height = img.size
logo_size = int(img_width * 0.2) # The logo will take up 20% of the QR code width
    logo = logo.resize((logo_size, logo_size), Image.LANCZOS)  # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the equivalent filter
position = ((img_width - logo_size) // 2, (img_height - logo_size) // 2)
img.paste(logo, position, mask=logo.convert("RGBA"))
img.save("qr_code_with_custom_color_and_logo.png")
print("QR code with a custom color and a logo image has been generated and saved as 'qr_code_with_custom_color_and_logo.png'.")
if __name__ == "__main__":
generate_qr_code_with_custom_color_and_logo()
I write a lot of things down to save myself time the next time I need to do the same sort of thing — and publish this to the Internet in case I can save someone else time too. But this one is so specific, I’m not sure it’s an “ever going to encounter this again” sort of thing. Just in case, though: I have device data being stored in Redis. Because the device doesn’t know its own throughput values, you need the last timestamp and last value paired with the current device metrics to calculate throughput. OK. But, sporadically, the cached data is updated in the sense that a new record is posted with a new timestamp while the actual values, other than the timestamp, remain unchanged. With millions of interfaces, it’s challenging to identify these situations by spot-checking the visualizations. Instead, I need to monitor Redis and identify when the tstamp is updated but no other values change.
import redis
import time
import re
import json
import os
# Configuration
redis_host = 'redishost.example.net'
redis_port = 6379
redis_password = 'P@5sw0rDG03sH3r3' # Replace with your Redis password
pattern = re.compile(r'INTERFACE_RAW_STATS_hostname\d\d\d\d_\d+_\d+')
output_file = 'changed_records.json'
# Connect to Redis
client = redis.StrictRedis(host=redis_host, port=redis_port, password=redis_password, decode_responses=True)
# Dictionary to track records
records = {}
matching_keys = []
def get_matching_keys():
"""
Retrieve keys from Redis matching the specified pattern.
Returns:
list: A list of keys that match the pattern.
"""
all_keys = client.keys()
matching_keys = [key for key in all_keys if pattern.match(key)]
return matching_keys
def process_keys():
"""
Process Redis keys to track changes in data.
Retrieves keys matching the pattern, gets their data using HGETALL,
and tracks changes. If only the 'tstamp' field has changed and all
other fields remain the same, the record is written to a file.
"""
global records
i = 0
for key in matching_keys:
i += 1
data = client.hgetall(key)
if i == 1 or i % 1000 == 0:
print(f"Processed {i} records")
if not data:
continue
collector_name = data.get('collectorName')
node_id = data.get('nodeId')
if_index = data.get('ifIndex')
tstamp = data.get('tstamp')
if not collector_name or not node_id or not if_index or not tstamp:
continue
unique_key = f"{collector_name}_{node_id}_{if_index}"
if unique_key in records:
previous_data = records[unique_key]
if previous_data['tstamp'] != tstamp:
# Check if all other values are the same
                if all(data[k] == previous_data.get(k) for k in data if k != 'tstamp'):
print(f"***** Record changed: {json.dumps(data, indent=2)} *****")
write_to_file(data)
records[unique_key] = data # Update the record
else:
records[unique_key] = data
def write_to_file(data):
"""
Write the given data to a file.
Args:
data (dict): The data to write to the file.
"""
with open(output_file, 'a') as file:
file.write(json.dumps(data) + '\n')
if __name__ == "__main__":
# Ensure the output file is empty at the start
if os.path.exists(output_file):
os.remove(output_file)
# Retrieve the list of matching keys once
matching_keys = get_matching_keys()
while True:
process_keys()
print("Sleeping ... ")
time.sleep(300) # Sleep for 5 minutes
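One caveat: client.keys() asks Redis for every key in a single blocking call, which gets ugly with millions of interfaces. If that becomes a problem, SCAN retrieves keys incrementally; redis-py exposes it as scan_iter, so get_matching_keys could be sketched like this (the MATCH pattern is a glob-style rough cut, with the regex still applied afterward):
def get_matching_keys_scan():
    # Walk the keyspace incrementally with SCAN instead of one blocking KEYS call
    return [key for key in client.scan_iter(match='INTERFACE_RAW_STATS_*', count=1000) if pattern.match(key)]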
So, I know that Redis should be a data cache that can be repopulated … but we use it to calculate deltas (what was the value last time) … so repopulating the information makes the first half hour or so of calculations rather slow as the application tries redis, gets nothing, and fails back to a database query. Then we get a backlog of data to churn through, and it would just be better if the Redis cache hadn’t gone away in the first place. And if you own both servers and the files are in the same format, you could just copy the cache db from the old server to the new one. But … when you cannot just copy the file and you would really prefer the data not disappear and need to be repopulated … there’s a script for that! This python script reads all of the data from the “old” server and populates it into the “new” server.
import redis
def migrate_data(redis_source_host, redis_source_port, redis_source_db, redis_source_password,
redis_dest_host, redis_dest_port, redis_dest_db, redis_dest_password):
# Connect to the source Redis server
source_client = redis.StrictRedis(host=redis_source_host, port=redis_source_port, db=redis_source_db, password=redis_source_password)
# Connect to the destination Redis server
dest_client = redis.StrictRedis(host=redis_dest_host, port=redis_dest_port, db=redis_dest_db, password=redis_dest_password)
# Fetch all keys from the source Redis
keys = source_client.keys('*')
for key in keys:
# Get the type of the key
key_type = source_client.type(key).decode('utf-8')
if key_type == 'string':
value = source_client.get(key)
print("Setting string value in dest")
dest_client.set(key, value)
elif key_type == 'list':
values = source_client.lrange(key, 0, -1)
print("Setting list value in dest")
dest_client.delete(key) # Ensure the list is empty before pushing
for value in values:
dest_client.rpush(key, value)
elif key_type == 'set':
values = source_client.smembers(key)
print("Setting set value in dest")
dest_client.delete(key) # Ensure the set is empty before pushing
for value in values:
dest_client.sadd(key, value)
elif key_type == 'zset':
values = source_client.zrange(key, 0, -1, withscores=True)
print("Setting zset value in dest")
dest_client.delete(key) # Ensure the zset is empty before pushing
for value, score in values:
dest_client.zadd(key, {value: score})
elif key_type == 'hash':
values = source_client.hgetall(key)
print("Setting hash value in dest")
dest_client.delete(key) # Ensure the hash is empty before pushing
            dest_client.hset(key, mapping=values)  # hmset is deprecated in newer redis-py; hset with mapping= does the same thing
print("Data migration completed.")
if __name__ == "__main__":
# Source Redis server details
redis_source_host = 'oldredis.example.com'
redis_source_port = 6379
redis_source_db = 0
redis_source_password = 'SourceRedisPassword'
# Destination Redis server details
redis_dest_host = 'newredis.example.com'
redis_dest_port = 6379
redis_dest_db = 0
redis_dest_password = 'DestRedisPassword'
# Migrate data
migrate_data(redis_source_host, redis_source_port, redis_source_db, redis_source_password,
redis_dest_host, redis_dest_port, redis_dest_db, redis_dest_password)
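One thing this type-by-type copy does not carry over is key expiration, and it would need updating if you use additional data types (streams, for example). Assuming the destination server runs the same or a newer Redis version, DUMP and RESTORE copy each key in its native serialized form regardless of type and preserve the remaining TTL – a minimal sketch:
def migrate_with_dump_restore(source_client, dest_client):
    # Copy each key as an opaque serialized blob; type and remaining TTL come along for free
    for key in source_client.keys('*'):
        ttl_ms = source_client.pttl(key)
        ttl_ms = ttl_ms if ttl_ms > 0 else 0  # RESTORE treats 0 as "no expiration"
        dest_client.restore(key, ttl_ms, source_client.dump(key), replace=True)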
Because communication between development and production platforms is limited for security and data integrity reasons, testing changes in development is a challenge: we cannot access “real world” data with which to perform tests. Having a limited set of data in development means testing may not illuminate issues that occur at high volume or on a large scale.
Solution
While limiting communication between the prod and dev systems is reasonable, it would be beneficial to be able to replay production-like data within our development systems for testing purposes. While it is not cost-effective to buy large network devices with thousands of interfaces just for testing, the Python module snmpsim provides “canned responses” that simulate real devices on the production network. For simplicity, I have a bash script that launches the SNMP responder.
This responder will replay data stored in the directory /opt/snmp/snmpsim/data – any file ending in .snmprec will be included in the response, and the filename prior to .snmprec is the community string used to access that response data. E.g., public.snmprec is the data for the public community string.
The response files are in the format OID|TAG|VALUE, where OID is the OID number of the SNMP object and TAG is an integer identifying the data type, as defined at https://pypi.org/project/snmpsim/0.2.3/.
Valid tag values and their corresponding ASN.1/SNMP types are:
ASN.1/SNMP Type        Tag Value
Integer32              2
Octet String           4
Null                   5
Object Identifier      6
IP Address             64
Counter32              65
Gauge32                66
TimeTicks              67
Opaque                 68
Counter64              70
And the value is the data to be returned for the OID object. As an example:
1.3.6.1.2.1.1.3.0|67|2293092270
1.3.6.1.2.1.1.3.0 is sysUpTime, the data type is TimeTicks, and the system up time is 2293092270 hundredths of a second – roughly 6,369 hours and 42 minutes, or about 265 days.
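If you want to sanity-check a TimeTicks value, the conversion is easy enough to do in Python since the unit is hundredths of a second:
from datetime import timedelta
# 2293092270 hundredths of a second
print(timedelta(seconds=2293092270 / 100))   # 265 days, 9:42:02.700000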
Items within the response file need to be listed in ascending order.
Generating Response Data
There are two methods for creating the data provided to an SNMP GET request. A response file can be created manually, populated with OID objects that should be included in the response as well as sample data. Alternatively, a network trace can be gathered from the production network and parsed to create the response file.
Manually Generated Response File
While you can literally type data into a response file, it is far easier to use a script to generate sample data. /opt/snmp/snmpsim/_genData.py is an example of creating a response file for about 1,000 interfaces.
from datetime import datetime
import random
iRangeMax = 1000
dictTags = {'Integer': '2', 'OctetString': '4', 'NULL': '5', 'ObjectIdentifier': '6', 'IPAddress': '64', 'Counter32': '65', 'Gauge32': '66', 'TimeTicks': '67', 'Opaque': '68','Counter64': '70'} # Valid tags per https://pypi.org/project/snmpsim/0.2.3/
today = datetime.now()
iftable_snmp_objects = [
('1.3.6.1.2.1.2.2.1.1', 'Integer', lambda i: i), # ifIndex
('1.3.6.1.2.1.2.2.1.2', 'OctetString', lambda i: f"SampleInterface{i}"), # ifDescr
('1.3.6.1.2.1.2.2.1.3', 'Integer', lambda i: 6), # ifType
('1.3.6.1.2.1.2.2.1.4', 'Integer', lambda i: 1500), # ifMtu
('1.3.6.1.2.1.2.2.1.5', 'Gauge32', lambda i: 100000000), # ifSpeed
('1.3.6.1.2.1.2.2.1.6', 'OctetString', lambda i: f"00:00:00:00:{format(i, '02x')[:2]}:{format(i, '02x')[-2:]}"), # ifPhysAddress
('1.3.6.1.2.1.2.2.1.7', 'Integer', lambda i: 1), # ifAdminStatus
('1.3.6.1.2.1.2.2.1.8', 'Integer', lambda i: 1), # ifOperStatus
('1.3.6.1.2.1.2.2.1.9', 'TimeTicks', lambda i: int((datetime.now() - datetime(2024, random.randint(1, today.month), random.randint(1, today.day))).total_seconds()) * 100), # ifLastChange
('1.3.6.1.2.1.2.2.1.10', 'Counter32', lambda i: random.randint(3, i*50000)), # ifInOctets
('1.3.6.1.2.1.2.2.1.11', 'Counter32', lambda i: random.randint(3, i*50000)), # ifInUcastPkts
('1.3.6.1.2.1.2.2.1.12', 'Counter32', lambda i: random.randint(0, 80)), # ifInNUcastPkts
('1.3.6.1.2.1.2.2.1.13', 'Counter32', lambda i: random.randint(0, 80)), # ifInDiscards
('1.3.6.1.2.1.2.2.1.14', 'Counter32', lambda i: random.randint(0, 80)), # ifInErrors
('1.3.6.1.2.1.2.2.1.15', 'Counter32', lambda i: random.randint(3, i*50000)), # ifInUnknownProtos
('1.3.6.1.2.1.2.2.1.16', 'Counter32', lambda i: random.randint(3, i*50000)), # ifOutOctets
('1.3.6.1.2.1.2.2.1.17', 'Counter32', lambda i: random.randint(3, i*50000)), # ifOutUcastPkts
('1.3.6.1.2.1.2.2.1.18', 'Counter32', lambda i: random.randint(3, i*50000)), # ifOutNUcastPkts
('1.3.6.1.2.1.2.2.1.19', 'Counter32', lambda i: random.randint(0, 80)), # ifOutDiscards
('1.3.6.1.2.1.2.2.1.20', 'Counter32', lambda i: random.randint(0, 80)), # ifOutErrors
]
ifxtable_snmp_objects = [  # Listed in ascending OID order so the generated file stays in ascending order
    ('1.3.6.1.2.1.31.1.1.1.1', 'OctetString', lambda i: f"SampleInterface{i}"), # ifName
    ('1.3.6.1.2.1.31.1.1.1.6', 'Counter32', lambda i: random.randint(3, i*50000)), # ifHCInOctets
    ('1.3.6.1.2.1.31.1.1.1.10', 'Counter32', lambda i: random.randint(3, i*60000)), # ifHCOutOctets
    ('1.3.6.1.2.1.31.1.1.1.15', 'Gauge32', lambda i: "100"), # ifHighSpeed
]
# Print IFTable data
for oid_base, tag_type, value_func in iftable_snmp_objects:
for i in range(1, iRangeMax+1):
value = value_func(i)
print(f"{oid_base}.{i}|{dictTags.get(tag_type)}|{value}")
# IP-MIB objects for managing IP addressing
# ipAdEntAddr: The IP address to which this entry's addressing information pertains
print(f"1.3.6.1.2.1.4.20.1.1|{dictTags.get('IPAddress')}|10.5.5.5")
# ipAdEntIfIndex: The index value which uniquely identifies the interface to which this entry is applicable
print(f"1.3.6.1.2.1.4.20.1.2|{dictTags.get('OctetString')}|1")
# ipAdEntNetMask: The subnet mask associated with the IP address of this entry
print(f"1.3.6.1.2.1.4.20.1.3|{dictTags.get('OctetString')}|255.255.255.0")
# hrSWRunIndex: An index uniquely identifying a row in the hrSWRun table
print(f"1.3.6.1.2.1.25.4.2.1.1.1|{dictTags.get('Integer')}|1")
# hrSWRunName: The name of the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.2.1|{dictTags.get('OctetString')}|LJRSNMPAgent")
# hrSWRunID: The product ID of the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.3.1|{dictTags.get('ObjectIdentifier')}|1.3.6.1.4.1.25709.55")
# hrSWRunPath: The path of the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.4.1|{dictTags.get('OctetString')}|/opt/snmp/snmpsim/_agent.sh")
# hrSWRunParameters: Operational parameters for the software running on this device
print(f"1.3.6.1.2.1.25.4.2.1.5.1|{dictTags.get('OctetString')}|-L")
# hrSWRunType: The type of software running (e.g., operating system, application)
print(f"1.3.6.1.2.1.25.4.2.1.6.1|{dictTags.get('Integer')}|4")
# hrSWRunStatus: The status of this software (running, runnable, notRunnable, invalid)
print(f"1.3.6.1.2.1.25.4.2.1.7.1|{dictTags.get('Integer')}|1")
for oid_base, tag_type, value_func in ifxtable_snmp_objects:
for i in range(1, iRangeMax+1):
value = value_func(i)
print(f"{oid_base}.{i}|{dictTags.get(tag_type)}|{value}")
Network Capture
Even better, parse a network capture file.
Capture Data
On the server that gathers SNMP data from the host we want to simulate, use a network capture utility to gather the SNMP communication between the server and the desired device.
tcpdump -i <interface> -w <filename>.pcap
E.G. to record the communication with 10.5.171.114
tcpdump 'host 10.5.171.114 and (tcp port 161 or tcp port 162 or udp port 161 or udp port 162)' -w /tmp/ar.pcap
Note – there is no benefit to capturing more than one cycle of SNMP responses. If packets are captured immediately, the poller was in the middle of a cycle; end the capture and start a new one shortly after. There should be no packets captured for a bit, then packets during the SNMP polling cycle, and then another pause until the next cycle.
Parsing The Capture Data Into A Response File
The following script parses the capture file into an snmprec response file – note, I needed to use 2.6.0rc1 of scapy to parse SNMP data. The 2.5.0 release version failed to parse most of the packets which I believe is related to https://github.com/secdev/scapy/issues/3900
from scapy.all import rdpcap, load_contrib
from scapy.packet import Raw
from scapy.layers.snmp import SNMP, SNMPresponse
import os
from datetime import datetime
import argparse
# Ensure Scapy's SNMP contributions are loaded
load_contrib("snmp")
def sort_by_oid(listSNMPResponses):
"""
Sorts a list of "OID|TAG|Value" strings by the OID numerically and hierarchically.
:param listSNMPResponses: A list of "OID|TAG|Value" strings.
:return: A list of "OID|TAG|Value" strings sorted by OID.
"""
# Split each element into a tuple of (OID list, original string), converting OID to integers for proper comparison
oid_tuples = [(list(map(int, element.split('|')[0].split('.'))), element) for element in listSNMPResponses]
# Sort the list of tuples by the OID part (the list of integers)
sorted_oid_tuples = sorted(oid_tuples, key=lambda x: x[0])
# Extract the original strings from the sorted list of tuples
sorted_listSNMPResponses = [element[1] for element in sorted_oid_tuples]
return sorted_listSNMPResponses
parser = argparse.ArgumentParser(description='This script converts an SNMP packet capture into a snmpsim response file')
parser.add_argument('--filename', '-f', help='The capture file to process', required=True)
args = parser.parse_args()
strFullCaptureFilePath = args.filename
strCaptureFilePath, strCaptureFileName = os.path.split(strFullCaptureFilePath)
# Valid tags per https://pypi.org/project/snmpsim/0.2.3/
dictTags = {'ASN1_INTEGER': '2', 'ASN1_STRING': '4', 'ASN1_NULL': '5', 'ASN1_OID': '6', 'ASN1_IPADDRESS': '64', 'ASN1_COUNTER32': '65', 'ASN1_GAUGE32': '66', 'ASN1_TIME_TICKS': '67', 'Opaque': '68','ASN1_COUNTER64': '70'}
listSNMPResponses = []
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.1.1|2|1")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.2.1|4|LJRSNMPAgent")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.3.1|6|1.3.6.1.4.1.25709.55")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.4.1|4|/opt/snmp/snmpsim/_agent.sh")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.5.1|4|-L")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.6.1|2|4")
listSNMPResponses.append("1.3.6.1.2.1.25.4.2.1.7.1|2|1")
i = 0
if True:
packets = rdpcap(strFullCaptureFilePath)
# Packets are zero indexed, so packet 1 in script is packet 2 in Wireshark GUI
#for i in range(0,4):
for packet in packets:
print(f"Working on packet {i}")
i = i + 1
if SNMP in packet:
snmp_layer = packet[SNMP]
if isinstance(packet[SNMP].PDU,SNMPresponse):
snmp_response = snmp_layer.getfield_and_val('PDU')[1]
if hasattr(snmp_response, 'varbindlist') and snmp_response.varbindlist is not None:
for varbind in snmp_response.varbindlist:
strOID = varbind.oid.val if hasattr(varbind.oid, 'val') else str(varbind.oid)
strValue = varbind.value.val if hasattr(varbind.value, 'val') else str(varbind.value)
strType = type(varbind.value).__name__
if dictTags.get(strType):
iType = dictTags.get(strType)
else:
iType = strType
if isinstance(strValue, bytes):
print(f"Decoding {strValue}")
strValue = strValue.decode('utf-8',errors='ignore')
print(f"OID: {strOID}, Type: {strType}, Tag: {iType}, Value: {strValue}")
listSNMPResponses.append(f"{strOID}|{iType}|{strValue}")
else:
print(f"Not a response -- type is {type(packet[SNMP].PDU)}")
elif Raw in packet:
print(f"I have a raw packet at {i}")
else:
print(dir(packet))
print(f"No SNMP or Raw in {i}: {packet}")
# Sort by OID numbers
listSortedSNMPResponses = sort_by_oid(listSNMPResponses)
f = open(f'/opt/snmp/snmpsim/data/{datetime.now().strftime("%Y%m%d")}-{strCaptureFileName.rsplit(".", 1)[0]}.deactivated', "w")
for strSNMPResponse in listSortedSNMPResponses:
print(strSNMPResponse)
f.write(strSNMPResponse)
f.write("\n")
f.close()
This will create an snmpsim response file in /opt/snmp/snmpsim/data named for the capture file, prefixed with the current year, month, and day. I.e., my ar.pcap file results in /opt/snmp/snmpsim/data/20240705-ar.deactivated – you can then copy the file to whatever community string you want – cp 20240705-ar.deactivated CommunityString.snmprec
I got a rather last minute notice from our security department that the SSL certificate used in the IdP partnership between my application and their identity provider would be expiring soon and did I want to renew it Monday, Tuesday, or Wednesday. Being that this was Friday afternoon … “none of the above” would have been my preference to avoid filing the “emergency change” paperwork, but Wednesday was the least bad of the three options. Of course, an emergency requires paperwork as to why you didn’t plan two weeks in advance. And how you’ll do better next time.
Sometimes that is a bit of a stretch — next time someone is working on the electrical system and drops a half-inch metal plate into the building wiring, I’m probably still going to have a problem when the power drops. But, in this case, there are two perfectly rational solutions. One, of course, would be that the people planning the certificate renewals start contacting partner applications more promptly. But that’s not within my purview. The thing I can do is watch the metadata on the identity provider and tell myself when the certificates will be expiring soon.
So I now have a little Python script that has a list of all of our SAML-authenticated applications. It pulls the metadata from PingID, loads the X509 certificate, and checks how far in the future the expiry date is. In my production version, anything < 30 days sends an e-mail alert. Next time, we can contact security ahead of time, find out when they’re planning on doing the renewal, and get the change request approved well in advance.
import requests
import xml.etree.ElementTree as ET
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from datetime import datetime, date
strIDPMetadataURLBase = 'https://login.example.com/pf/federation_metadata.ping?PartnerSpId='
listSPIDs = ["https://tableau.example.com", "https://email.example.com", "https://internal.example.com", "https://salestool.example.com"]
for strSPID in listSPIDs:
objResults = requests.get(f"{strIDPMetadataURLBase}{strSPID}")
if objResults.status_code == 200:
try:
root = ET.fromstring(objResults.text)
for objX509Cert in root.findall("./{urn:oasis:names:tc:SAML:2.0:metadata}IDPSSODescriptor/{urn:oasis:names:tc:SAML:2.0:metadata}KeyDescriptor/{http://www.w3.org/2000/09/xmldsig#}KeyInfo/{http://www.w3.org/2000/09/xmldsig#}X509Data/{http://www.w3.org/2000/09/xmldsig#}X509Certificate"):
strX509Cert = f"-----BEGIN CERTIFICATE-----\n{objX509Cert.text}\n-----END CERTIFICATE-----"
cert = x509.load_pem_x509_certificate(bytes(strX509Cert,'utf8'), default_backend())
iDaysUntilExpiry = cert.not_valid_after - datetime.today()
print(f"{strSPID}\t{iDaysUntilExpiry.days}")
    except Exception:
        print(f"{strSPID}\tFailed to decode X509 Certificate")
else:
print(f"{strSPID}\tFailed to retrieve metadata XML")
I was having a lot of trouble using find/findall when parsing an XML document — turns out the namespace prefixed the tag name … so I needed to find {http://maven.apache.org/POM/4.0.0}groupId instead of just groupId
How do you figure that out? Quickest way, for me, was just to print out all of the tag names.
from lxml import etree
# Load POM XML into tree
tree = etree.parse( strXMLFile )
# List all element names in the XML document
for element in tree.iter():
print(element.tag)
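Once you know the namespace, you can either use the fully qualified tag name or hand lxml a prefix-to-namespace map; both of the following find the same elements (using the POM namespace from the example above):
# Option 1: fully qualified tag name
for element in tree.iter('{http://maven.apache.org/POM/4.0.0}groupId'):
    print(element.text)
# Option 2: prefix map with findall
ns = {'pom': 'http://maven.apache.org/POM/4.0.0'}
for element in tree.findall('.//pom:groupId', namespaces=ns):
    print(element.text)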