This repository was archived by the owner on Feb 8, 2019. It is now read-only.
forked from AlienVault-OTX/OTX-Apps-TAXII
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathOTXv2.py
More file actions
108 lines (90 loc) · 3.02 KB
/
OTXv2.py
File metadata and controls
108 lines (90 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/env python
import json
import logging
try:
# For Python2
from urllib2 import URLError, build_opener
except ImportError:
# For Python3
from urllib.error import URLError
from urllib.request import build_opener
# Module-level logger for this client. The logger name is the fixed string
# "OTXv2" rather than being derived from __name__.
logger = logging.getLogger("OTXv2")
class InvalidAPIKey(Exception):
    """Error raised by ``OTXv2.get`` when the server answers with HTTP 403.

    The object handed to the constructor is kept on the ``value`` attribute.
    """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        # The text form is the repr of the stored value, so a string message
        # is rendered with its surrounding quotes.
        return "{!r}".format(self.value)
class BadRequest(Exception):
    """Error raised by ``OTXv2.get`` when the server answers with HTTP 400.

    The object handed to the constructor is kept on the ``value`` attribute.
    """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        # The text form is the repr of the stored value, so a string message
        # is rendered with its surrounding quotes.
        return "{!r}".format(self.value)
class OTXv2(object):
    """
    Main class to interact with the AlienVault OTX API.

    Every request carries the API key in the ``X-OTX-API-KEY`` header. The
    listing methods follow the ``next`` link of each JSON page until it is
    empty and collect (or yield) the entries found under ``results``.
    """

    def __init__(self, key, server="http://otx.alienvault.com"):
        """Store the API key and the base URL used to build request URLs.

        :param key: API key sent in the ``X-OTX-API-KEY`` header.
        :param server: base URL (scheme and host, no trailing slash) that is
            prefixed to every API path.
        """
        # NOTE(review): the default server is plain HTTP, so the API key
        # header is sent unencrypted unless the caller passes an https://
        # URL. The default is kept unchanged for backward compatibility.
        self.key = key
        self.server = server

    def get(self, url):
        """Fetch ``url`` and return its body decoded as JSON.

        :param url: absolute URL to request.
        :returns: the parsed JSON document.
        :raises InvalidAPIKey: the server answered with HTTP 403.
        :raises BadRequest: the server answered with HTTP 400.
        :raises URLError: any other HTTP status or transport failure; the
            original exception is re-raised untouched.
        :raises ValueError: the response body is not valid JSON.
        """
        opener = build_opener()
        opener.addheaders = [('X-OTX-API-KEY', self.key)]
        try:
            response = opener.open(url)
        except URLError as e:
            # Only HTTPError (a URLError subclass) carries ``code``; plain
            # URLError instances such as connection failures do not.
            code = getattr(e, "code", None)
            if code == 403:
                raise InvalidAPIKey("Invalid API Key")
            if code == 400:
                raise BadRequest("Bad Request")
            # Surface every other failure instead of continuing with no
            # response object.
            raise
        try:
            data = response.read().decode('utf-8')
        finally:
            # try/finally rather than ``with``: the Python 2 response object
            # is not a context manager.
            response.close()
        return json.loads(data)

    def _paginate(self, url):
        """Yield each ``results`` entry of every page, starting at ``url``."""
        while url:
            page = self.get(url)
            for result in page["results"]:
                yield result
            # A falsy ``next`` value ends the walk.
            url = page["next"]

    def getall(self, limit=20):
        """Return a list of every entry from ``/api/v1/pulses/subscribed``.

        :param limit: value of the ``limit`` query parameter of the first
            request.
        """
        uri = "{}/api/v1/pulses/subscribed?limit={}"
        return list(self._paginate(uri.format(self.server, limit)))

    def getall_iter(self, limit=20):
        """Generator variant of :meth:`getall`; pages are fetched lazily.

        :param limit: value of the ``limit`` query parameter of the first
            request.
        """
        uri = "{}/api/v1/pulses/subscribed?limit={}"
        for pulse in self._paginate(uri.format(self.server, limit)):
            yield pulse

    def getsince(self, mytimestamp, limit=20):
        """Return a list of subscribed entries filtered by ``modified_since``.

        :param mytimestamp: value of the ``modified_since`` query parameter;
            it is interpolated into the URL verbatim (no URL-encoding).
        :param limit: value of the ``limit`` query parameter of the first
            request.
        """
        uri = "{}/api/v1/pulses/subscribed?limit={}&modified_since={}"
        return list(self._paginate(uri.format(self.server, limit, mytimestamp)))

    def getsince_iter(self, mytimestamp, limit=20):
        """Generator variant of :meth:`getsince`; pages are fetched lazily.

        :param mytimestamp: value of the ``modified_since`` query parameter;
            it is interpolated into the URL verbatim (no URL-encoding).
        :param limit: value of the ``limit`` query parameter of the first
            request.
        """
        uri = "{}/api/v1/pulses/subscribed?limit={}&modified_since={}"
        for pulse in self._paginate(uri.format(self.server, limit, mytimestamp)):
            yield pulse

    def getevents_since(self, mytimestamp, limit=20):
        """Return a list of every entry from ``/api/v1/pulses/events``.

        :param mytimestamp: value of the ``since`` query parameter; it is
            interpolated into the URL verbatim (no URL-encoding).
        :param limit: value of the ``limit`` query parameter of the first
            request.
        """
        uri = "{}/api/v1/pulses/events?limit={}&since={}"
        return list(self._paginate(uri.format(self.server, limit, mytimestamp)))