Skip to content

Commit 782f7bd

Browse files
committed
Update certstream
1 parent 18aa302 commit 782f7bd

File tree

6 files changed

+359
-23
lines changed

6 files changed

+359
-23
lines changed

.env

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ SEARX_COMMAND=-f
9898
# Please add the Watcher Docker containers local subnet network 10.10.10.0/24 in your host server NO_PROXY env variable.
9999

100100
# CertStream URL
101-
CERT_STREAM_URL=ws://certstream:8080
101+
# Use ws:// for internal Docker network
102+
CERT_STREAM_URL=ws://certstream:8080/
102103

103104
# If you have a proxy, please fill these variables
104105
HTTP_PROXY=
Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
# coding=utf-8
2+
"""
3+
CertStream WebSocket Client
4+
Replaces the certstream library with a native WebSocket implementation
5+
that connects to certstream-server-go and supports enterprise proxy configuration.
6+
"""
7+
import json
8+
import logging
9+
import time
10+
import threading
11+
from django.conf import settings
12+
import websocket
13+
14+
logger = logging.getLogger('watcher.dns_finder.certstream')
15+
16+
17+
class CertStreamClient:
18+
"""
19+
WebSocket client for connecting to certstream-server-go.
20+
Handles automatic reconnection and proxy configuration.
21+
"""
22+
23+
def __init__(self, url, callback, reconnect_interval=5, ping_interval=30):
24+
"""
25+
Initialize the CertStream client.
26+
27+
:param url: WebSocket URL (e.g., ws://certstream:8080/)
28+
:param callback: Callback function to process certificate messages
29+
:param reconnect_interval: Seconds to wait before reconnecting (default: 5)
30+
:param ping_interval: Seconds between ping messages (default: 30)
31+
"""
32+
self.url = url
33+
self.callback = callback
34+
self.reconnect_interval = reconnect_interval
35+
self.ping_interval = ping_interval
36+
self.ws = None
37+
self.running = False
38+
self.thread = None
39+
40+
# Proxy configuration from environment
41+
self.http_proxy = getattr(settings, 'HTTP_PROXY', '')
42+
self.https_proxy = getattr(settings, 'HTTPS_PROXY', '')
43+
44+
logger.info(f"CertStream client initialized for URL: {url}")
45+
if self.http_proxy:
46+
logger.info(f"HTTP Proxy configured: {self.http_proxy}")
47+
if self.https_proxy:
48+
logger.info(f"HTTPS Proxy configured: {self.https_proxy}")
49+
50+
def _on_message(self, ws, message):
51+
"""
52+
Handle incoming WebSocket messages.
53+
54+
:param ws: WebSocket instance
55+
:param message: Raw message from certstream
56+
"""
57+
try:
58+
data = json.loads(message)
59+
message_type = data.get('message_type')
60+
61+
# Only process certificate_update messages
62+
if message_type == 'certificate_update':
63+
# Call the user callback with message and context (for compatibility)
64+
self.callback(data, None)
65+
elif message_type == 'heartbeat':
66+
logger.debug("Received heartbeat from certstream-server-go")
67+
else:
68+
logger.debug(f"Received message type: {message_type}")
69+
70+
except json.JSONDecodeError as e:
71+
logger.error(f"Failed to decode JSON message: {e}")
72+
except Exception as e:
73+
logger.error(f"Error processing message: {e}", exc_info=True)
74+
75+
def _on_error(self, ws, error):
76+
"""
77+
Handle WebSocket errors.
78+
79+
:param ws: WebSocket instance
80+
:param error: Error object
81+
"""
82+
logger.error(f"WebSocket error: {error}")
83+
84+
def _on_close(self, ws, close_status_code, close_msg):
85+
"""
86+
Handle WebSocket connection close.
87+
88+
:param ws: WebSocket instance
89+
:param close_status_code: Close status code
90+
:param close_msg: Close message
91+
"""
92+
logger.warning(f"WebSocket connection closed (code: {close_status_code}, msg: {close_msg})")
93+
94+
# Attempt to reconnect if still running
95+
if self.running:
96+
logger.info(f"Attempting to reconnect in {self.reconnect_interval} seconds...")
97+
time.sleep(self.reconnect_interval)
98+
self._connect()
99+
100+
def _on_open(self, ws):
101+
"""
102+
Handle WebSocket connection opened.
103+
104+
:param ws: WebSocket instance
105+
"""
106+
logger.info(f"Successfully connected to certstream-server-go at {self.url}")
107+
108+
# Start ping thread to keep connection alive
109+
def send_ping():
110+
while self.running and self.ws and self.ws.sock and self.ws.sock.connected:
111+
try:
112+
time.sleep(self.ping_interval)
113+
if self.running:
114+
self.ws.ping()
115+
logger.debug("Sent ping to certstream-server-go")
116+
except Exception as e:
117+
logger.error(f"Error sending ping: {e}")
118+
break
119+
120+
ping_thread = threading.Thread(target=send_ping, daemon=True)
121+
ping_thread.start()
122+
123+
def _connect(self):
124+
"""
125+
Establish WebSocket connection with proxy support.
126+
"""
127+
try:
128+
# Configure proxy for WebSocket
129+
proxy_config = {}
130+
if self.http_proxy:
131+
proxy_config = {
132+
"http_proxy_host": self._parse_proxy_host(self.http_proxy),
133+
"http_proxy_port": self._parse_proxy_port(self.http_proxy),
134+
}
135+
if self.https_proxy:
136+
proxy_config.update({
137+
"proxy_type": "http",
138+
})
139+
140+
# Enable debug logging for troubleshooting
141+
# websocket.enableTrace(True)
142+
143+
# Create WebSocket connection
144+
self.ws = websocket.WebSocketApp(
145+
self.url,
146+
on_message=self._on_message,
147+
on_error=self._on_error,
148+
on_close=self._on_close,
149+
on_open=self._on_open
150+
)
151+
152+
# Run WebSocket with proxy configuration
153+
if proxy_config:
154+
logger.info(f"Connecting through proxy: {proxy_config}")
155+
self.ws.run_forever(
156+
ping_interval=self.ping_interval,
157+
ping_timeout=10,
158+
**proxy_config
159+
)
160+
else:
161+
self.ws.run_forever(
162+
ping_interval=self.ping_interval,
163+
ping_timeout=10
164+
)
165+
166+
except Exception as e:
167+
logger.error(f"Failed to connect to certstream: {e}", exc_info=True)
168+
if self.running:
169+
logger.info(f"Retrying connection in {self.reconnect_interval} seconds...")
170+
time.sleep(self.reconnect_interval)
171+
self._connect()
172+
173+
def _parse_proxy_host(self, proxy_url):
174+
"""
175+
Extract hostname from proxy URL.
176+
177+
:param proxy_url: Proxy URL (e.g., http://proxy.company.com:8080)
178+
:return: Hostname
179+
"""
180+
try:
181+
from urllib.parse import urlparse
182+
parsed = urlparse(proxy_url)
183+
return parsed.hostname
184+
except Exception:
185+
# Fallback: simple parsing
186+
proxy_url = proxy_url.replace('http://', '').replace('https://', '')
187+
if ':' in proxy_url:
188+
return proxy_url.split(':')[0]
189+
return proxy_url
190+
191+
def _parse_proxy_port(self, proxy_url):
192+
"""
193+
Extract port from proxy URL.
194+
195+
:param proxy_url: Proxy URL (e.g., http://proxy.company.com:8080)
196+
:return: Port number
197+
"""
198+
try:
199+
from urllib.parse import urlparse
200+
parsed = urlparse(proxy_url)
201+
return parsed.port or 8080
202+
except Exception:
203+
# Fallback: simple parsing
204+
proxy_url = proxy_url.replace('http://', '').replace('https://', '')
205+
if ':' in proxy_url:
206+
return int(proxy_url.split(':')[1])
207+
return 8080
208+
209+
def start(self):
210+
"""
211+
Start the CertStream client in a background thread.
212+
"""
213+
if self.running:
214+
logger.warning("CertStream client is already running")
215+
return
216+
217+
self.running = True
218+
self.thread = threading.Thread(target=self._connect, daemon=True)
219+
self.thread.start()
220+
logger.info("CertStream client started in background thread")
221+
222+
def stop(self):
223+
"""
224+
Stop the CertStream client and close the connection.
225+
"""
226+
logger.info("Stopping CertStream client...")
227+
self.running = False
228+
229+
if self.ws:
230+
try:
231+
self.ws.close()
232+
except Exception as e:
233+
logger.error(f"Error closing WebSocket: {e}")
234+
235+
if self.thread and self.thread.is_alive():
236+
self.thread.join(timeout=5)
237+
238+
logger.info("CertStream client stopped")
239+
240+
241+
def listen_for_events(callback, url=None, skip_heartbeats=True):
242+
"""
243+
Listen for certificate transparency events from certstream-server-go.
244+
This function provides a compatible API with the original certstream library.
245+
246+
:param callback: Callback function(message, context) to process certificate updates
247+
:param url: WebSocket URL (defaults to settings.CERT_STREAM_URL)
248+
:param skip_heartbeats: Whether to skip heartbeat messages (default: True)
249+
"""
250+
if url is None:
251+
url = settings.CERT_STREAM_URL
252+
253+
logger.info(f"Starting CertStream listener for {url}")
254+
255+
# Create and start the client
256+
client = CertStreamClient(url, callback)
257+
258+
try:
259+
client.start()
260+
261+
# Keep the main thread alive
262+
while True:
263+
time.sleep(1)
264+
265+
except KeyboardInterrupt:
266+
logger.info("Received keyboard interrupt, shutting down...")
267+
client.stop()
268+
except Exception as e:
269+
logger.error(f"Fatal error in CertStream listener: {e}", exc_info=True)
270+
client.stop()
271+
raise

Watcher/Watcher/dns_finder/core.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import tzlocal
1111
from .models import Alert, DnsMonitored, DnsTwisted, Subscriber, KeywordMonitored
1212
from common.models import LegitimateDomain
13-
import certstream
13+
from .certstream_client import listen_for_events
1414
from common.core import send_app_specific_notifications
1515
from common.core import send_app_specific_notifications_group
1616
from common.core import send_only_thehive_notifications
@@ -116,9 +116,10 @@ def print_callback(message, context):
116116

117117
def main_certificate_transparency():
118118
"""
119-
Launch CertStream scan.
119+
Launch CertStream scan using certstream-server-go via WebSocket.
120120
"""
121-
certstream.listen_for_events(print_callback, url=settings.CERT_STREAM_URL)
121+
logger.info(f"Connecting to certstream-server-go at {settings.CERT_STREAM_URL}")
122+
listen_for_events(print_callback, url=settings.CERT_STREAM_URL)
122123

123124

124125
def main_dns_twist():

Watcher/requirements.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Web Framework and Application Enhancements
22
mysqlclient==2.2.7
3-
django==5.2.11
3+
django==5.2.9
44
django-mysql==4.19.0
55
djangorestframework==3.16.1
66
django-rest-knox==4.2.0
@@ -25,7 +25,7 @@ python-tlsh==4.5.0
2525
# Networking and DNS Utilities
2626
dnspython==2.8.0
2727
dnstwist==20250130
28-
certstream==1.12
28+
websocket-client==1.8.0
2929
tldextract==5.3.0
3030

3131
# Environment Configuration

certstream-config.yaml

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# certstream-server-go Configuration for Watcher
2+
# This configuration ensures optimal performance and proxy compatibility
3+
4+
webserver:
5+
listen_addr: "0.0.0.0"
6+
listen_port: 8080
7+
8+
# WebSocket endpoints
9+
full_url: "/full-stream"
10+
lite_url: "/"
11+
domains_only_url: "/domains-only"
12+
13+
# Enable compression for bandwidth optimization
14+
compression_enabled: true
15+
16+
# TLS/SSL certificates (optional - not needed for internal Docker network)
17+
cert_path: ""
18+
cert_key_path: ""
19+
20+
# IP configuration
21+
real_ip: false
22+
whitelist: []
23+
24+
# Prometheus metrics (optional)
25+
prometheus:
26+
enabled: false
27+
listen_addr: "0.0.0.0"
28+
listen_port: 8081
29+
metrics_url: "/metrics"
30+
expose_system_metrics: false
31+
cert_path: ""
32+
cert_key_path: ""
33+
real_ip: false
34+
whitelist: []
35+
36+
general:
37+
# Use Google's default CT logs (recommended)
38+
disable_default_logs: false
39+
40+
# Add custom CT logs if needed
41+
additional_logs: []
42+
43+
# Buffer sizes for performance tuning
44+
buffer_sizes:
45+
websocket: 300
46+
ctlog: 1000
47+
broadcastmanager: 10000
48+
49+
# Cleanup old logs automatically
50+
drop_old_logs: true
51+
52+
# Recovery configuration (optional)
53+
recovery:
54+
enabled: false
55+
ct_index_file: "./ct_index.json"

0 commit comments

Comments
 (0)