Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ worker:
test:
$(TEST_CMD) -s -v

test_:
$(TEST_CMD) -s -v --last-failed

test/k:
$(TEST_CMD) -s -v -k $(K)

Expand Down
52 changes: 50 additions & 2 deletions intbot/core/endpoints/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def github_webhook_endpoint(request):
extra={},
)
process_webhook.enqueue(str(wh.uuid))
return JsonResponse({"status": "ok"})
return JsonResponse({"status": "created", "guid": wh.uuid})

return HttpResponseNotAllowed("Only POST")

Expand All @@ -84,6 +84,54 @@ def verify_github_signature(request) -> str:
expected = "sha256=" + hashed.hexdigest()

if not hmac.compare_digest(expected, signature):
raise ValueError("Signature's don't match")
raise ValueError("Signatures don't match")

return signature


@csrf_exempt
def zammad_webhook_endpoint(request):
if request.method == "POST":
zammad_headers = {
k: v for k, v in request.headers.items() if k.startswith("X-Zammad")
}

try:
signature = verify_zammad_signature(request)
except ValueError as e:
return HttpResponseForbidden(e)

wh = Webhook.objects.create(
source="zammad",
meta=zammad_headers,
signature=signature,
content=json.loads(request.body),
extra={},
)
process_webhook.enqueue(str(wh.uuid))
return JsonResponse({"status": "created", "guid": wh.uuid})


return HttpResponseNotAllowed("Only POST")


def verify_zammad_signature(request) -> str:
"""Verify that the payload was sent by our zammad"""

if "X-Hub-Signature" not in request.headers:
raise ValueError("X-Hub-Signature is missing")

signature = request.headers["X-Hub-Signature"]

hashed = hmac.new(
settings.ZAMMAD_WEBHOOK_SECRET_TOKEN.encode("utf-8"),
msg=request.body,
digestmod=hashlib.sha1,
)

expected = "sha1=" + hashed.hexdigest()

if not hmac.compare_digest(expected, signature):
raise ValueError("Signatures don't match")

return signature
11 changes: 9 additions & 2 deletions intbot/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ def process_webhook(wh_uuid: str):
elif wh.source == "github":
process_github_webhook(wh)

elif wh.source == "zammad":
process_zammad_webhook(wh)

else:
raise ValueError(f"Unsupported source {wh.source}")

Expand All @@ -32,8 +35,6 @@ def process_internal_webhook(wh: Webhook):
DiscordMessage.objects.create(
channel_id=channel.channel_id,
channel_name=channel.channel_name,
# channel_id=settings.DISCORD_TEST_CHANNEL_ID,
# channel_name=settings.DISCORD_TEST_CHANNEL_NAME,
content=f"Webhook content: {wh.content}",
# Mark as not sent - to be sent with the next batch
sent_at=None,
Expand Down Expand Up @@ -70,3 +71,9 @@ def process_github_webhook(wh: Webhook):
)
wh.processed_at = timezone.now()
wh.save()


def process_zammad_webhook(wh: Webhook):
# NOTE(artcz) Do nothing for now. Just a placeholder.
# Processing will come in the next PR.
return None
3 changes: 3 additions & 0 deletions intbot/intbot/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ def get(name) -> str:
GITHUB_EP2025_PROJECT_ID = get("GITHUB_EP2025_PROJECT_ID")
GITHUB_EM_PROJECT_ID = get("GITHUB_EM_PROJECT_ID")

# Zammad
ZAMMAD_WEBHOOK_SECRET_TOKEN = get("ZAMMAD_WEBHOOK_SECRET_TOKEN")

if DJANGO_ENV == "dev":
DEBUG = True
ALLOWED_HOSTS = ["127.0.0.1", "localhost"]
Expand Down
7 changes: 6 additions & 1 deletion intbot/intbot/urls.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from core.endpoints.basic import index
from core.endpoints.webhooks import github_webhook_endpoint, internal_webhook_endpoint
from core.endpoints.webhooks import (
github_webhook_endpoint,
internal_webhook_endpoint,
zammad_webhook_endpoint,
)
from django.contrib import admin
from django.urls import path

Expand All @@ -9,4 +13,5 @@
# Internal Webhooks
path("webhook/internal/", internal_webhook_endpoint),
path("webhook/github/", github_webhook_endpoint),
path("webhook/zammad/", zammad_webhook_endpoint),
]
151 changes: 147 additions & 4 deletions intbot/tests/test_webhooks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import hashlib
import hmac
import json

import pytest
from django.conf import settings
from core.models import Webhook
from django.conf import settings


@pytest.mark.django_db
Expand All @@ -14,7 +18,7 @@ def test_internal_wh_endpoint_checks_authorization_token(client):

response = client.post(
"/webhook/internal/",
json=webhook_body,
json.dumps(webhook_body),
content_type="application/json",
)

Expand All @@ -35,7 +39,7 @@ def test_internal_wh_endpoint_fails_with_bad_token(client):

response = client.post(
"/webhook/internal/",
json=webhook_body,
json.dumps(webhook_body),
content_type="application/json",
HTTP_AUTHORIZATION="random-incorrect-token",
)
Expand All @@ -57,7 +61,7 @@ def test_internal_wh_endpoint_works_with_correct_token(client):

response = client.post(
"/webhook/internal/",
json=webhook_body,
json.dumps(webhook_body),
content_type="application/json",
HTTP_AUTHORIZATION=settings.WEBHOOK_INTERNAL_TOKEN,
)
Expand All @@ -67,3 +71,142 @@ def test_internal_wh_endpoint_works_with_correct_token(client):
assert response["Content-Type"] == "application/json"
assert response.json()["status"] == "created"
assert response.json()["guid"] == str(wh.uuid)


@pytest.mark.django_db
def test_github_webhook_endpoint_checks_authorization_token(client):
webhook_body = {}
response = client.post(
"/webhook/github/",
json.dumps(webhook_body),
content_type="application/json",
)

assert response.status_code == 403
assert response.content == "X-Hub-Signature-256 is missing".encode("utf-8")

def sign_github_webhook(webhook_body):
hashed = hmac.new(
settings.GITHUB_WEBHOOK_SECRET_TOKEN.encode("utf-8"),
msg=json.dumps(webhook_body).encode("utf-8"),
digestmod=hashlib.sha256,
)
signature = "sha256=" + hashed.hexdigest()

return signature


@pytest.mark.django_db
def test_github_webhook_endpoint_fails_with_bad_token(client):
webhook_body = {
"event": "test1",
"content": {
"random": "content",
},
}

response = client.post(
"/webhook/github/",
json.dumps(webhook_body),
content_type="application/json",
headers={"X-Hub-Signature-256": "bad signature"},
)

assert response.status_code == 403
assert response.content == "Signatures don't match".encode("utf-8")
assert True


@pytest.mark.django_db
def test_github_webhook_endpoint_works_with_correct_token(client):
webhook_body = {
"event": "test1",
"content": {
"random": "content",
},
}

signature = sign_github_webhook(webhook_body)

response = client.post(
"/webhook/github/",
json.dumps(webhook_body),
content_type="application/json",
headers={"X-Hub-Signature-256": signature},
)
assert response.status_code == 200
wh = Webhook.objects.get()
assert response["Content-Type"] == "application/json"
assert response.json()["status"] == "created"
assert response.json()["guid"] == str(wh.uuid)
assert wh.source == "github"


def sign_zammad_webhook(webhook_body):
hashed = hmac.new(
settings.ZAMMAD_WEBHOOK_SECRET_TOKEN.encode("utf-8"),
msg=json.dumps(webhook_body).encode("utf-8"),
digestmod=hashlib.sha1,
)
signature = "sha1=" + hashed.hexdigest()

return signature


@pytest.mark.django_db
def test_zammad_webhook_endpoint_checks_authorization_token(client):
webhook_body = {}

response = client.post(
"/webhook/zammad/",
json.dumps(webhook_body),
content_type="application/json",
)

assert response.status_code == 403
assert response.content == "X-Hub-Signature is missing".encode("utf-8")


@pytest.mark.django_db
def test_zammad_webhook_endpoint_fails_with_bad_token(client):
webhook_body = {
"event": "test1",
"content": {
"random": "content",
},
}

response = client.post(
"/webhook/zammad/",
json.dumps(webhook_body),
content_type="application/json",
headers={"X-Hub-Signature": "bad signature"},
)

assert response.status_code == 403
assert response.content == "Signatures don't match".encode("utf-8")


@pytest.mark.django_db
def test_zammad_webhook_endpoint_works_with_correct_token(client):
webhook_body = {
"event": "test1",
"content": {
"random": "content",
},
}

signature = sign_zammad_webhook(webhook_body)

response = client.post(
"/webhook/zammad/",
json.dumps(webhook_body),
content_type="application/json",
headers={"X-Hub-Signature": signature},
)
assert response.status_code == 200
wh = Webhook.objects.get()
assert response["Content-Type"] == "application/json"
assert response.json()["status"] == "created"
assert response.json()["guid"] == str(wh.uuid)
assert wh.source == "zammad"