Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions api_app/analyzers_manager/migrations/0188_ja4dbentry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Generated by Django 4.2.27 on 2026-03-27 00:00

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("analyzers_manager", "0187_remove_dehashed_analyzer"),
]

operations = [
migrations.CreateModel(
name="Ja4DBEntry",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("updated_at", models.DateTimeField(auto_now=True)),
("fingerprint_type", models.CharField(db_index=True, max_length=20)),
("fingerprint_value", models.CharField(db_index=True, max_length=1024)),
("details", models.JSONField(default=dict)),
],
options={
"verbose_name": "JA4 DB Entry",
"verbose_name_plural": "JA4 DB Entries",
},
),
]
13 changes: 13 additions & 0 deletions api_app/analyzers_manager/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,3 +430,16 @@ class Meta:

def __str__(self):
return f"{self.ip_or_subnet} - {self.list_name}"


class Ja4DBEntry(LocalAnalyzerDBEntry):
fingerprint_type = models.CharField(max_length=20, db_index=True)
fingerprint_value = models.CharField(max_length=1024, db_index=True)
details = models.JSONField(default=dict)

class Meta:
verbose_name = "JA4 DB Entry"
verbose_name_plural = "JA4 DB Entries"

def __str__(self):
return f"{self.fingerprint_type}: {self.fingerprint_value}"
123 changes: 56 additions & 67 deletions api_app/analyzers_manager/observable_analyzers/ja4_db.py
Original file line number Diff line number Diff line change
@@ -1,88 +1,77 @@
import json
import logging
import os

import requests
from django.conf import settings
from django.db import transaction

from api_app.analyzers_manager import classes
from api_app.analyzers_manager.models import Ja4DBEntry

logger = logging.getLogger(__name__)


class Ja4DB(classes.ObservableAnalyzer):
"""
We are only checking JA4 "traditional" fingerprints here
We should support all the JAX types as well but it is difficult
to add them considering that
it is not easy to understand the format and how to avoid
to run this analyzer even in cases
where a ja4x has not been submitted.
This should probably require a rework where those fingerprints
are saved in a table/collection
"""

class NotJA4Exception(Exception):
pass

url = " https://ja4db.com/api/read/"

@classmethod
def location(cls) -> str:
db_name = "ja4_db.json"
return f"{settings.MEDIA_ROOT}/{db_name}"

def check_ja4_fingerprint(self, observable: str) -> str:
message = ""
try:
# https://github.com/FoxIO-LLC/ja4/blob/main/technical_details/README.md
if observable[0] not in ["t", "q"]:
# checks for protocol,
# TCP(t) and QUIC(q) are the only supported protocols
raise self.NotJA4Exception("only TCP and QUIC protocols are supported")
if observable[1:3] not in ["12", "13"]:
# checks for the version of the protocol
raise self.NotJA4Exception("procotol version wrong")
if observable[3] not in ["d", "i"]:
# SNI or no SNI
raise self.NotJA4Exception("SNI value not valid")
if not observable[4:8].isdigit():
# number of cipher suits and extensions
raise self.NotJA4Exception("cipher suite must be a number")
if len(observable) > 70 or len(observable) < 20:
raise self.NotJA4Exception("invalid length")
if not observable.count("_") >= 2:
raise self.NotJA4Exception("missing underscores")
except self.NotJA4Exception as e:
message = f"{self.observable_name} is not valid JA4 because {e}"
logger.info(message)

return message
url = "https://ja4db.com/api/read/"
fingerprint_fields = (
"ja4_fingerprint",
"ja4s_fingerprint",
"ja4h_fingerprint",
"ja4x_fingerprint",
"ja4t_fingerprint",
"ja4ts_fingerprint",
"ja4tscan_fingerprint",
)

@classmethod
def update(cls):
def update(cls) -> bool:
logger.info(f"Updating database from {cls.url}")
response = requests.get(url=cls.url)
response = requests.get(url=cls.url, timeout=30)
response.raise_for_status()
data = response.json()
database_location = cls.location()

with open(database_location, "w", encoding="utf-8") as f:
json.dump(data, f)
logger.info(f"Database updated at {database_location}")
db_entries = []
for item in data:
for fingerprint_type in cls.fingerprint_fields:
fingerprint_value = item.get(fingerprint_type)
if fingerprint_value:
db_entries.append(
Ja4DBEntry(
fingerprint_type=fingerprint_type,
fingerprint_value=fingerprint_value,
details=item,
)
)

with transaction.atomic():
Ja4DBEntry.objects.all().delete()
Ja4DBEntry.objects.bulk_create(db_entries, batch_size=1000)

logger.info(f"Updated {len(db_entries)} JA4 DB fingerprint entries")
return True

def run(self):
reason = self.check_ja4_fingerprint(self.observable_name)
if reason:
return {"not_supported": reason}
if not Ja4DBEntry.objects.exists():
logger.info("Ja4DBEntry table is empty, triggering update...")
try:
self.update()
except requests.RequestException as exc:
logger.exception("Failed to update JA4 DB entries from %s: %s", self.url, exc)
return {"error": f"Unable to update JA4 DB: {exc}"}
except Exception as exc:
logger.exception("Unexpected error while updating JA4 DB entries from %s: %s", self.url, exc)
return {"error": f"Unexpected JA4 DB update failure: {exc}"}

database_location = self.location()
if not os.path.exists(database_location):
logger.info(f"Database does not exist in {database_location}, initialising...")
self.update()
with open(database_location, "r") as f:
db = json.load(f)
for application in db:
if application["ja4_fingerprint"] == self.observable_name:
return application
matches = []
seen = set()
for details in Ja4DBEntry.objects.filter(fingerprint_value=self.observable_name).values_list(
"details", flat=True
):
serialized = json.dumps(details, sort_keys=True)
if serialized not in seen:
seen.add(serialized)
matches.append(details)
if len(matches) == 1:
return matches[0]
if matches:
return matches
return {"found": False}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from unittest.mock import patch

import requests

from api_app.analyzers_manager.models import Ja4DBEntry
from api_app.analyzers_manager.observable_analyzers.ja4_db import Ja4DB
from tests.api_app.analyzers_manager.unit_tests.observable_analyzers.base_test_class import (
BaseAnalyzerTest,
Expand Down Expand Up @@ -53,3 +56,85 @@ def get_mocked_response():
},
]
return patch("requests.get", return_value=MockUpResponse(sample_data, 200))

def test_run_matches_ja4h_fingerprint(self):
Ja4DBEntry.objects.create(
fingerprint_type="ja4h_fingerprint",
fingerprint_value="ge11cn20enus_60ca1bd65281_ac95b44401d9_8df6a44f726c",
details={
"application": "Chrome",
"ja4h_fingerprint": "ge11cn20enus_60ca1bd65281_ac95b44401d9_8df6a44f726c",
},
)

analyzer = self._setup_analyzer(
None,
"generic",
"ge11cn20enus_60ca1bd65281_ac95b44401d9_8df6a44f726c",
)

response = analyzer.run()

self.assertEqual(response["application"], "Chrome")

def test_run_matches_ja4t_fingerprint(self):
Ja4DBEntry.objects.create(
fingerprint_type="ja4t_fingerprint",
fingerprint_value="1024_2_1460_00",
details={"application": "Nmap", "ja4t_fingerprint": "1024_2_1460_00"},
)

analyzer = self._setup_analyzer(None, "generic", "1024_2_1460_00")

response = analyzer.run()

self.assertEqual(response["application"], "Nmap")

def test_run_matches_ja4x_fingerprint(self):
Ja4DBEntry.objects.create(
fingerprint_type="ja4x_fingerprint",
fingerprint_value="3082024b308201b3a00302010202143d0f5c",
details={
"application": "Example TLS Cert",
"ja4x_fingerprint": "3082024b308201b3a00302010202143d0f5c",
},
)

analyzer = self._setup_analyzer(None, "generic", "3082024b308201b3a00302010202143d0f5c")

response = analyzer.run()

self.assertEqual(response["application"], "Example TLS Cert")

def test_run_returns_all_matching_records(self):
fingerprint_value = "shared-fingerprint"
Ja4DBEntry.objects.create(
fingerprint_type="ja4h_fingerprint",
fingerprint_value=fingerprint_value,
details={"application": "Chrome", "ja4h_fingerprint": fingerprint_value},
)
Ja4DBEntry.objects.create(
fingerprint_type="ja4tscan_fingerprint",
fingerprint_value=fingerprint_value,
details={"application": "Scanner", "ja4tscan_fingerprint": fingerprint_value},
)

analyzer = self._setup_analyzer(None, "generic", fingerprint_value)

response = analyzer.run()

self.assertIsInstance(response, list)
self.assertEqual(len(response), 2)
self.assertEqual(
{item["application"] for item in response},
{"Chrome", "Scanner"},
)

def test_run_returns_error_when_initial_update_fails(self):
analyzer = self._setup_analyzer(None, "generic", "missing-fingerprint")
Ja4DBEntry.objects.all().delete()

with patch.object(Ja4DB, "update", side_effect=requests.RequestException("network down")):
response = analyzer.run()

self.assertEqual(response, {"error": "Unable to update JA4 DB: network down"})
32 changes: 17 additions & 15 deletions tests/test_crons.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,27 +235,27 @@ def test_tweetfeed_updater(self, mock_get=None):
"device": None,
"os": None,
"user_agent_string": """Mozilla/5.0
(Windows NT 10.0; Win64; x64)
AppleWebKit/537.36 (KHTML, like Gecko)
Chrome/125.0.0.0
Safari/537.36""",
(Windows NT 10.0; Win64; x64)
AppleWebKit/537.36 (KHTML, like Gecko)
Chrome/125.0.0.0
Safari/537.36""",
"certificate_authority": None,
"observation_count": 1,
"verified": False,
"notes": None,
"ja4_fingerprint": """t13d1517h2_
8daaf6152771_
b0da82dd1658""",
8daaf6152771_
b0da82dd1658""",
"ja4_fingerprint_string": """t13d1517h2_002f,0035,009c,
009d,1301,1302,1303,c013,c014,c02b,c02c,c02f,c030,cca8,
cca9_0005,000a,000b,000d,0012,0017,001b,0023,0029,002b,
002d,0033,4469,fe0d,ff01_0403,0804,0401,
0503,0805,0501,0806,0601""",
009d,1301,1302,1303,c013,c014,c02b,c02c,c02f,c030,cca8,
cca9_0005,000a,000b,000d,0012,0017,001b,0023,0029,002b,
002d,0033,4469,fe0d,ff01_0403,0804,0401,
0503,0805,0501,0806,0601""",
"ja4s_fingerprint": None,
"ja4h_fingerprint": """ge11cn20enus_
60ca1bd65281_
ac95b44401d9_
8df6a44f726c""",
60ca1bd65281_
ac95b44401d9_
8df6a44f726c""",
"ja4x_fingerprint": None,
"ja4t_fingerprint": None,
"ja4ts_fingerprint": None,
Expand All @@ -264,11 +264,13 @@ def test_tweetfeed_updater(self, mock_get=None):
],
200,
),
),
)
)
def test_ja4_db_updater(self, mock_get=None):
ja4_db.Ja4DB.update()
self.assertTrue(os.path.exists(ja4_db.Ja4DB.location()))
from api_app.analyzers_manager.models import Ja4DBEntry

self.assertTrue(Ja4DBEntry.objects.exists())

def test_quark_updater(self):
from quark.config import DIR_PATH
Expand Down
Loading