-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscanner.py
More file actions
78 lines (63 loc) · 2.21 KB
/
scanner.py
File metadata and controls
78 lines (63 loc) · 2.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import argparse
import json
import os
import subprocess
import logging
logging.basicConfig(level=logging.INFO)
from google.cloud import pubsub_v1
from utils import download_file, upload_file, set_metadata
from clamav_updater import update_clamav_db
# Deployment settings, read once from the process environment at import time.
# Each one is None when the corresponding variable is not set.
LANDING_BUCKET = os.environ.get("LANDING_BUCKET")
CLEAN_BUCKET = os.environ.get("CLEAN_BUCKET")  # destination for clean files
INFECTED_TOPIC = os.environ.get("INFECTED_TOPIC")  # Pub/Sub topic for alerts
def scan_with_clamav(file_path):
    """Scan a local file with the ClamAV daemon and return the verdict.

    Runs ``clamdscan --fdpass <file_path>`` and maps its exit status to a
    verdict string. The scanner's stdout is written to the log.

    Args:
        file_path: Path of the local file to scan.

    Returns:
        ``"CLEAN"`` when clamdscan exits with status 0, ``"INFECTED"`` when
        it exits with status 1.

    Raises:
        RuntimeError: If clamdscan exits with any other status. clamdscan
            uses status 2 to report that the scan itself failed, and a file
            that was never scanned must not be reported as infected or clean.
    """
    logging.info(f"Scanning file: {file_path}")
    # Use clamdscan to connect to the daemon
    result = subprocess.run(
        ["clamdscan", "--fdpass", file_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    logging.info(result.stdout)

    # clamdscan returns 0 for clean, 1 for infected; anything else is an error.
    if result.returncode == 0:
        return "CLEAN"
    if result.returncode == 1:
        return "INFECTED"

    logging.error(
        "clamdscan failed with exit status %s: %s",
        result.returncode,
        result.stderr,
    )
    raise RuntimeError(
        f"clamdscan failed with exit status {result.returncode} "
        f"while scanning {file_path}"
    )
def notify_infected(filename, reason):
    """Publish an infected-file notification to the Pub/Sub topic.

    The message body is the UTF-8 encoded JSON object
    ``{"file": filename, "reason": reason}``. The topic is built from the
    ``PROJECT_ID`` environment variable and ``INFECTED_TOPIC``.

    Args:
        filename: Name of the object that was flagged.
        reason: Short human-readable explanation included in the message.

    Raises:
        Exception: Whatever the publish future raises if the message could
            not be delivered to Pub/Sub.
    """
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(os.getenv("PROJECT_ID"), INFECTED_TOPIC)
    payload = json.dumps({"file": filename, "reason": reason})

    future = publisher.publish(topic_path, payload.encode("utf-8"))
    # publish() only hands the message to the client and returns a future.
    # Wait for it so delivery errors surface here and the process cannot
    # exit before the message has actually been sent.
    future.result()
    logging.info("Infected file notification sent.")
def _decode_event_payload(event):
    """Return the JSON object carried in ``event["message"]["data"]``."""
    import base64

    raw = event["message"]["data"]
    if isinstance(raw, str):
        # An event parsed from JSON text cannot hold bytes, so the data
        # arrives as str. Pub/Sub push delivery base64-encodes the data
        # field, so decode it back to bytes first.
        raw = base64.b64decode(raw)
    return json.loads(raw.decode("utf-8"))


def handle_scan(event):
    """Download, scan and route the object described by a Pub/Sub event.

    The event's ``message.data`` must contain a JSON object with ``bucket``
    and ``name`` keys, either as UTF-8 bytes or as a base64-encoded string.
    The object is downloaded to a private temporary directory, scanned, and
    its ``scan`` metadata is set to the verdict. Clean files are uploaded to
    ``CLEAN_BUCKET`` under the same name; for any other verdict an infected
    notification is published.

    Args:
        event: Mapping shaped like ``{"message": {"data": ...}}``.

    Raises:
        KeyError: If the event or the decoded payload lacks a required key.
        ValueError: If the data is not valid base64 / UTF-8 / JSON.
    """
    import tempfile

    data = _decode_event_payload(event)
    bucket = data["bucket"]
    name = data["name"]
    logging.info(f"Processing file: gs://{bucket}/{name}")

    # A per-call temporary directory avoids collisions between objects that
    # share a basename and guarantees the download is removed afterwards,
    # even when scanning or uploading fails.
    with tempfile.TemporaryDirectory() as workdir:
        # Fall back to a fixed name for object names that end with "/".
        local = os.path.join(workdir, name.split("/")[-1] or "object")
        download_file(bucket, name, local)

        status = scan_with_clamav(local)
        set_metadata(bucket, name, {"scan": status})

        if status == "CLEAN":
            upload_file(CLEAN_BUCKET, local, name)
            logging.info("File is clean. Uploaded to clean bucket.")
        else:
            notify_infected(name, "Virus detected")
            logging.info("File is infected. Notification sent.")
def main(argv=None):
    """Command-line entry point: scan one event or refresh the ClamAV DB.

    Args:
        argv: Optional list of command-line arguments. When ``None``
            (the default) argparse reads ``sys.argv[1:]``.

    Options:
        --mode   ``scan`` (default) or ``update-db``.
        --event  JSON text of the Pub/Sub message; required in scan mode.

    Raises:
        SystemExit: With status 2 (argparse usage error) when ``--event`` is
            missing or is not valid JSON in scan mode.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=["scan", "update-db"], default="scan")
    parser.add_argument("--event", help="Pub/Sub message")
    args = parser.parse_args(argv)

    if args.mode == "scan":
        # Without this check json.loads(None) fails with an opaque TypeError.
        if args.event is None:
            parser.error("--event is required when --mode is scan")
        try:
            event = json.loads(args.event)
        except json.JSONDecodeError as err:
            # parser.error() exits, so `event` is always bound below.
            parser.error(f"--event is not valid JSON: {err}")
        handle_scan(event)
    elif args.mode == "update-db":
        update_clamav_db()


if __name__ == "__main__":
    main()