{"id":11346,"date":"2024-12-31T13:06:04","date_gmt":"2024-12-31T18:06:04","guid":{"rendered":"https:\/\/www.rushworth.us\/lisa\/?p=11346"},"modified":"2025-01-08T13:08:12","modified_gmt":"2025-01-08T18:08:12","slug":"sumo-logic-validating-collector-data-sources-via-api","status":"publish","type":"post","link":"https:\/\/www.rushworth.us\/lisa\/?p=11346","title":{"rendered":"Sumo Logic: Validating Collector Data Sources via API"},"content":{"rendered":"\n<p>This script is an example of using the Sumo Logic API to retrieve collector details. This particular script looks for Linux servers and validates that each collector has the desired log sources defined. Those that do not contain all desired sources are flagged for further investigation. <\/p>\n\n\n<div class=\"wp-block-syntaxhighlighter-code \"><pre class=\"brush: python; title: ; notranslate\" title=\"\">\nimport requests\nfrom requests.auth import HTTPBasicAuth\nimport pandas as pd\nfrom config import access_id, access_key  # Import your credentials from config.py\n\n# Base URL for Sumo Logic API\nbase_url = &#039;https:\/\/api.sumologic.com\/api\/v1&#039;\n\ndef get_all_collectors():\n    &quot;&quot;&quot;Retrieve all collectors with pagination support.&quot;&quot;&quot;\n    collectors = &#x5B;]\n    limit = 1000  # Adjust as needed; check API docs for max limit\n    offset = 0\n\n    while True:\n        url = f&#039;{base_url}\/collectors?limit={limit}&amp;offset={offset}&#039;\n        response = requests.get(url, auth=HTTPBasicAuth(access_id, access_key))\n        if response.status_code == 200:\n            result = response.json()\n            collectors.extend(result.get(&#039;collectors&#039;, &#x5B;]))\n            if len(result.get(&#039;collectors&#039;, &#x5B;])) &lt; limit:\n                break  # Exit the loop if we received fewer than the limit, meaning it&#039;s the last page\n            offset += limit\n        else:\n            print(&#039;Error fetching collectors:&#039;, 
response.status_code, response.text)\n            break\n\n    return collectors\n\ndef get_sources(collector_id):\n    &quot;&quot;&quot;Retrieve sources for a specific collector.&quot;&quot;&quot;\n    url = f&#039;{base_url}\/collectors\/{collector_id}\/sources&#039;\n    response = requests.get(url, auth=HTTPBasicAuth(access_id, access_key))\n    if response.status_code == 200:\n        sources = response.json().get(&#039;sources&#039;, &#x5B;])\n        # print(f&quot;Log Sources for collector {collector_id}: {sources}&quot;)\n        return sources\n    else:\n        print(f&#039;Error fetching sources for collector {collector_id}:&#039;, response.status_code, response.text)\n        return &#x5B;]\n\ndef check_required_logs(sources):\n    &quot;&quot;&quot;Check if the required logs are present in the sources.&quot;&quot;&quot;\n    required_logs = {\n        &#039;_security_events&#039;: False,\n        &#039;_linux_system_events&#039;: False,\n        &#039;cron_logs&#039;: False,\n        &#039;dnf_rpm_logs&#039;: False\n    }\n\n    for source in sources:\n        if source&#x5B;&#039;sourceType&#039;] == &#039;LocalFile&#039;:\n            name = source.get(&#039;name&#039;, &#039;&#039;)\n            for key in required_logs.keys():\n                if name.endswith(key):\n                    required_logs&#x5B;key] = True\n\n    # Determine missing logs\n    missing_logs = {log: &quot;MISSING&quot; if not present else &quot;&quot; for log, present in required_logs.items()}\n    return missing_logs\n\n# Main execution\nif __name__ == &quot;__main__&quot;:\n    collectors = get_all_collectors()\n    report_data = &#x5B;]\n\n    for collector in collectors:\n        # Check if the collector&#039;s osName is &#039;Linux&#039;\n        if collector.get(&#039;osName&#039;) == &#039;Linux&#039;:\n            collector_id = collector&#x5B;&#039;id&#039;]\n            collector_name = collector&#x5B;&#039;name&#039;]\n            print(f&quot;Checking Linux 
Collector: ID: {collector_id}, Name: {collector_name}&quot;)\n\n            sources = get_sources(collector_id)\n            missing_logs = check_required_logs(sources)\n            if any(missing_logs.values()):\n                report_entry = {\n                    &quot;Collector Name&quot;: collector_name,\n                    &quot;_security_events&quot;: missing_logs&#x5B;&#039;_security_events&#039;],\n                    &quot;_linux_system_events&quot;: missing_logs&#x5B;&#039;_linux_system_events&#039;],\n                    &quot;cron_logs&quot;: missing_logs&#x5B;&#039;cron_logs&#039;],\n                    &quot;dnf_rpm_logs&quot;: missing_logs&#x5B;&#039;dnf_rpm_logs&#039;]\n                }\n                # print(f&quot;Missing logs for collector {collector_name}: {report_entry}&quot;)\n                report_data.append(report_entry)\n\n    # Create a DataFrame and write to Excel\n    df = pd.DataFrame(report_data, columns=&#x5B;\n        &quot;Collector Name&quot;, &quot;_security_events&quot;, &quot;_linux_system_events&quot;, &quot;cron_logs&quot;, &quot;dnf_rpm_logs&quot;\n    ])\n\n    # Generate the filename with current date and time\n    if not df.empty:\n        timestamp = pd.Timestamp.now().strftime(&quot;%Y%m%d-%H%M&quot;)\n        output_file = f&quot;{timestamp}-missing_logs_report.xlsx&quot;\n        df.to_excel(output_file, index=False)\n        print(f&quot;\\nData written to {output_file}&quot;)\n    else:\n        print(&quot;\\nAll collectors have the required logs.&quot;)\n<\/pre><\/div>","protected":false},"excerpt":{"rendered":"<p>This script is an example of using the Sumo Logic API to retrieve collector details. This particular script looks for Linux servers and validates that each collector has the desired log sources defined. 
Those that do not contain all desired sources are flagged for further investigation.<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[2076],"tags":[664,2074,2075],"class_list":["post-11346","post","type-post","status-publish","format-standard","hentry","category-sumo-logic","tag-python","tag-sumo-logic","tag-sumo-logic-api"],"_links":{"self":[{"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/posts\/11346","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=11346"}],"version-history":[{"count":1,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/posts\/11346\/revisions"}],"predecessor-version":[{"id":11347,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/posts\/11346\/revisions\/11347"}],"wp:attachment":[{"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=11346"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=11346"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=11346"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}