Tag: python

Blender API: Finding The Orange Dot

A quick script to get each object and the location of the “orange dot” … the origin of the object

# Get location of orange dot for each object in Blender
import bpy

# Work out how many millimetres one Blender unit (BU) represents.
# With the unit system set to 'NONE' there is no scale to read, so one BU is
# treated as one metre.
us = bpy.context.scene.unit_settings
unit_system = getattr(us, "system", "NONE")  # 'NONE', 'METRIC', 'IMPERIAL'

if unit_system == 'NONE':
    meters_per_bu = 1.0
else:
    meters_per_bu = us.scale_length
mm_per_bu = 1000.0 * meters_per_bu

# Report the world-space origin of every mesh object, in BU and in mm.
for obj in (candidate for candidate in bpy.data.objects if candidate.type == 'MESH'):
    origin_world = obj.matrix_world.translation          # in BU
    origin_world_mm = origin_world * mm_per_bu           # in mm

    print(f"Object: {obj.name}")
    print(f"  origin_world (BU): {origin_world.x:.6f}, {origin_world.y:.6f}, {origin_world.z:.6f}")
    print(f"  origin_world (mm): {origin_world_mm.x:.3f}, {origin_world_mm.y:.3f}, {origin_world_mm.z:.3f}")
    print("-" * 30)

Blender API: Bending a 2D Rectangle

Another attempt to create a t-post bracket using a script. This creates a 2D rectangle, bends it, and then solidifies it into a 3d object.

import bpy
import bmesh
import math
from mathutils import Vector, Matrix

# -----------------------------
# Start from an empty scene
# -----------------------------
# Iterate over a copy: removing objects while walking bpy.data.objects
# directly would change the collection under the loop.
for stale_object in list(bpy.data.objects):
    bpy.data.objects.remove(stale_object, do_unlink=True)

# -----------------------------
# Scene units: metric, scaled so that 1 BU = 1 mm
# -----------------------------
scene = bpy.context.scene
unit_settings = scene.unit_settings
unit_settings.system = 'METRIC'
unit_settings.scale_length = 0.001

INCH_TO_MM = 25.4  # millimetres per inch


def inch(x):
    """Convert a length in inches to millimetres (one Blender unit here)."""
    return INCH_TO_MM * x

# -----------------------------
# Parameters
# -----------------------------
# Flat sheet size and solidify thickness, in inches (converted to mm with
# inch() at the point of use).
size_x_in = 3.0
size_y_in = 7.0
thickness_in = 0.25  # SOLIDIFY thickness

# Fold line positions, in inches, measured from the MIN-Y edge of the sheet.
fold1_offset_in = 0.5   # from MIN-Y end
fold2_offset_in = 2.0   # from MIN-Y end

# Fold angles in radians; the two folds turn in opposite directions.
fold1_rad = math.radians(-80.0)
fold2_rad = math.radians(80.0)

# Cut count handed to the subdivide operator, and the tolerance used when
# deciding whether a vertex lies on a fold line.
subdivide_cuts = 60
EPS_Y = 1e-5  # mm tolerance for "on the fold line"

# -----------------------------
# Create flat sheet (plane)
# -----------------------------
# The operator adds a 1x1 plane at the origin and makes it the active object.
bpy.ops.mesh.primitive_plane_add(size=1.0, location=(0.0, 0.0, 0.0))
obj = bpy.context.active_object
obj.name = "Bracket"
# Resize to the sheet footprint in mm, then apply the scale so the vertex
# coordinates themselves carry the real dimensions (the fold code below
# compares raw vertex Y values against positions in mm).
obj.dimensions = (inch(size_x_in), inch(size_y_in), 0.0)
bpy.ops.object.transform_apply(location=False, rotation=False, scale=True)

# Subdivide for clean fold lines
# The mesh operators act on the active object and require Edit Mode, so the
# mode is switched in and back out around them.
bpy.ops.object.mode_set(mode='EDIT')
bpy.ops.mesh.select_all(action='SELECT')
bpy.ops.mesh.subdivide(number_cuts=subdivide_cuts)
bpy.ops.object.mode_set(mode='OBJECT')

# Fold lines sit at fixed offsets from the sheet's MIN-Y edge. The sheet is
# centred on the origin, so that edge is at minus half the sheet length.
half_y = 0.5 * inch(size_y_in)
min_y = -half_y
y_fold1 = min_y + inch(fold1_offset_in)
y_fold2 = min_y + inch(fold2_offset_in)

# Cut the mesh along each fold line so there is a row of vertices exactly on
# it. Nothing is discarded on either side of the cutting plane.
bm = bmesh.new()
bm.from_mesh(obj.data)

cut_normal = Vector((0.0, 1.0, 0.0))
for y_fold in (y_fold1, y_fold2):
    bmesh.ops.bisect_plane(
        bm,
        geom=[*bm.verts, *bm.edges, *bm.faces],
        plane_co=Vector((0.0, y_fold, 0.0)),
        plane_no=cut_normal,
        clear_inner=False,
        clear_outer=False,
    )

bm.normal_update()
bm.to_mesh(obj.data)
bm.free()

# -----------------------------
# Fold the sheet along both fold lines
# -----------------------------
def _fold_about_line(bm, orig_y_layer, y_fold, angle_rad, fold_label):
    """Rotate every vertex beyond one fold line about that line's hinge.

    Vertices are classified by the Y value stored in ``orig_y_layer`` (their
    position on the flat sheet), not by their current coordinates, so a later
    fold still finds its vertices after an earlier fold has moved them.

    Args:
        bm: The bmesh being edited in place.
        orig_y_layer: Float vertex layer holding each vertex's original Y.
        y_fold: Original-sheet Y position of the fold line.
        angle_rad: Rotation about the X axis, in radians.
        fold_label: Fold number used in the error message.

    Raises:
        RuntimeError: If no vertex lies on the fold line within EPS_Y.
    """
    hinge_verts = [v for v in bm.verts if abs(v[orig_y_layer] - y_fold) < EPS_Y]
    if not hinge_verts:
        raise RuntimeError(
            f"No hinge vertices found for fold {fold_label}. Increase subdivide_cuts or EPS_Y."
        )

    # The hinge is the average of the *current* positions of the fold-line
    # vertices, so it follows the line if an earlier fold already moved it.
    hinge_point = Vector((0.0, 0.0, 0.0))
    for v in hinge_verts:
        hinge_point += v.co
    hinge_point /= len(hinge_verts)

    verts_to_rotate = [v for v in bm.verts if v[orig_y_layer] > (y_fold + EPS_Y)]
    rotation = Matrix.Rotation(angle_rad, 4, 'X')
    bmesh.ops.rotate(bm, verts=verts_to_rotate, cent=hinge_point, matrix=rotation)


bm = bmesh.new()
try:
    bm.from_mesh(obj.data)
    bm.verts.ensure_lookup_table()

    # Remember where each vertex sat on the flat sheet before anything moves.
    orig_y_layer = bm.verts.layers.float.new("orig_y")
    for v in bm.verts:
        v[orig_y_layer] = v.co.y

    # Fold 1 first, then fold 2; fold 2's hinge is measured after fold 1.
    _fold_about_line(bm, orig_y_layer, y_fold1, fold1_rad, 1)
    _fold_about_line(bm, orig_y_layer, y_fold2, fold2_rad, 2)

    # Write back mesh
    bm.normal_update()
    bm.to_mesh(obj.data)
finally:
    # Release the bmesh even when a fold raises.
    bm.free()

# -----------------------------
# Give the folded sheet its thickness (the modifier is added after folding)
# -----------------------------
# The modifier name is historical; the thickness actually used comes from
# thickness_in, not from the "0p5in" in the name.
solidify_mod = obj.modifiers.new(name="Solidify_0p5in", type='SOLIDIFY')
solidify_mod.thickness = inch(thickness_in)  # thickness_in inches, expressed in mm
solidify_mod.offset = 0.0                    # centred: equal thickness on both sides
solidify_mod.use_even_offset = True
solidify_mod.use_rim = True

# Leave the bracket as the only selected object and the active one.
bpy.ops.object.select_all(action='DESELECT')
obj.select_set(True)
bpy.context.view_layer.objects.active = obj

Blender API: Playing with Cylinders

This script was mostly made to play around with rotation on cylinders.

import bpy
import math

# Start from an empty scene (iterate a copy while removing).
for existing in list(bpy.data.objects):
    bpy.data.objects.remove(existing, do_unlink=True)


def _add_demo_cylinder(name, rotation):
    """Add one demo cylinder at the origin with the given rotation and name it."""
    bpy.ops.mesh.primitive_cylinder_add(
        radius=0.5,
        depth=10.0,
        location=(0, 0.0, 0.0),
        rotation=rotation
    )
    new_cylinder = bpy.context.active_object
    new_cylinder.name = name
    return new_cylinder


# Four cylinders stepped about X, then four stepped about Y.
# NOTE(review): the step of 5.5 is passed to the operator unconverted; if
# degrees were intended it would need math.radians() - confirm.
for i in range(4):
    cyl = _add_demo_cylinder(f"DemoCylinderX{i}", ((i * 5.5), 0.0, 0.0))

for i in range(4):
    cyl = _add_demo_cylinder(f"DemoCylinderY{i}", (0.0, (i * 5.5), 0.0))

# cyl.rotation_euler = (15.0,13.0,12.0)

# Or single-axis rotation
# Rotate 45 degrees about X axis
#cyl.rotation_euler[0] = math.radians(45.0)


Blender Scripting – T-Post Sign Holder

We spent a lot of the day trying to modify 3D models that we found online to work as a sign holder. Something like the bent metal plates you can buy at the tractor store. Since these are simple polygons, I thought it might be easier to script the build (plus making changes to the dimensions would just require tweaking variables).

Voila – hopefully it’s a T-post sign holder! It at least looks like one.

import bpy
import bmesh
import math
from mathutils import Vector

# Start from an empty scene (iterate a copy while removing).
for stale_object in list(bpy.data.objects):
    bpy.data.objects.remove(stale_object, do_unlink=True)

# -----------------------------
# Scene units (mm)
# -----------------------------
scene = bpy.context.scene
unit_settings = scene.unit_settings
unit_settings.system = 'METRIC'
unit_settings.scale_length = 0.001  # 1 Blender unit = 1 mm

INCH = 25.4  # millimetres per inch


def inch(x):
    """Convert a length in inches to millimetres (one Blender unit here)."""
    return INCH * x

# -----------------------------
# PARAMETERS (mm)
# -----------------------------
# Every length below goes through inch(), so the stored values are in mm.
bracket_thickness = inch(0.25)   # sheet thickness
bracket_width = inch(3)   # bracket width (across the post)

# Leg lengths (side profile)
bracket_top_length = inch(1)        # bracket segment 1 length
bracket_middle_length = inch(2)     # bracket segment 2 length
bracket_bottom_length = inch(4.5)   # bracket segment 3 length

# Bend included angles, in degrees (converted to turn angles as 180 - included)
bend1_angle_included = 105.0   # top flange
bend2_angle_included = 255.0   # web -> long leg

# If the long leg goes the wrong direction, flip this
flip_second_bend = True

# -----------------------------
# Punch hole
# -----------------------------
do_punch = True

# T-post size references
tpost_horizontal_hole_height = inch(0.25)
tpost_horizontal_hole_width = inch(1.5)
tpost_vertical_hole_height = inch(2)
tpost_vertical_hole_width = inch(0.25)
punch_clearance = 1.0 # clearance added around each rectangle (mm)

# Position of t-post before rotation (Z from p0 end, and X across width)
punch_center_z = inch(1)
punch_center_x = bracket_width / 2

# Vertical placement on top flange (Y=0 plane)
punch_center_y = -inch(0.5)

# -----------------------------
# Optional bevel to make edges look more formed
# -----------------------------
do_bevel = True

bevel_width = inch(0.05)
bevel_segments = 25

# -----------------------------
# Cleanup
# -----------------------------
#for n in ["BracketShape", "PunchBar", "PunchStem", "HoleRight1", "HoleRight2", "HoleRight3", "HoleRight4", "HoleLeft1", "HoleRLeft2", "HoleLeft3", "HoleLeft4"]:
#    o = bpy.data.objects.get(n)
#    if o:
#        bpy.data.objects.remove(o, do_unlink=True)

# -----------------------------
# Helpers
# Directions live in the YZ plane: 0 deg is +Z, +90 deg is +Y, -90 deg is -Y.
# -----------------------------
def unit_from_angle(deg_from_posZ):
    """Return the unit vector in the YZ plane at ``deg_from_posZ`` degrees from +Z."""
    angle_rad = math.radians(deg_from_posZ)
    y_component = math.sin(angle_rad)
    z_component = math.cos(angle_rad)
    return Vector((0.0, y_component, z_component))

def boolean_diff(target, cutter):
    """Cut ``cutter`` out of ``target`` and hide the cutter afterwards.

    A boolean DIFFERENCE modifier using the EXACT solver is added to
    ``target`` and applied immediately. ``target`` is made the active object
    first because the apply operator works on the active object.
    """
    modifier = target.modifiers.new(name=f"BOOL_{cutter.name}", type="BOOLEAN")
    modifier.operation = 'DIFFERENCE'
    modifier.solver = 'EXACT'
    modifier.object = cutter
    modifier_name = modifier.name

    bpy.context.view_layer.objects.active = target
    bpy.ops.object.modifier_apply(modifier=modifier_name)

    # The cutter stays in the scene but is hidden from view.
    cutter.hide_set(True)

def add_cube(name, size_xyz, location_xyz, rotation_xyz):
    """Add a box of size ``size_xyz`` at the given location/rotation and return it.

    A unit cube is created, scaled per axis to ``size_xyz``, and then its
    transforms are applied with the operator's default options.
    """
    bpy.ops.mesh.primitive_cube_add(size=1, location=location_xyz, rotation=rotation_xyz)
    cube = bpy.context.active_object
    cube.name = name

    size_x, size_y, size_z = size_xyz[0], size_xyz[1], size_xyz[2]
    cube.scale = (size_x, size_y, size_z)
    bpy.ops.object.transform_apply()
    return cube

def add_cylinder(name, radius, length, location_xyz, rotation_xyz):
    """Add a cylinder (``length`` is its depth), apply its transforms, and return it."""
    bpy.ops.mesh.primitive_cylinder_add(
        radius=radius,
        depth=length,
        location=location_xyz,
        rotation=rotation_xyz,
    )
    cylinder = bpy.context.active_object
    cylinder.name = name
    bpy.ops.object.transform_apply()
    return cylinder

# Turn angle at each bend = 180 degrees minus the included angle.
angle_top = 180.0 - bend1_angle_included
angle_bottom = 180.0 - bend2_angle_included

# Heading of each segment, in degrees from +Z:
#   theta0 - top flange, straight along +Z
#   theta1 - after bend 1, turned by -angle_top
#   theta2 - after bend 2, turned by +angle_bottom (or the opposite when flipped)
theta0 = 0.0
theta1 = theta0 - angle_top
if flip_second_bend:
    theta2 = theta1 - angle_bottom
else:
    theta2 = theta1 + angle_bottom

d0, d1, d2 = (unit_from_angle(theta) for theta in (theta0, theta1, theta2))

# Profile points (centre surface), walked segment by segment.
p0 = Vector((0.0, 0.0, 0.0))            # free end of top flange
p1 = p0 + d0 * bracket_top_length       # bend1 line
p2 = p1 + d1 * bracket_middle_length    # bend2 line
p3 = p2 + d2 * bracket_bottom_length    # end of long leg

# -----------------------------
# Build the bracket as one connected sheet:
# the side profile is laid down twice (at X = 0 and X = bracket_width) and
# each pair of neighbouring profile points is bridged with a quad.
# -----------------------------
mesh = bpy.data.meshes.new("BracketShapeMesh")
bracket = bpy.data.objects.new("BracketShape", mesh)
bpy.context.collection.objects.link(bracket)
bpy.context.view_layer.objects.active = bracket
bracket.select_set(True)

bm = bmesh.new()

x0, x1 = 0.0, bracket_width
profile = (p0, p1, p2, p3)

# Left side (x0) first, then right side (x1), both in profile order.
left_side = [bm.verts.new((x0, point.y, point.z)) for point in profile]
right_side = [bm.verts.new((x1, point.y, point.z)) for point in profile]

# One face per segment: top flange, web, long leg.
for near_l, near_r, far_l, far_r in zip(left_side, right_side, left_side[1:], right_side[1:]):
    bm.faces.new((near_l, near_r, far_r, far_l))

bm.normal_update()
bm.to_mesh(mesh)
bm.free()

# -----------------------------
# Thicken the sheet (sheet metal look); the bracket is the active object,
# so the modifier can be applied straight away.
# -----------------------------
solid = bracket.modifiers.new("Solidify", type="SOLIDIFY")
solid.thickness = bracket_thickness
solid.offset = 0.0
bpy.ops.object.modifier_apply(modifier=solid.name)

# -----------------------------
# Punch the lowercase "t" on the top flange
# (Top flange is flat at Y=0; punch straight through Y)
# The "t" is cut as two boxes: a wide, short bar and a narrow, tall stem.
# Each box is the corresponding hole size plus punch_clearance on every side.
# -----------------------------
if do_punch:
    tpost_length_y = bracket_thickness * 5  # ensure it fully cuts through

    # Crossbar rectangle
    # NOTE(review): the Y location adds a hard-coded 13 (mm, given the scene
    # units) and the box is tilted by 90 - bend1_angle_included / 2 degrees
    # about X, unlike the stem below - the intent of both values is not
    # evident from the code; confirm before changing dimensions.
    horizontal_hole = add_cube(
        "PunchBar",
        size_xyz=(tpost_horizontal_hole_width + 2 * punch_clearance, tpost_length_y, tpost_horizontal_hole_height + 2 * punch_clearance),
        location_xyz=(punch_center_x, 13 + punch_center_y, punch_center_z),
        rotation_xyz=(math.radians(90 - bend1_angle_included / 2), math.radians(0), math.radians(0))
    )

    # Stem rectangle (placed under the bar like a lowercase "t")
    # Rotated a flat 90 degrees about X; the tilted alternative is kept below
    # for reference.
    vertical_hole = add_cube(
        "PunchStem",
        size_xyz=(tpost_vertical_hole_width + 2 * punch_clearance, tpost_length_y, tpost_vertical_hole_height + 2 * punch_clearance),
        location_xyz=(punch_center_x, punch_center_y, punch_center_z),
        rotation_xyz=(math.radians(90), math.radians(0), math.radians(0))
        #rotation_xyz=(math.radians(90 - bend1_angle_included / 2), math.radians(0), math.radians(0))
    )
    # Subtract the stem first, then the bar, from the bracket.
    boolean_diff(bracket, vertical_hole)
    boolean_diff(bracket, horizontal_hole)


# Two columns of four round holes, cut with cylinders laid along Y.
# Each row creates both cutters first and then subtracts them, right before left.
hole_columns = (("HoleRight{}", inch(0.5)), ("HoleLeft{}", inch(2.5)))

for hole in range(4):
    hole_z = inch(2) + inch(1.175) * hole
    cutters = [
        add_cylinder(
            name_template.format(hole),
            radius=inch(0.125),
            length=100,
            location_xyz=(hole_x, -inch(2), hole_z),
            rotation_xyz=(math.radians(90), 0, 0),
        )
        for name_template, hole_x in hole_columns
    ]
    for cutter in cutters:
        boolean_diff(bracket, cutter)

# -----------------------------
# Optional bevel: round the edges, limited by angle, then bake it in
# -----------------------------
if do_bevel:
    bevel_mod = bracket.modifiers.new("Bevel", type="BEVEL")
    bevel_mod.width = bevel_width
    bevel_mod.segments = bevel_segments
    bevel_mod.limit_method = 'ANGLE'
    # The angle limit is left at the modifier's default
    # (an explicit 35 degree limit was tried and disabled).
    bevel_mod.use_clamp_overlap = False

    # The apply operator acts on the active object.
    bpy.context.view_layer.objects.active = bracket
    bpy.ops.object.modifier_apply(modifier=bevel_mod.name)

API Documentation Links:

https://docs.blender.org/api/current/bpy.ops.mesh.html
https://docs.blender.org/api/current/bmesh.ops.html

Blender Scripting Lesson of the Week: Cylinders

Quick script for creating a cylinder using bpy

import bpy

# Start from an empty scene (iterate a copy while removing).
for leftover in list(bpy.data.objects):
    bpy.data.objects.remove(leftover, do_unlink=True)

# Metric units, scaled so that 1 BU = 1 mm
scene = bpy.context.scene
unit_settings = scene.unit_settings
unit_settings.system = 'METRIC'
unit_settings.scale_length = 0.001

# Create the cylinder; every operator option is spelled out for reference.
bpy.ops.mesh.primitive_cylinder_add(
    vertices=32,
    radius=10.0,
    depth=20.0,
    end_fill_type='NGON',
    calc_uvs=True,
    enter_editmode=False,
    align='WORLD',
    location=(0.0, 0.0, -2.0),
    rotation=(0.0, 0.0, 0.0),
    scale=(1, 1, 1),
)

# The new cylinder is the active object; give it a name.
obj = bpy.context.active_object
obj.name = "MyCylinder"

# Frame the selection in the first 3D viewport found, using its first
# WINDOW region. Nothing happens if there is no such area or region.
view3d_area = next(
    (area for area in bpy.context.window.screen.areas if area.type == 'VIEW_3D'),
    None,
)
if view3d_area is not None:
    window_region = next(
        (region for region in view3d_area.regions if region.type == 'WINDOW'),
        None,
    )
    if window_region is not None:
        with bpy.context.temp_override(area=view3d_area, region=window_region):
            bpy.ops.view3d.view_selected(use_all_regions=False)

Viewing *Real* Certificate Chain

Browsers implement AIA (Authority Information Access) fetching, which “helps” by repairing an incomplete certificate chain and building a trusted path even when the server is not configured properly. That is great for user experience, but it causes a lot of challenges for people troubleshooting SSL connection failures from devices, old equipment, etc. – “It’s fine when I try it from my laptop!”

This Python script reports on the real certificate chain being served from an endpoint. Self-signed certificates will show as untrusted.

Public certificates will show the full chain and show as trusted.

Code:

import ssl
import socket
import datetime
import select

# Third-party modules (install: pip install pyopenssl cryptography)
try:
    from OpenSSL import SSL, crypto
    from cryptography import x509
    from cryptography.hazmat.primitives import hashes
except ImportError as e:
    raise SystemExit(
        "Missing required modules. Please install:\n"
        "  pip install pyopenssl cryptography\n"
        f"Import error: {e}"
    )

def prompt(text, default=None):
    """Ask the user for input; return the stripped answer, or ``default`` if it is empty."""
    answer = input(text).strip()
    if answer:
        return answer
    return default

def check_trust(hostname: str, port: int, timeout=6.0):
    """
    Attempt a TLS connection using system trust store and hostname verification.

    The result depends only on the TLS handshake. No application data is
    exchanged, so endpoints that do not speak HTTP are judged on their
    certificate alone rather than on whether they answer an HTTP request.

    Args:
        hostname: Host to connect to; also used for SNI and hostname checking.
        port: TCP port to connect to.
        timeout: Socket timeout in seconds for the connect and the handshake.

    Returns (trusted: bool, message: str).
    """
    try:
        ctx = ssl.create_default_context()
        with socket.create_connection((hostname, port), timeout=timeout) as sock:
            # wrap_socket() runs the whole handshake before returning
            # (do_handshake_on_connect defaults to True), including chain and
            # hostname verification. Getting past this line means the
            # certificate was accepted.
            with ctx.wrap_socket(sock, server_hostname=hostname):
                pass
        return True, "TRUSTED (system trust store)"
    except ssl.SSLCertVerificationError as e:
        return False, f"NOT TRUSTED (certificate verification error): {e}"
    except ssl.SSLError as e:
        return False, f"NOT TRUSTED (SSL error): {e}"
    except Exception as e:
        # Anything else (DNS failure, refused connection, timeout, ...).
        return False, f"Error connecting: {e}"

def _aware_utc(dt: datetime.datetime) -> datetime.datetime:
    """
    Ensure a datetime is timezone-aware in UTC. cryptography returns naive UTC datetimes.
    """
    if dt.tzinfo is None:
        return dt.replace(tzinfo=datetime.timezone.utc)
    return dt.astimezone(datetime.timezone.utc)

def _cert_to_info(cert: x509.Certificate):
    """
    Summarise one certificate as a plain dict.

    Keys: subject and issuer (RFC 4514 strings), sha1 (upper-case hex
    fingerprint), not_before / not_after (aware UTC datetimes) and
    days_to_expiry (whole days until not_after, never below 0).
    """
    not_before = _aware_utc(cert.not_valid_before_utc)
    not_after = _aware_utc(cert.not_valid_after_utc)
    time_left = not_after - datetime.datetime.now(datetime.timezone.utc)

    return {
        "subject": cert.subject.rfc4514_string(),
        "issuer": cert.issuer.rfc4514_string(),
        "sha1": cert.fingerprint(hashes.SHA1()).hex().upper(),
        "not_before": not_before,
        "not_after": not_after,
        # An already-expired certificate reports 0 rather than a negative count.
        "days_to_expiry": max(0, time_left.days),
    }

def _wait_for_handshake_io(sock: socket.socket, deadline: float, want_write: bool) -> None:
    """
    Wait until ``sock`` is ready in the direction the handshake asked for.

    Args:
        sock: The socket underneath the TLS connection.
        deadline: Absolute ``time.monotonic()`` value after which to give up.
        want_write: True to wait for writability, False for readability.

    Raises:
        TimeoutError: If the deadline has passed or passes while waiting.
    """
    import time

    direction = "WantWrite" if want_write else "WantRead"
    remaining = deadline - time.monotonic()
    if remaining > 0:
        readers, writers = ([], [sock]) if want_write else ([sock], [])
        ready_r, ready_w, _ = select.select(readers, writers, [], remaining)
        if ready_r or ready_w:
            return
    raise TimeoutError(f"TLS handshake timed out ({direction})")


def _do_handshake_blocking(conn: SSL.Connection, sock: socket.socket, timeout: float):
    """
    Drive the TLS handshake, handling WantRead/WantWrite by waiting with select.

    The overall time budget is ``timeout`` seconds, measured on the monotonic
    clock so that system clock adjustments cannot shorten or extend it.

    Raises:
        TimeoutError: If the handshake does not finish within ``timeout``.
    """
    import time

    deadline = time.monotonic() + timeout
    while True:
        try:
            conn.do_handshake()
            return
        except SSL.WantReadError:
            _wait_for_handshake_io(sock, deadline, want_write=False)
        except SSL.WantWriteError:
            _wait_for_handshake_io(sock, deadline, want_write=True)

def fetch_presented_chain(hostname: str, port: int, timeout: float = 12.0):
    """
    Capture the presented certificate chain using pyOpenSSL.

    No certificate verification is configured on this connection; the goal is
    to record what the server sends, not to judge it.

    Args:
        hostname: Host to connect to; also sent as the SNI name.
        port: TCP port to connect to.
        timeout: Seconds allowed for the TCP connect and, separately, for the
            TLS handshake.

    Returns (chain: list of {subject, issuer, sha1, not_before, not_after, days_to_expiry}, error: str or None).
    """
    # TCP connect
    try:
        sock = socket.create_connection((hostname, port), timeout=timeout)
        sock.settimeout(timeout)
    except Exception as e:
        return [], f"Error connecting: {e}"

    conn = None
    try:
        # TLS client context
        ctx = SSL.Context(SSL.TLS_CLIENT_METHOD)

        # Compatibility tweaks (each is best effort; an unsupported option is
        # skipped rather than treated as fatal):
        # - Lower OpenSSL security level
        try:
            ctx.set_cipher_list(b"DEFAULT:@SECLEVEL=1")
        except Exception:
            pass

        # - Disable TLS 1.3 and set minimum TLS 1.2
        try:
            ctx.set_options(SSL.OP_NO_TLSv1_3)
        except Exception:
            pass
        try:
            # Ensure TLSv1.2+ (pyOpenSSL exposes set_min_proto_version on some builds)
            if hasattr(ctx, "set_min_proto_version"):
                ctx.set_min_proto_version(SSL.TLS1_2_VERSION)
        except Exception:
            pass

        # - Set ALPN to http/1.1 (some paths work better when ALPN is present)
        try:
            ctx.set_alpn_protos([b"http/1.1"])
        except Exception:
            pass

        conn = SSL.Connection(ctx, sock)
        conn.set_tlsext_host_name(hostname.encode("utf-8"))
        conn.set_connect_state()

        # The socket is deliberately left in timeout mode. Switching it to
        # blocking (setblocking(True) is the same as settimeout(None)) would
        # cancel the timeout and let a silent server hang the handshake
        # forever. In timeout mode the handshake reports WantRead/WantWrite,
        # which _do_handshake_blocking waits out against its deadline.
        _do_handshake_blocking(conn, sock, timeout=timeout)

        # Retrieve chain (some servers only expose leaf)
        presented = conn.get_peer_cert_chain()
        if not presented:
            peer = conn.get_peer_certificate()
            presented = [peer] if peer is not None else []

        # Round-trip through DER to get cryptography objects for reporting.
        infos = []
        for pyopenssl_cert in presented:
            der = crypto.dump_certificate(crypto.FILETYPE_ASN1, pyopenssl_cert)
            infos.append(_cert_to_info(x509.load_der_x509_certificate(der)))

        # Polite TLS shutdown; a failure here does not affect the captured chain.
        try:
            conn.shutdown()
        except Exception:
            pass

    except Exception as e:
        etype = type(e).__name__
        emsg = str(e) or "no message"
        return [], f"TLS handshake or chain retrieval error: {etype}: {emsg}"

    finally:
        # Always release the connection and socket, on success and on error.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
        try:
            sock.close()
        except Exception:
            pass

    if not infos:
        return [], "No certificates captured (server did not present a chain and peer cert unavailable)"
    return infos, None

def main():
    """
    Prompt for a host and port, print the certificate chain the server
    actually presents, then print whether the system trust store accepts it.
    """
    def _as_text(value):
        # Datetimes are shown in ISO 8601; anything else falls back to str().
        return value.isoformat() if isinstance(value, datetime.datetime) else str(value)

    hostname = prompt("Enter hostname to test (e.g., example.domain.com): ")
    if not hostname:
        print("Hostname is required.")
        return

    port_str = prompt("Enter port [default 443]: ", "443")
    try:
        port = int(port_str)
    except ValueError:
        print("Invalid port.")
        return

    print(f"\nTesting TLS chain for {hostname}:{port} ...")
    chain, err = fetch_presented_chain(hostname, port)

    print("\nPresented chain:")
    if err:
        print(f"  [ERROR] {err}")
    elif not chain:
        print("  [No certificates captured]")
    else:
        for i, ci in enumerate(chain, 1):
            nb_str = _as_text(ci.get("not_before"))
            na_str = _as_text(ci.get("not_after"))
            dte = ci.get("days_to_expiry")

            print(f"  [{i}] Subject: {ci['subject']}")
            print(f"       Issuer:  {ci['issuer']}")
            print(f"       SHA1:    {ci['sha1']}")
            print(f"       Not Before: {nb_str}")
            print(f"       Not After:  {na_str}")
            if dte is not None:
                print(f"       Expires In: {dte} days")

    # The trust check runs on its own connection, whatever the capture showed.
    trusted, msg = check_trust(hostname, port)
    print(f"\nTrust result: {'TRUSTED' if trusted else 'NOT TRUSTED'} - {msg}")

if __name__ == "__main__":
    main()

Python: Partition and RPartition

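Found a neat pair of methods that were added in Python 2.5 — they work like split/index, except they handle breaking the string into pieces for you. A three-element tuple is returned with the part before the separator, the separator itself, and the part after the separator. If the separator is not found, partition returns the whole string followed by two empty strings (elements 1 and 2 are empty), while rpartition returns two empty strings followed by the whole string (elements 0 and 1 are empty).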

 

C:\Users\lisa> python
Python 3.13.3
Type "help", "copyright", "credits" or "license" for more information.
>>> test = "This is a string | with pipe characters as | delimiters in the string"
>>> print(test.rpartition("|")[0])
This is a string | with pipe characters as
>>> print(test.partition("|")[0])
This is a string
>>>

Sumo Logic: Creating Roles via API

This script creates very basic roles with no extra capabilities and restricts the role to viewing only the indicated source category’s data.

################################################################################
# This script reads an Excel file containing role data, then uses the Sumo Logic
# API to create roles based on the data. It checks each row for a role name and
# uses the source category to set data filters. The script requires a config.py
# file with access credentials.
################################################################################
import pandas as pd
import requests
import json
from config import access_id, access_key  # Import credentials from config.py

# Path to Excel file
excel_file_path = 'NewRoles.xlsx'

# Base URL for Sumo Logic API
base_url = 'https://api.sumologic.com/api/v1'

################################################################################
# Function to create a new role using the Sumo Logic API.
#
# The role gets no extra capabilities; its log analytics, audit and security
# data filters are all set to "_sourceCategory=<source_category>".
#
# Args:
#     role_name (str): The name of the role to create.
#     role_description (str): The description of the role.
#     source_category (str): The source category to restrict the role to.
#
# Returns:
#     None. Prints the status of the API call. On failure the response body is
#     printed as parsed JSON when possible, otherwise as raw text.
################################################################################
def create_role(role_name, role_description, source_category):

    url = f'{base_url}/roles'

    # Role payload: the same filter is used for all three data types
    data_filter = f'_sourceCategory={source_category}'
    payload = {
        'name': role_name,
        'description': role_description,
        'logAnalyticsDataFilter': data_filter,
        'auditDataFilter': data_filter,
        'securityDataFilter': data_filter
    }

    # Headers for the request
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }

    # Debugging line
    print(f"Attempting to create role: '{role_name}' with description: '{role_description}' and filter: '{data_filter}'")

    # Make the POST request to create a new role
    response = requests.post(url, auth=(access_id, access_key), headers=headers, data=json.dumps(payload))

    # 201 Created is the only status treated as success
    if response.status_code == 201:
        print(f'Role {role_name} created successfully.')
        return

    print(f'Failed to create role {role_name}. Status Code: {response.status_code}')
    # Error bodies are not always JSON (for example an HTML page from a proxy
    # or gateway); fall back to the raw text so one bad response cannot abort
    # the rest of the batch.
    try:
        details = response.json()
    except ValueError:
        details = response.text
    print('Response:', details)

################################################################################
# Reads an Excel file and processes each row to extract role information and
# create roles using the Sumo Logic API.
#
# Rows without a role name are ignored. Rows with a role name but no source
# category are skipped with a message, because a role built from an empty
# cell would be filtered on the literal text "nan".
#
# Args:
#     file_path (str): The path to the Excel file containing role data. The
#         sheet must have 'Role Name' and 'Source Category' columns.
#
# Returns:
#     None. Processes the file and attempts to create roles based on the data.
################################################################################
def process_excel(file_path):
    # Load the spreadsheet
    df = pd.read_excel(file_path, engine='openpyxl')

    # Print column names to help debug and find correct ones
    print("Columns found in Excel:", df.columns)

    # Iterate over each row in the DataFrame
    for _, row in df.iterrows():
        role_name = row['Role Name']
        source_category = row['Source Category']  # category the role is restricted to

        # Only create a role if the role name is not null
        if not pd.notnull(role_name):
            continue

        # An empty cell is read as NaN; do not turn that into a data filter
        if not pd.notnull(source_category):
            print(f"Skipping role '{role_name}': no source category provided.")
            continue

        role_description = f'Provides access to source category {source_category}'
        create_role(role_name, role_description, source_category)


# Process the Excel file
# This call sits at module level, so it runs whenever the file is executed
# (or imported), using the path configured in excel_file_path.
process_excel(excel_file_path)

Parsing HAR File

I am working with a new application that doesn’t seem to like when a person has multiple roles assigned to them … however, I first need to prove that is the problem. Luckily, your browser gets the SAML response and you can actually see the Role entitlements that are being sent. Just need to parse them out of the big 80 meg file that a simple “go here and log on” generates!

To gather data to be parsed, open the Dev Tools for the browser tab. Click the settings gear icon and select “Persist Logs”. Reproduce the scenario – navigate to the site, log in. Then save the dev tools session as a HAR file. The following Python script will analyze the file, extract any SAML response tokens, and print them in a human-readable format.

################################################################################
# This script reads a HAR file, identifies HTTP requests and responses containing
# SAML tokens, and decodes "SAMLResponse" values.
#
# The decoded SAML assertions are printed out for inspection in a readable format.
#
# Usage:
# - Update the str_har_file_path with your HAR file
################################################################################
# Editable Variables
str_har_file_path = 'SumoLogin.har'

# Imports
import json
import base64
import urllib.parse
from xml.dom.minidom import parseString

################################################################################
#  Decodes a single SAML response value taken from the HAR capture.
#  The value is URL-decoded first, then base-64 decoded, then read as UTF-8.
# Args:
#   saml_response_encoded(str): URL encoded, base-64 encoded SAML response
# Returns:
#   string: decoded string
################################################################################
def decode_saml_response(saml_response_encoded):
    raw_bytes = base64.b64decode(urllib.parse.unquote(saml_response_encoded))
    return raw_bytes.decode('utf-8')

################################################################################
#  Finds and decodes SAML tokens from HAR entries.
#
#  For each entry, the POST request body is searched first, then the response
#  body (base-64 decoded first when the HAR marks it as such). Only the first
#  "SAMLResponse=" value in each body is taken, up to the next "&".
#
# Args:
#   entries(list): A list of HTTP request and response entries from a HAR file.
#
# Returns:
#   list: List of decoded SAML assertion response strings.
################################################################################
def find_saml_tokens(entries):
    marker = 'SAMLResponse='

    def first_token_in(body):
        # Text after the first marker, cut at the next form-field separator.
        encoded = body.split(marker)[1].split('&')[0]
        return decode_saml_response(encoded)

    saml_tokens = []
    for entry in entries:
        request = entry['request']
        response = entry['response']

        # Request side: only POST bodies are inspected.
        if request['method'] == 'POST':
            request_body = request.get('postData', {}).get('text', '')
            if marker in request_body:
                saml_tokens.append(first_token_in(request_body))

        # Response side: undo the HAR's base-64 wrapping when present.
        content = response.get('content', {})
        response_body = content.get('text', '')
        if content.get('encoding') == 'base64':
            response_body = base64.b64decode(response_body).decode('utf-8', errors='ignore')

        if marker in response_body:
            saml_tokens.append(first_token_in(response_body))

    return saml_tokens

################################################################################
#  Re-formats an XML string across multiple lines with hierarchical
#  indentation (two spaces per level).
#
# Args:
#   xml_string (str): The XML string to be pretty-printed.
#
# Returns:
#   str: A pretty-printed version of the XML string.
################################################################################
def pretty_print_xml(xml_string):
    document = parseString(xml_string)
    formatted = document.toprettyxml(indent="  ")
    return formatted

# Read the HAR capture from disk; the file is JSON text encoded as UTF-8
with open(str_har_file_path, mode='r', encoding='utf-8') as har_file:
    har_data = json.loads(har_file.read())

# The recorded request/response pairs live under log -> entries
entries = har_data['log']['entries']
saml_tokens = find_saml_tokens(entries)

# Print each decoded token as indented XML, followed by a divider line
for token in saml_tokens:
    print("Decoded SAML Token:")
    print(pretty_print_xml(token))
    print('-' * 80)

Sumo Logic: Validating Collector Data Sources via API

This script is an example of using the Sumo Logic API to retrieve collector details. It looks for Linux collectors and validates that each one has the desired log sources defined. Collectors that do not contain all of the desired sources are flagged for further investigation.

import requests
from requests.auth import HTTPBasicAuth
import pandas as pd
from config import access_id, access_key  # Import your credentials from config.py

# Base URL for Sumo Logic API (v1); the request functions below append their
# endpoint paths (e.g. '/collectors') to it.
# NOTE(review): presumably the API host differs per Sumo Logic deployment —
# confirm this endpoint matches the account being queried.
base_url = 'https://api.sumologic.com/api/v1'

def get_all_collectors(timeout=30):
    """Retrieve all collectors with pagination support.

    Requests ``/collectors`` one page at a time, advancing ``offset`` by the
    page size until a page shorter than the page size is returned.

    Args:
        timeout: Seconds to wait on each HTTP request before giving up.
            Without a timeout a stalled connection would block forever.

    Returns:
        list: The collector records gathered from the 'collectors' key of
        every page. If a page request returns a non-200 status, the error is
        printed and the collectors gathered so far are returned.

    Raises:
        requests.exceptions.RequestException: If a request fails at the
            transport level (including exceeding ``timeout``).
    """
    collectors = []
    limit = 1000  # Adjust as needed; check API docs for max limit
    offset = 0
    auth = HTTPBasicAuth(access_id, access_key)  # same credentials for every page

    while True:
        response = requests.get(
            f'{base_url}/collectors',
            params={'limit': limit, 'offset': offset},
            auth=auth,
            timeout=timeout,
        )
        if response.status_code != 200:
            print('Error fetching collectors:', response.status_code, response.text)
            break

        page = response.json().get('collectors', [])
        collectors.extend(page)
        if len(page) < limit:
            break  # A short page means this was the last one
        offset += limit

    return collectors

def get_sources(collector_id, timeout=30):
    """Retrieve sources for a specific collector.

    Args:
        collector_id: Identifier of the collector whose sources are requested.
        timeout: Seconds to wait on the HTTP request before giving up.
            Without a timeout a stalled connection would block forever.

    Returns:
        list: The value of the 'sources' key in the JSON response, or an
        empty list when the key is absent or the request returns a non-200
        status (in which case the error is also printed).

    Raises:
        requests.exceptions.RequestException: If the request fails at the
            transport level (including exceeding ``timeout``).
    """
    url = f'{base_url}/collectors/{collector_id}/sources'
    response = requests.get(
        url,
        auth=HTTPBasicAuth(access_id, access_key),
        timeout=timeout,
    )
    if response.status_code != 200:
        print(f'Error fetching sources for collector {collector_id}:', response.status_code, response.text)
        return []

    return response.json().get('sources', [])

def check_required_logs(sources):
    """Report which required log sources are absent from *sources*.

    A requirement counts as satisfied when some source of type 'LocalFile'
    has a name ending with the required suffix.

    Returns a dict keyed by required suffix whose value is "MISSING" when no
    matching source was found and "" otherwise.
    """
    required_suffixes = (
        '_security_events',
        '_linux_system_events',
        'cron_logs',
        'dnf_rpm_logs',
    )
    found = set()

    for source in sources:
        # Only local file sources can satisfy a requirement
        if source['sourceType'] != 'LocalFile':
            continue
        source_name = source.get('name', '')
        found.update(
            suffix for suffix in required_suffixes if source_name.endswith(suffix)
        )

    # Determine missing logs
    return {
        suffix: "" if suffix in found else "MISSING"
        for suffix in required_suffixes
    }

# Main execution
if __name__ == "__main__":
    # Report columns that hold the per-log "MISSING" markers
    log_columns = ["_security_events", "_linux_system_events", "cron_logs", "dnf_rpm_logs"]

    collectors = get_all_collectors()
    report_data = []

    for collector in collectors:
        # Only collectors whose osName is 'Linux' are audited
        if collector.get('osName') != 'Linux':
            continue

        collector_id, collector_name = collector['id'], collector['name']
        print(f"Checking Linux Collector: ID: {collector_id}, Name: {collector_name}")

        missing_logs = check_required_logs(get_sources(collector_id))
        if not any(missing_logs.values()):
            continue  # nothing is marked missing, so no report row is needed

        report_entry = {"Collector Name": collector_name}
        report_entry.update((column, missing_logs[column]) for column in log_columns)
        report_data.append(report_entry)

    # Build the report table with a fixed column order
    df = pd.DataFrame(report_data, columns=["Collector Name", *log_columns])

    if df.empty:
        print("\nAll collectors have the required logs.")
    else:
        # Prefix the Excel filename with the current date and time
        timestamp = pd.Timestamp.now().strftime("%Y%m%d-%H%M")
        output_file = f"{timestamp}-missing_logs_report.xlsx"
        df.to_excel(output_file, index=False)
        print(f"\nData written to {output_file}")