import
requests
from
requests.auth
import
HTTPBasicAuth
import
time
from
collections
import
defaultdict
import
cx_Oracle
import
pandas as pd
import
ipaddress
from
datetime
import
datetime
from
ipwhois
import
IPWhois
from
ipwhois.exceptions
import
IPDefinedError
from
config
import
access_id, access_key, oracle_username, oracle_password
# Load the Oracle Instant Client libraries from the local install directory
# before any connection is attempted.
cx_Oracle.init_oracle_client(
    lib_dir=r"C:\Oracle\instantclient_21_15",
)

# DSN (host, port, service name) for the CMDB database; query_oracle_cmdb
# passes it to cx_Oracle.connect.
oracle_dsn = cx_Oracle.makedsn(
    'cmdb_db.example.com',
    1521,
    service_name='cmdb_db.example.com',
)
def query_oracle_cmdb(strIPAddress):
    """Look up a single IP address in the Oracle CMDB.

    Opens a new connection for each call using the module-level credentials
    and ``oracle_dsn``, executes one statement with ``strIPAddress`` as its
    only bind value, and fetches the first row.

    Args:
        strIPAddress: The IP address passed as the single bind value.

    Returns:
        The first row returned by the statement, or a tuple of eight empty
        strings when no row is found.
    """
    with cx_Oracle.connect(user=oracle_username, password=oracle_password, dsn=oracle_dsn) as connection:
        cursor = connection.cursor()
        # NOTE(review): as this statement reads, `query` is used on the
        # right-hand side before it has been assigned, which would raise
        # UnboundLocalError at runtime. It looks like the SQL text that was
        # meant to be assigned to `query` is missing here -- restore it
        # before relying on this function.
        query = cursor.execute(query, [strIPAddress])
        result = cursor.fetchone()
        cursor.close()
    # Fall back to eight empty strings so callers can always concatenate the
    # result onto their own tuple.
    return result if result else ("",) * 8
# RFC 1918 private ranges that are reported as "INTERNAL". Built once at
# import time instead of on every call.
_INTERNAL_NETWORKS = (
    ipaddress.IPv4Network("10.0.0.0/8"),
    ipaddress.IPv4Network("172.16.0.0/12"),
    ipaddress.IPv4Network("192.168.0.0/16"),
)


def get_ip_ownership(ip):
    """Return a label describing who owns the IPv4 address ``ip``.

    Args:
        ip: The address to classify, in any form accepted by
            ``ipaddress.IPv4Address``.

    Returns:
        ``"INTERNAL"`` when the address falls in one of the private ranges
        10.0.0.0/8, 172.16.0.0/12 or 192.168.0.0/16; otherwise the network
        name from an RDAP lookup; ``"Reserved IP"`` when ipwhois raises
        ``IPDefinedError``; ``"UNKNOWN"`` when the value is not a valid IPv4
        address or the lookup fails for any other reason.
    """
    try:
        ip_obj = ipaddress.IPv4Address(ip)
    except ValueError as e:
        # AddressValueError subclasses ValueError. A malformed or non-IPv4
        # value must not abort the whole run, so it is reported the same way
        # as any other failed lookup.
        print(f"Error looking up IP {ip}: {e}")
        return "UNKNOWN"

    if any(ip_obj in network for network in _INTERNAL_NETWORKS):
        return "INTERNAL"

    try:
        result = IPWhois(ip).lookup_rdap(depth=1)
        return result['network']['name']
    except IPDefinedError:
        return "Reserved IP"
    except Exception as e:
        # Best-effort enrichment: any lookup failure is logged and mapped to
        # a placeholder rather than propagated.
        print(f"Error looking up IP {ip}: {e}")
        return "UNKNOWN"
# NOTE(review): the value assigned to `search_query` is not present in the
# text under review -- the statement ends at the `=`. run_search_job sends
# this name as the 'query' field of the search job, so restore the original
# query string here before running the script.
search_query =
def run_search_job(start_time, end_time):
    """Start a search job and wait until it reaches a terminal state.

    Posts ``search_query`` with the given time range (time zone ``UTC``) to
    ``{base_url}/search/jobs`` and then polls the job every five seconds.

    Args:
        start_time: Value sent as the ``from`` field of the job.
        end_time: Value sent as the ``to`` field of the job.

    Returns:
        The job id when the job ends in ``DONE GATHERING RESULTS``; ``None``
        when the job could not be started, the status endpoint answers with a
        non-200 code, the job ends as ``CANCELLED``/``FAILED``, or polling
        fails repeatedly.
    """
    request_timeout = 30  # seconds; same per-request limit retrieve_results uses
    max_poll_failures = 5  # consecutive network errors tolerated while polling
    terminal_states = ('DONE GATHERING RESULTS', 'CANCELLED', 'FAILED')

    auth = HTTPBasicAuth(access_id, access_key)
    search_job_url = f'{base_url}/search/jobs'
    search_job_data = {
        'query': search_query,
        'from': start_time,
        'to': end_time,
        'timeZone': 'UTC',
    }

    try:
        response = requests.post(
            search_job_url,
            auth=auth,
            json=search_job_data,
            timeout=request_timeout,
        )
    except requests.exceptions.RequestException as e:
        print(f'Error during request: {e}')
        return None

    if response.status_code != 202:
        print('Error starting search job:', response.status_code, response.text)
        return None

    job_id = response.json()['id']
    print('Search Job ID:', job_id)

    job_status_url = f'{search_job_url}/{job_id}'
    failures = 0
    while True:
        try:
            response = requests.get(job_status_url, auth=auth, timeout=request_timeout)
        except requests.exceptions.RequestException as e:
            failures += 1
            print(f'Error during request: {e}')
            if failures >= max_poll_failures:
                return None
            time.sleep(5)
            continue
        failures = 0

        # An error reply carries no 'state', which would otherwise keep the
        # loop polling forever.
        if response.status_code != 200:
            print('Error checking search job status:', response.status_code, response.text)
            return None

        status = response.json().get('state')
        print('Search Job Status:', status)
        if status in terminal_states:
            break
        time.sleep(5)

    return job_id if status == 'DONE GATHERING RESULTS' else None
def retrieve_results(job_id, *, max_retries=5):
    """Page through a finished search job and count messages per ``dst`` value.

    Messages are fetched from ``{base_url}/search/jobs/{job_id}/messages`` in
    pages of 1000 until a short page is returned.

    Args:
        job_id: Identifier of the search job whose messages are read.
        max_retries: Number of consecutive failed requests tolerated for a
            page before the error is re-raised.

    Returns:
        A ``defaultdict(int)`` mapping each non-empty ``dst`` value to the
        number of messages that carried it. If the server answers with a
        non-200 status, the error is printed and the counts gathered so far
        are returned.

    Raises:
        requests.exceptions.RequestException: When a page still fails after
            ``max_retries`` retries (previously the request was retried
            forever).
    """
    dst_counts = defaultdict(int)
    results_url = f'{base_url}/search/jobs/{job_id}/messages'
    auth = HTTPBasicAuth(access_id, access_key)
    limit = 1000
    offset = 0
    failures = 0

    while True:
        try:
            response = requests.get(
                results_url,
                auth=auth,
                params={'offset': offset, 'limit': limit},
                timeout=30,
            )
            if response.status_code != 200:
                print('Error retrieving results:', response.status_code, response.text)
                break
            messages = response.json().get('messages', [])
        except requests.exceptions.RequestException as e:
            failures += 1
            print(f'Error during request: {e}')
            if failures > max_retries:
                raise
            time.sleep(5)
            continue  # retry the same offset
        failures = 0

        for message in messages:
            dst = message['map'].get('dst')
            if dst:
                dst_counts[dst] += 1

        # A short page means there is nothing left to fetch.
        if len(messages) < limit:
            break
        offset += limit

    return dst_counts
def main():
    """Run the interactive report: search, enrich each destination IP, export.

    Prompts for a start date, runs the search job from that date up to
    midnight of the current date, counts destination IPs, adds the CMDB row
    and ownership label for each one, and writes the result to a timestamped
    ``.xlsx`` file in the current directory.

    Returns:
        ``0`` on success; ``1`` when the date cannot be parsed or the search
        job does not complete successfully.
    """
    start_date_input = input("Enter the start date (YYYY-MM-DD): ")
    try:
        parsed_start = datetime.strptime(start_date_input, "%Y-%m-%d")
    except ValueError:
        print("Invalid date format. Please enter the date in YYYY-MM-DD format.")
        return 1
    start_time = parsed_start.strftime("%Y-%m-%dT00:00:00")

    # NOTE(review): the end of the window is midnight of today's *local*
    # date, while run_search_job submits the job with timeZone 'UTC' --
    # confirm that this is intended.
    end_time = datetime.now().strftime("%Y-%m-%dT00:00:00")

    job_id = run_search_job(start_time, end_time)
    if not job_id:
        print('Search job did not complete successfully.')
        return 1

    dst_counts = retrieve_results(job_id)

    data_for_excel = []
    print("\nDestination IP Counts and Oracle Data:")
    for dst, count in dst_counts.items():
        oracle_data = query_oracle_cmdb(dst)
        ownership = get_ip_ownership(dst)
        # Three computed columns followed by the eight CMDB columns.
        combined_data = (dst, count, ownership) + oracle_data
        data_for_excel.append(combined_data)
        print(combined_data)

    columns = [
        "IP Address",
        "Occurrence Count",
        "Ownership",
        "CMDB_Hostname",
        "CMDB_Friendly Name",
        "CMDB_Status",
        "CMDB_Collection Time",
        "CMDB_Retired By",
        "CMDB_Retired Date",
        "CMDB_Support Team",
        "CMDB_Environment",
    ]
    df = pd.DataFrame(data_for_excel, columns=columns)

    timestamp = datetime.now().strftime("%Y%m%d-%H%M")
    output_file = f"{timestamp}-sumo_oracle_data.xlsx"
    df.to_excel(output_file, index=False)
    print(f"\nData written to {output_file}")
    return 0


if __name__ == "__main__":
    # SystemExit instead of the interactive-only exit() helper from `site`,
    # so failures are reported through the process exit status.
    raise SystemExit(main())