This script is an example of using the Sumo Logic API to retrieve collector details. It looks for Linux collectors and validates that each one has the desired log sources defined; any collector missing one or more of those sources is flagged for further investigation.
```python
import requests
from requests.auth import HTTPBasicAuth
import pandas as pd
from config import access_id, access_key  # Import your credentials from config.py

# Base URL for the Sumo Logic API (adjust to your deployment, e.g. api.us2.sumologic.com)
base_url = 'https://api.sumologic.com/api/v1'

def get_all_collectors():
    """Retrieve all collectors with pagination support."""
    collectors = []
    limit = 1000  # Adjust as needed; check API docs for max limit
    offset = 0

    while True:
        url = f'{base_url}/collectors?limit={limit}&offset={offset}'
        response = requests.get(url, auth=HTTPBasicAuth(access_id, access_key))
        if response.status_code == 200:
            result = response.json()
            collectors.extend(result.get('collectors', []))
            if len(result.get('collectors', [])) < limit:
                break  # Fewer results than the limit means this was the last page
            offset += limit
        else:
            print('Error fetching collectors:', response.status_code, response.text)
            break
    return collectors

def get_sources(collector_id):
    """Retrieve sources for a specific collector."""
    url = f'{base_url}/collectors/{collector_id}/sources'
    response = requests.get(url, auth=HTTPBasicAuth(access_id, access_key))
    if response.status_code == 200:
        sources = response.json().get('sources', [])
        # print(f"Log Sources for collector {collector_id}: {sources}")
        return sources
    else:
        print(f'Error fetching sources for collector {collector_id}:', response.status_code, response.text)
        return []

def check_required_logs(sources):
    """Check if the required logs are present in the sources."""
    required_logs = {
        '_security_events': False,
        '_linux_system_events': False,
        'cron_logs': False,
        'dnf_rpm_logs': False
    }

    for source in sources:
        if source['sourceType'] == 'LocalFile':
            name = source.get('name', '')
            for key in required_logs.keys():
                if name.endswith(key):
                    required_logs[key] = True

    # Determine missing logs
    missing_logs = {log: "MISSING" if not present else "" for log, present in required_logs.items()}
    return missing_logs

# Main execution
if __name__ == "__main__":
    collectors = get_all_collectors()
    report_data = []

    for collector in collectors:
        # Check if the collector's osName is 'Linux'
        if collector.get('osName') == 'Linux':
            collector_id = collector['id']
            collector_name = collector['name']
            print(f"Checking Linux Collector: ID: {collector_id}, Name: {collector_name}")
            sources = get_sources(collector_id)
            missing_logs = check_required_logs(sources)
            if any(missing_logs.values()):
                report_entry = {
                    "Collector Name": collector_name,
                    "_security_events": missing_logs['_security_events'],
                    "_linux_system_events": missing_logs['_linux_system_events'],
                    "cron_logs": missing_logs['cron_logs'],
                    "dnf_rpm_logs": missing_logs['dnf_rpm_logs']
                }
                # print(f"Missing logs for collector {collector_name}: {report_entry}")
                report_data.append(report_entry)

    # Create a DataFrame and write to Excel
    df = pd.DataFrame(report_data, columns=["Collector Name", "_security_events",
                                            "_linux_system_events", "cron_logs", "dnf_rpm_logs"])

    # Generate the filename with the current date and time
    if not df.empty:
        timestamp = pd.Timestamp.now().strftime("%Y%m%d-%H%M")
        output_file = f"{timestamp}-missing_logs_report.xlsx"
        df.to_excel(output_file, index=False)
        print(f"\nData written to {output_file}")
    else:
        print("\nAll collectors have the required logs.")
```
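The script imports its credentials from a local config.py rather than hard-coding them. A minimal sketch of that file, assuming the same variable names used above (the values themselves are placeholders; generate a real access ID and key in the Sumo Logic UI and keep the file out of version control):

```python
# config.py -- example only; substitute your own Sumo Logic access key pair.
access_id = 'suXXXXXXXXXXXXXX'                    # placeholder access ID
access_key = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'   # placeholder access key
```

The script also relies on the requests, pandas, and openpyxl packages (pandas uses openpyxl to write .xlsx files), so install them first, for example with `pip install requests pandas openpyxl`.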