This repository was archived by the owner on Jul 25, 2025. It is now read-only.
forked from pwnlandia/shockpot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutil.py
More file actions
75 lines (70 loc) · 2.59 KB
/
util.py
File metadata and controls
75 lines (70 loc) · 2.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import hpfeeds
from logger import LOGGER
import psycopg2
import socket
import requests
from requests.exceptions import Timeout, ConnectionError
import logging
logger = logging.getLogger(__name__)
def get_hpfeeds_client(config):
hpc = None
if config['hpfeeds.enabled'].lower() == 'true':
LOGGER.info('hpfeeds enabled, creating connection to {}:{}'.format(config['hpfeeds.host'], config['hpfeeds.port']))
hpc = hpfeeds.new(
config['hpfeeds.host'],
int(config['hpfeeds.port']),
config['hpfeeds.identity'],
config['hpfeeds.secret']
)
hpc.s.settimeout(0.01)
else:
LOGGER.info( 'hpfeeds is disabled')
return hpc
def get_postgresql_handler(config):
dbh = None
if config['postgresql.enabled'].lower() == 'true':
LOGGER.info('postgresql enabled, creating connection to {}:{}'.format(config['postgresql.host'], config['postgresql.port']))
dbh = psycopg2.connect(database=config['postgresql.database'], user=config['postgresql.user'], password=config['postgresql.password'], host=config['postgresql.host'], port=config['postgresql.port'])
cursor = dbh.cursor()
cursor.execute("""CREATE TABLE IF NOT EXISTS
connections (
connection SERIAL PRIMARY KEY,
method TEXT,
url TEXT,
path TEXT,
query_string TEXT,
headers TEXT,
source_ip TEXT,
source_port INTEGER,
dest_host TEXT,
dest_port INTEGER,
is_shellshock TEXT,
command TEXT,
command_data TEXT,
timestamp INTEGER
);""")
dbh.commit()
else:
LOGGER.info( 'postgresql is disabled')
return dbh
def valid_ip(ip):
try:
socket.inet_aton(ip)
return True
except:
return False
def get_ext_ip(urls):
for url in urls:
try:
req = requests.get(url)
if req.status_code == 200:
data = req.text.strip()
if data is None or not valid_ip(data):
continue
else:
return data
else:
raise ConnectionError
except (Timeout, ConnectionError) as e:
logger.warning('Could not fetch public ip from {0}'.format(url))
return None