{"id":11032,"date":"2024-07-05T12:59:00","date_gmt":"2024-07-05T17:59:00","guid":{"rendered":"https:\/\/www.rushworth.us\/lisa\/?p=11032"},"modified":"2024-07-05T12:59:00","modified_gmt":"2024-07-05T17:59:00","slug":"snmp-simulator","status":"publish","type":"post","link":"https:\/\/www.rushworth.us\/lisa\/?p=11032","title":{"rendered":"SNMP Simulator"},"content":{"rendered":"\n<h2 class=\"wp-block-heading\">Background<\/h2>\n\n\n\n<p>As communication between development and production platforms is limited for security and data integrity reasons, this creates a challenge when testing changes in development: we cannot access \u201creal world\u201d data with which to perform tests. Having a limited set of data in development means testing may not illuminate issues that occur at high volume or on a large scale.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\">Solution<\/h2>\n\n\n\n<p>While limiting communication between the prod and dev systems is reasonable, it would be beneficial to be able to replay production-like data within our development systems for testing purposes. While it is not cost effective to buy large network devices with thousands of interfaces for testing, the Python module <a href=\"https:\/\/github.com\/etingof\/snmpsim\">snmpsim<\/a> provides \u201ccanned responses\u201d that simulate real devices on the production network. For simplicity, I have a bash script that launches the SNMP responder.<\/p>\n\n\n\n<p>server03:snmpsim # cat ..\/_playback.sh<\/p>\n\n\n\n<p>#!\/bin\/bash<\/p>\n\n\n\n<p>snmpsimd.py --data-dir=\/opt\/snmp\/snmpsim\/data --cache-dir=\/opt\/snmp\/snmpsim\/cache --agent-udpv4-endpoint=0.0.0.0:161 --process-user=ljrsnmp --process-group=ljrsnmp<\/p>\n\n\n\n<p>This responder will replay data stored in the directory \/opt\/snmp\/snmpsim\/data \u2013 any file ending in .snmprec will be included in the response, and the filename prior to .snmprec is the community string used to access the response data. E.G. 
public.snmprec is the data for the public community string.<\/p>\n\n\n\n<p>The response files are in the format OID|TAG|VALUE, where OID is the numeric OID of the SNMP object and TAG is an integer type code defined at <a href=\"https:\/\/pypi.org\/project\/snmpsim\/0.2.3\/\">https:\/\/pypi.org\/project\/snmpsim\/0.2.3\/<\/a><\/p>\n\n\n\n<p>Valid tag values and their corresponding ASN.1\/SNMP types are:<\/p>\n\n\n\n<figure class=\"wp-block-table\"><table><tbody><tr><td><strong>ASN.1\/SNMP Type<\/strong><\/td><td><strong>Tag Value<\/strong><\/td><\/tr><tr><td>Integer32<\/td><td>2<\/td><\/tr><tr><td>Octet String<\/td><td>4<\/td><\/tr><tr><td>Null<\/td><td>5<\/td><\/tr><tr><td>Object Identifier<\/td><td>6<\/td><\/tr><tr><td>IP Address<\/td><td>64<\/td><\/tr><tr><td>Counter32<\/td><td>65<\/td><\/tr><tr><td>Gauge32<\/td><td>66<\/td><\/tr><tr><td>Time Ticks<\/td><td>67<\/td><\/tr><tr><td>Opaque<\/td><td>68<\/td><\/tr><tr><td>Counter64<\/td><td>70<\/td><\/tr><\/tbody><\/table><\/figure>\n\n\n\n<p>VALUE is the data to be returned for the OID. As an example:<\/p>\n\n\n\n<p>1.3.6.1.2.1.1.3.0|67|2293092270<\/p>\n\n\n\n<p>1.3.6.1.2.1.1.3.0 is the sysUpTime, the data type is TimeTicks, and the system up time is 2293092270 hundredths of a second. That is 22,930,922.7 seconds, or about 6369 hours, 42 minutes, and 3 seconds.<\/p>\n\n\n\n<p>Items within the response file need to be listed in ascending OID order.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\">Generating Response Data<\/h2>\n\n\n\n<p>There are two methods for creating the data provided to an SNMP GET request. A response file can be created manually, populated with the OID objects that should be included in the response as well as sample data. Alternatively, a network trace can be gathered from the production network and parsed to create the response file.<\/p>\n\n\n\n<h3 class=\"wp-block-heading\">Manually Generated Response File<\/h3>\n\n\n\n<p>While you can literally type data into a response file, it is far easier to use a script to generate sample data. 
\/opt\/snmp\/snmpsim\/_genData.py is an example that creates a response file for 1,000 interfaces.<\/p>\n\n\n<div class=\"wp-block-syntaxhighlighter-code \"><pre class=\"brush: python; title: ; notranslate\" title=\"\">\nfrom datetime import datetime\nimport random\n\niRangeMax = 1000\n\ndictTags = {&#039;Integer&#039;: &#039;2&#039;, &#039;OctetString&#039;: &#039;4&#039;, &#039;NULL&#039;: &#039;5&#039;, &#039;ObjectIdentifier&#039;: &#039;6&#039;, &#039;IPAddress&#039;: &#039;64&#039;, &#039;Counter32&#039;: &#039;65&#039;, &#039;Gauge32&#039;: &#039;66&#039;, &#039;TimeTicks&#039;: &#039;67&#039;, &#039;Opaque&#039;: &#039;68&#039;,&#039;Counter64&#039;: &#039;70&#039;}  # Valid tags per https:\/\/pypi.org\/project\/snmpsim\/0.2.3\/\n\ntoday = datetime.now()\n\niftable_snmp_objects = &#x5B;\n    (&#039;1.3.6.1.2.1.2.2.1.1&#039;, &#039;Integer&#039;, lambda i: i),  # ifIndex\n    (&#039;1.3.6.1.2.1.2.2.1.2&#039;, &#039;OctetString&#039;, lambda i: f&quot;SampleInterface{i}&quot;),  # ifDescr\n    (&#039;1.3.6.1.2.1.2.2.1.3&#039;, &#039;Integer&#039;, lambda i: 6),  # ifType\n    (&#039;1.3.6.1.2.1.2.2.1.4&#039;, &#039;Integer&#039;, lambda i: 1500),  # ifMtu\n    (&#039;1.3.6.1.2.1.2.2.1.5&#039;, &#039;Gauge32&#039;, lambda i: 100000000),  # ifSpeed\n    (&#039;1.3.6.1.2.1.2.2.1.6&#039;, &#039;OctetString&#039;, lambda i: f&quot;00:00:00:00:{format(i, &#039;04x&#039;)&#x5B;:2]}:{format(i, &#039;04x&#039;)&#x5B;-2:]}&quot;),  # ifPhysAddress (i zero padded to four hex digits, split across the last two octets)\n    (&#039;1.3.6.1.2.1.2.2.1.7&#039;, &#039;Integer&#039;, lambda i: 1),  # ifAdminStatus\n    (&#039;1.3.6.1.2.1.2.2.1.8&#039;, &#039;Integer&#039;, lambda i: 1),  # ifOperStatus\n    (&#039;1.3.6.1.2.1.2.2.1.9&#039;, &#039;TimeTicks&#039;, lambda i: int((datetime.now() - datetime(today.year, random.randint(1, today.month), random.randint(1, min(today.day, 28)))).total_seconds()) * 100),  # ifLastChange (day capped at 28 so every month yields a valid date)\n    (&#039;1.3.6.1.2.1.2.2.1.10&#039;, &#039;Counter32&#039;, lambda i: random.randint(3, i*50000)),  # ifInOctets\n    
(&#039;1.3.6.1.2.1.2.2.1.11&#039;, &#039;Counter32&#039;, lambda i: random.randint(3, i*50000)),  # ifInUcastPkts\n    (&#039;1.3.6.1.2.1.2.2.1.12&#039;, &#039;Counter32&#039;, lambda i: random.randint(0, 80)),  # ifInNUcastPkts\n    (&#039;1.3.6.1.2.1.2.2.1.13&#039;, &#039;Counter32&#039;, lambda i: random.randint(0, 80)),  # ifInDiscards\n    (&#039;1.3.6.1.2.1.2.2.1.14&#039;, &#039;Counter32&#039;, lambda i: random.randint(0, 80)),  # ifInErrors\n    (&#039;1.3.6.1.2.1.2.2.1.15&#039;, &#039;Counter32&#039;, lambda i: random.randint(3, i*50000)),  # ifInUnknownProtos\n    (&#039;1.3.6.1.2.1.2.2.1.16&#039;, &#039;Counter32&#039;, lambda i: random.randint(3, i*50000)),  # ifOutOctets\n    (&#039;1.3.6.1.2.1.2.2.1.17&#039;, &#039;Counter32&#039;, lambda i: random.randint(3, i*50000)),  # ifOutUcastPkts\n    (&#039;1.3.6.1.2.1.2.2.1.18&#039;, &#039;Counter32&#039;, lambda i: random.randint(3, i*50000)),  # ifOutNUcastPkts\n    (&#039;1.3.6.1.2.1.2.2.1.19&#039;, &#039;Counter32&#039;, lambda i: random.randint(0, 80)),  # ifOutDiscards\n    (&#039;1.3.6.1.2.1.2.2.1.20&#039;, &#039;Counter32&#039;, lambda i: random.randint(0, 80)),  # ifOutErrors\n]\n\n# ifXTable columns, kept in ascending OID order because the response file must be sorted\nifxtable_snmp_objects = &#x5B;\n    (&#039;1.3.6.1.2.1.31.1.1.1.1&#039;, &#039;OctetString&#039;, lambda i: f&quot;SampleInterface{i}&quot;),  # ifName\n    (&#039;1.3.6.1.2.1.31.1.1.1.6&#039;, &#039;Counter64&#039;, lambda i: random.randint(3, i*50000)),  # ifHCInOctets\n    (&#039;1.3.6.1.2.1.31.1.1.1.10&#039;, &#039;Counter64&#039;, lambda i: random.randint(3, i*60000)),  # ifHCOutOctets\n    (&#039;1.3.6.1.2.1.31.1.1.1.15&#039;, &#039;Gauge32&#039;, lambda i: &quot;100&quot;),  # ifHighSpeed\n]\n\n# Print IFTable data\nfor oid_base, tag_type, value_func in iftable_snmp_objects:\n    for i in range(1, iRangeMax+1):\n        value = value_func(i)\n        print(f&quot;{oid_base}.{i}|{dictTags.get(tag_type)}|{value}&quot;)\n\n# IP-MIB objects for managing IP addressing\n# ipAdEntAddr: The IP address to which this 
entry&#039;s addressing information pertains\nprint(f&quot;1.3.6.1.2.1.4.20.1.1|{dictTags.get(&#039;IPAddress&#039;)}|10.5.5.5&quot;)\n\n# ipAdEntIfIndex: The index value which uniquely identifies the interface to which this entry is applicable (an INTEGER in IP-MIB)\nprint(f&quot;1.3.6.1.2.1.4.20.1.2|{dictTags.get(&#039;Integer&#039;)}|1&quot;)\n\n# ipAdEntNetMask: The subnet mask associated with the IP address of this entry (an IpAddress in IP-MIB)\nprint(f&quot;1.3.6.1.2.1.4.20.1.3|{dictTags.get(&#039;IPAddress&#039;)}|255.255.255.0&quot;)\n\n# hrSWRunIndex: An index uniquely identifying a row in the hrSWRun table\nprint(f&quot;1.3.6.1.2.1.25.4.2.1.1.1|{dictTags.get(&#039;Integer&#039;)}|1&quot;)\n\n# hrSWRunName: The name of the software running on this device\nprint(f&quot;1.3.6.1.2.1.25.4.2.1.2.1|{dictTags.get(&#039;OctetString&#039;)}|LJRSNMPAgent&quot;)\n\n# hrSWRunID: The product ID of the software running on this device\nprint(f&quot;1.3.6.1.2.1.25.4.2.1.3.1|{dictTags.get(&#039;ObjectIdentifier&#039;)}|1.3.6.1.4.1.25709.55&quot;)\n\n# hrSWRunPath: The path of the software running on this device\nprint(f&quot;1.3.6.1.2.1.25.4.2.1.4.1|{dictTags.get(&#039;OctetString&#039;)}|\/opt\/snmp\/snmpsim\/_agent.sh&quot;)\n\n# hrSWRunParameters: Operational parameters for the software running on this device\nprint(f&quot;1.3.6.1.2.1.25.4.2.1.5.1|{dictTags.get(&#039;OctetString&#039;)}|-L&quot;)\n\n# hrSWRunType: The type of software running (e.g., operating system, application)\nprint(f&quot;1.3.6.1.2.1.25.4.2.1.6.1|{dictTags.get(&#039;Integer&#039;)}|4&quot;)\n\n# hrSWRunStatus: The status of this software (running, runnable, notRunnable, invalid)\nprint(f&quot;1.3.6.1.2.1.25.4.2.1.7.1|{dictTags.get(&#039;Integer&#039;)}|1&quot;)\n\n# Print ifXTable data\nfor oid_base, tag_type, value_func in ifxtable_snmp_objects:\n    for i in range(1, iRangeMax+1):\n        value = value_func(i)\n        print(f&quot;{oid_base}.{i}|{dictTags.get(tag_type)}|{value}&quot;)\n\n<\/pre><\/div>\n\n\n<h3 class=\"wp-block-heading\">Network 
Capture<\/h3>\n\n\n\n<p>Even better, build the response file from a network capture of real SNMP traffic.<\/p>\n\n\n\n<h4 class=\"wp-block-heading\">Capture Data<\/h4>\n\n\n\n<p>On the server that gathers SNMP data from the host we want to simulate, use a network capture utility to gather the SNMP communication between the server and the desired device.<\/p>\n\n\n\n<p>tcpdump -i &lt;interface&gt; -w &lt;filename&gt;.pcap<\/p>\n\n\n\n<p>E.G. to record the communication with 10.5.171.114:<\/p>\n\n\n\n<p>tcpdump &#039;host 10.5.171.114 and (tcp port 161 or tcp port 162 or udp port 161 or udp port 162)&#039; -w \/tmp\/ar.pcap<\/p>\n\n\n\n<p>Note \u2013 there is no benefit to capturing more than one cycle of SNMP responses. If packets are captured as soon as the capture starts, the capture began in the middle of a polling cycle. End the capture and start a new one shortly afterward. There should be no packets captured for a bit, then packets during the SNMP polling cycle, and then another pause until the next cycle.<\/p>\n\n\n\n<h4 class=\"wp-block-heading\">Parsing The Capture Data Into A Response File<\/h4>\n\n\n\n<p>The following script parses the capture file into an snmprec response file \u2013 note, I needed to use 2.6.0rc1 of scapy to parse SNMP data. 
The 2.5.0 release version failed to parse <em>most<\/em> of the packets which I believe is related to <a href=\"https:\/\/github.com\/secdev\/scapy\/issues\/3900\">https:\/\/github.com\/secdev\/scapy\/issues\/3900<\/a><\/p>\n\n\n<div class=\"wp-block-syntaxhighlighter-code \"><pre class=\"brush: python; title: ; notranslate\" title=\"\">\nfrom scapy.all import rdpcap, SNMP\nfrom scapy.layers.inet import UDP\nfrom scapy.packet import Raw\nfrom scapy.layers.snmp import SNMP, SNMPvarbind, SNMPresponse, SNMPbulk\nfrom scapy.all import conf, load_layer\nfrom scapy.utils import hexdump\n\nfrom scapy.all import UDP, load_contrib\nfrom scapy.packet import bind_layers\n\nimport os\nfrom datetime import datetime\nimport argparse\n\n# Ensure Scapy&#039;s SNMP contributions are loaded\nload_contrib(&quot;snmp&quot;)\n\ndef sort_by_oid(listSNMPResponses):\n    &quot;&quot;&quot;\n    Sorts a list of &quot;OID|TAG|Value&quot; strings by the OID numerically and hierarchically.\n\n    :param listSNMPResponses: A list of &quot;OID|TAG|Value&quot; strings.\n    :return: A list of &quot;OID|TAG|Value&quot; strings sorted by OID.\n    &quot;&quot;&quot;\n    # Split each element into a tuple of (OID list, original string), converting OID to integers for proper comparison\n    oid_tuples = &#x5B;(list(map(int, element.split(&#039;|&#039;)&#x5B;0].split(&#039;.&#039;))), element) for element in listSNMPResponses]\n\n    # Sort the list of tuples by the OID part (the list of integers)\n    sorted_oid_tuples = sorted(oid_tuples, key=lambda x: x&#x5B;0])\n\n    # Extract the original strings from the sorted list of tuples\n    sorted_listSNMPResponses = &#x5B;element&#x5B;1] for element in sorted_oid_tuples]\n\n    return sorted_listSNMPResponses\n\nparser = argparse.ArgumentParser(description=&#039;This script converts an SNMP packet capture into a snmpsim response file&#039;)\nparser.add_argument(&#039;--filename&#039;, &#039;-f&#039;, help=&#039;The capture file to process&#039;, 
required=True)\n\nargs = parser.parse_args()\nstrFullCaptureFilePath = args.filename\nstrCaptureFilePath, strCaptureFileName = os.path.split(strFullCaptureFilePath)\n\n\n# Valid tags per https:\/\/pypi.org\/project\/snmpsim\/0.2.3\/\ndictTags = {&#039;ASN1_INTEGER&#039;: &#039;2&#039;, &#039;ASN1_STRING&#039;: &#039;4&#039;, &#039;ASN1_NULL&#039;: &#039;5&#039;, &#039;ASN1_OID&#039;: &#039;6&#039;, &#039;ASN1_IPADDRESS&#039;: &#039;64&#039;, &#039;ASN1_COUNTER32&#039;: &#039;65&#039;, &#039;ASN1_GAUGE32&#039;: &#039;66&#039;, &#039;ASN1_TIME_TICKS&#039;: &#039;67&#039;, &#039;Opaque&#039;: &#039;68&#039;,&#039;ASN1_COUNTER64&#039;: &#039;70&#039;}\n\nlistSNMPResponses = &#x5B;]\nlistSNMPResponses.append(&quot;1.3.6.1.2.1.25.4.2.1.1.1|2|1&quot;)\nlistSNMPResponses.append(&quot;1.3.6.1.2.1.25.4.2.1.2.1|4|LJRSNMPAgent&quot;)\nlistSNMPResponses.append(&quot;1.3.6.1.2.1.25.4.2.1.3.1|6|1.3.6.1.4.1.25709.55&quot;)\nlistSNMPResponses.append(&quot;1.3.6.1.2.1.25.4.2.1.4.1|4|\/opt\/snmp\/snmpsim\/_agent.sh&quot;)\nlistSNMPResponses.append(&quot;1.3.6.1.2.1.25.4.2.1.5.1|4|-L&quot;)\nlistSNMPResponses.append(&quot;1.3.6.1.2.1.25.4.2.1.6.1|2|4&quot;)\nlistSNMPResponses.append(&quot;1.3.6.1.2.1.25.4.2.1.7.1|2|1&quot;)\ni = 0\n\nif True:\n    packets = rdpcap(strFullCaptureFilePath)\n    # Packets are zero indexed, so packet 1 in script is packet 2 in Wireshark GUI\n    #for i in range(0,4):\n    for packet in packets:\n        print(f&quot;Working on packet {i}&quot;)\n        i = i + 1\n        if SNMP in packet:\n            snmp_layer = packet&#x5B;SNMP]\n            if isinstance(packet&#x5B;SNMP].PDU,SNMPresponse):\n                snmp_response = snmp_layer.getfield_and_val(&#039;PDU&#039;)&#x5B;1]\n                if hasattr(snmp_response, &#039;varbindlist&#039;) and snmp_response.varbindlist is not None:\n                    for varbind in snmp_response.varbindlist:\n                        strOID = varbind.oid.val if hasattr(varbind.oid, &#039;val&#039;) else 
str(varbind.oid)\n                        strValue = varbind.value.val if hasattr(varbind.value, &#039;val&#039;) else str(varbind.value)\n                        strType = type(varbind.value).__name__\n                        if dictTags.get(strType):\n                            iType = dictTags.get(strType)\n                        else:\n                            iType = strType\n\n                        if isinstance(strValue, bytes):\n                            print(f&quot;Decoding {strValue}&quot;)\n                            strValue = strValue.decode(&#039;utf-8&#039;,errors=&#039;ignore&#039;)\n\n                        print(f&quot;OID: {strOID}, Type: {strType}, Tag: {iType}, Value: {strValue}&quot;)\n                        listSNMPResponses.append(f&quot;{strOID}|{iType}|{strValue}&quot;)\n            else:\n                print(f&quot;Not a response -- type is {type(packet&#x5B;SNMP].PDU)}&quot;)\n        elif Raw in packet:\n            print(f&quot;I have a raw packet at {i}&quot;)\n        else:\n            print(dir(packet))\n            print(f&quot;No SNMP or Raw in {i}: {packet}&quot;)\n\n# Sort by OID numbers\nlistSortedSNMPResponses = sort_by_oid(listSNMPResponses)\nf = open(f&#039;\/opt\/snmp\/snmpsim\/data\/{datetime.now().strftime(&quot;%Y%m%d&quot;)}-{strCaptureFileName.rsplit(&quot;.&quot;, 1)&#x5B;0]}.deactivated&#039;, &quot;w&quot;)\nfor strSNMPResponse in listSortedSNMPResponses:\n    print(strSNMPResponse)\n    f.write(strSNMPResponse)\n    f.write(&quot;\\n&quot;)\nf.close()\n\n<\/pre><\/div>\n\n\n<p>This will create an snmpsim response file at \/opt\/snmp\/snmpsim\/data named as the capture file prefixed with the current year, month, and date. I.E. 
My ar.pcap capture produces \/opt\/snmp\/snmpsim\/data\/20240705-ar.deactivated \u2013 you can then copy the file to a name matching whatever community string you want \u2013 cp 20240705-ar.deactivated CommunityString.snmprec<\/p>\n\n\n\n<p><\/p>\n","protected":false},"excerpt":{"rendered":"<p>Background As communication between development and production platforms is limited for security and data integrity reasons, this creates a challenge when testing changes in development: we cannot access \u201creal world\u201d data with which to perform tests. Having a limited set of data in development means testing may not illuminate issues that occur at high volume &hellip;<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1945,30],"tags":[664,2028],"class_list":["post-11032","post","type-post","status-publish","format-standard","hentry","category-python","category-system-administration","tag-python","tag-snmp"],"_links":{"self":[{"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/posts\/11032","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=11032"}],"version-history":[{"count":1,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/posts\/11032\/revisions"}],"predecessor-version":[{"id":11033,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=\/wp\/v2\/posts\/11032\/revisions\/11033"}],"wp:attachment":[{"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=110
32"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=11032"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.rushworth.us\/lisa\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=11032"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}