1 change: 1 addition & 0 deletions .github/workflows/self-hosted.yml
@@ -91,6 +91,7 @@ jobs:
SOURCE_COMMIT=${{ github.sha }}
TARGETARCH=${{ matrix.platform }}
ghcr: true
publish_on_pr: true
tag_nightly: false
tag_latest: false

29 changes: 29 additions & 0 deletions build-local.sh
@@ -0,0 +1,29 @@
#!/usr/bin/env bash
set -euo pipefail

# Accept image name as first argument
if [ $# -eq 0 ]; then
echo "Usage: $0 <image-name>" >&2
exit 1
fi

IMAGE_NAME="$1"

echo "Building: ${IMAGE_NAME}" >&2

# Build frontend assets
pnpm install --frozen-lockfile --production
python3 -m tools.fast_editable --path .
python3 -m sentry.build.main

# Build Docker image
docker build \
  -f self-hosted/Dockerfile \
  -t "${IMAGE_NAME}" \
  --platform linux/amd64 \
  --build-arg SOURCE_COMMIT="$(git rev-parse HEAD)" \
  --build-arg TARGETARCH=amd64 \
  .

# Output the image name for use in other scripts
echo "${IMAGE_NAME}"
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -85,7 +85,7 @@ dependencies = [
"sentry-forked-email-reply-parser>=0.5.12.post1",
"sentry-kafka-schemas>=2.1.16",
"sentry-ophio>=1.1.3",
"sentry-protos>=0.4.10",
"sentry-protos @ git+https://github.com/getsentry/sentry-protos.git@george/push-broker-worker#subdirectory=py", # "sentry-protos>=0.4.10",
"sentry-redis-tools>=0.5.0",
"sentry-relay>=0.9.22",
"sentry-sdk[http2]>=2.47.0",
1 change: 1 addition & 0 deletions self-hosted/Dockerfile
@@ -46,6 +46,7 @@ RUN set -x \
    # uwsgi-dogstatsd
    && buildDeps=" \
        gcc \
        git \
        libpcre2-dev \
        wget \
        zlib1g-dev \
1 change: 1 addition & 0 deletions src/sentry/conf/server.py
@@ -2630,6 +2630,7 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
"taskworker-usage-dlq": "default",
"taskworker-workflows-engine": "default",
"taskworker-workflows-engine-dlq": "default",
"test-topic": "default",
}


1 change: 1 addition & 0 deletions src/sentry/conf/types/kafka_definition.py
@@ -109,6 +109,7 @@ class Topic(Enum):
    TASKWORKER_USAGE_DLQ = "taskworker-usage-dlq"
    TASKWORKER_WORKFLOWS_ENGINE = "taskworker-workflows-engine"
    TASKWORKER_WORKFLOWS_ENGINE_DLQ = "taskworker-workflows-engine-dlq"
    TEST_TOPIC = "test-topic"


class ConsumerDefinition(TypedDict, total=False):
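
Note: the new topic string has to agree in both files touched above: as a `Topic` enum member here and as a key in `KAFKA_TOPIC_TO_CLUSTER` in `server.py`. A minimal sketch of wiring a task namespace to it, reusing the `taskworker.route.overrides` option that `taskbroker_send_tasks` sets later in this diff; the `"examples"` namespace name and the `sentry.options` import are assumptions:

from sentry import options
from sentry.conf.types.kafka_definition import Topic

# The enum value must match the key added to KAFKA_TOPIC_TO_CLUSTER above.
assert Topic.TEST_TOPIC.value == "test-topic"

# Hypothetical: route every task in the "examples" namespace to the new
# topic, mirroring the override used in taskbroker_send_tasks below.
options.set("taskworker.route.overrides", {"examples": "test-topic"})
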
9 changes: 9 additions & 0 deletions src/sentry/runner/commands/run.py
@@ -300,6 +300,12 @@ def worker(ignore_unknown_queues: bool, **options: Any) -> None:
help="The number of seconds before touching the health check file",
default=taskworker_constants.DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH,
)
@click.option(
"--grpc-port",
help="Port for the gRPC server to listen on. Will try subsequent ports if unavailable.",
default=50052,
type=int,
)
@log_options()
@configuration
def taskworker(**options: Any) -> None:
@@ -324,6 +330,7 @@ def run_taskworker(
    processing_pool_name: str,
    health_check_file_path: str | None,
    health_check_sec_per_touch: float,
    grpc_port: int,
    **options: Any,
) -> None:
    """
@@ -347,6 +354,7 @@
        processing_pool_name=processing_pool_name,
        health_check_file_path=health_check_file_path,
        health_check_sec_per_touch=health_check_sec_per_touch,
        grpc_port=grpc_port,
        **options,
    )
    exitcode = worker.start()
@@ -419,6 +427,7 @@ def taskbroker_send_tasks(

    KAFKA_CLUSTERS["default"]["common"]["bootstrap.servers"] = bootstrap_servers
    if kafka_topic and namespace:
        print(f"overriding {namespace} to route to {kafka_topic}")
        options.set("taskworker.route.overrides", {namespace: kafka_topic})

    try:
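
Note: the `--grpc-port` help text promises fallback to subsequent ports when the requested one is taken, but that logic is not shown in this diff. A minimal sketch of such bind-with-fallback behavior, assuming grpcio's `grpc.server` API; the function name is hypothetical:

import grpc
from concurrent import futures

def bind_with_fallback(server: grpc.Server, start_port: int, attempts: int = 10) -> int:
    # add_insecure_port returns the bound port, or 0 when the bind fails,
    # so walk forward from start_port until a bind succeeds.
    for port in range(start_port, start_port + attempts):
        if server.add_insecure_port(f"127.0.0.1:{port}") != 0:
            return port
    raise RuntimeError(f"no free port in {start_port}-{start_port + attempts - 1}")

server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
bound_port = bind_with_fallback(server, 50052)
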
86 changes: 86 additions & 0 deletions src/sentry/taskworker/client/client.py
@@ -12,8 +12,10 @@
import grpc
from google.protobuf.message import Message
from sentry_protos.taskbroker.v1.taskbroker_pb2 import (
    AddWorkerRequest,
    FetchNextTask,
    GetTaskRequest,
    RemoveWorkerRequest,
    SetTaskStatusRequest,
)
from sentry_protos.taskbroker.v1.taskbroker_pb2_grpc import ConsumerServiceStub
@@ -137,10 +139,12 @@ def __init__(
        health_check_settings: HealthCheckSettings | None = None,
        rpc_secret: str | None = None,
        grpc_config: str | None = None,
        port: int = 50052,
    ) -> None:
        assert len(hosts) > 0, "You must provide at least one RPC host to connect to"
        self._hosts = hosts
        self._rpc_secret = rpc_secret
        self._port = port

        self._grpc_options: list[tuple[str, Any]] = [
            ("grpc.max_receive_message_length", MAX_ACTIVATION_SIZE)
@@ -304,6 +308,7 @@ def update_task(
            id=processing_result.task_id,
            status=processing_result.status,
            fetch_next_task=fetch_next_task,
            address=f"http://127.0.0.1:{self._port}",
        )

        try:
@@ -317,8 +322,24 @@
            )

with metrics.timer("taskworker.update_task.rpc", tags={"host": processing_result.host}):
logger.debug(
"Calling set task status",
extra={
"task_id": processing_result.task_id,
"status": processing_result.status,
"host": processing_result.host,
"receive_timestamp": processing_result.receive_timestamp,
},
)
start_time = time.time()
response = self._host_to_stubs[processing_result.host].SetTaskStatus(request)
duration_ms = (time.time() - start_time) * 1000
logger.debug(
"Done setting task status",
extra={"duration_ms": duration_ms},
)
except grpc.RpcError as err:
logger.warning("Failed to perform RPC - %s", err)
metrics.incr(
"taskworker.client.rpc_error",
tags={"method": "SetTaskStatus", "status": err.code().name},
@@ -342,3 +363,68 @@ def update_task(
                receive_timestamp=time.monotonic(),
            )
        return None

    def add_worker(self, host: str, address: str) -> None:
        """
        Register this worker with a taskbroker.

        Sends an AddWorker message to notify the broker that this worker
        is available to receive tasks.
        """
        # Ensure we have a connection to this host
        if host not in self._host_to_stubs:
            self._host_to_stubs[host] = self._connect_to_host(host)

        request = AddWorkerRequest(address=address)

        try:
            with metrics.timer("taskworker.add_worker.rpc", tags={"host": host}):
                self._host_to_stubs[host].AddWorker(request)
            logger.info(
                "taskworker.client.add_worker.success",
                extra={"host": host, "address": address},
            )
            metrics.incr("taskworker.client.add_worker.success", tags={"host": host})
        except grpc.RpcError as err:
            logger.warning(
                "taskworker.client.add_worker.failed",
                extra={"host": host, "error": str(err), "status": err.code().name},
            )
            metrics.incr(
                "taskworker.client.rpc_error",
                tags={"method": "AddWorker", "status": err.code().name},
            )

    def remove_worker(self, host: str, address: str) -> None:
        """
        Unregister this worker from a taskbroker.

        Sends a RemoveWorker message to notify the broker that this worker
        is shutting down and should no longer receive tasks.
        """
        if host not in self._host_to_stubs:
            logger.warning(
                "taskworker.client.remove_worker.unknown_host",
                extra={"host": host},
            )
            return

        request = RemoveWorkerRequest(address=address)

        try:
            with metrics.timer("taskworker.remove_worker.rpc", tags={"host": host}):
                self._host_to_stubs[host].RemoveWorker(request)
            logger.info(
                "taskworker.client.remove_worker.success",
                extra={"host": host, "address": address},
            )
            metrics.incr("taskworker.client.remove_worker.success", tags={"host": host})
        except grpc.RpcError as err:
            logger.warning(
                "taskworker.client.remove_worker.failed",
                extra={"host": host, "error": str(err), "status": err.code().name},
            )
            metrics.incr(
                "taskworker.client.rpc_error",
                tags={"method": "RemoveWorker", "status": err.code().name},
            )
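
Note: taken together, `add_worker`/`remove_worker` and the `address` field now sent in `SetTaskStatusRequest` suggest a push model where brokers deliver tasks to the worker's own gRPC server. A hypothetical lifecycle sketch using only the methods added in this diff; the main-loop call is invented, and the constructor argument list is simplified to the `hosts` and `port` parameters shown above:

hosts = ["localhost:50051"]
client = TaskworkerClient(hosts=hosts, port=50052)
address = "http://127.0.0.1:50052"  # where this worker's gRPC server listens

for host in hosts:
    client.add_worker(host, address)  # brokers may now push tasks to us
try:
    run_worker_loop()  # hypothetical main loop
finally:
    for host in hosts:
        client.remove_worker(host, address)  # deregister before shutdown
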
1 change: 1 addition & 0 deletions src/sentry/taskworker/registry.py
@@ -153,6 +153,7 @@ def _handle_produce_future(self, future: ProducerFuture, tags: dict[str, str]) -

    def send_task(self, activation: TaskActivation, wait_for_delivery: bool = False) -> None:
        topic = self.router.route_namespace(self.name)
        print(f"sending task to topic {topic} in namespace {self.name}")

        with sentry_sdk.start_span(
            op=OP.QUEUE_PUBLISH,
19 changes: 12 additions & 7 deletions src/sentry/taskworker/tasks/examples.py
@@ -1,10 +1,10 @@
from __future__ import annotations

import logging
import time
from time import sleep
from typing import Any

from sentry.taskworker.constants import CompressionType
from sentry.taskworker.namespaces import exampletasks
from sentry.taskworker.retry import LastAction, NoRetriesRemainingError, Retry, RetryTaskError
from sentry.taskworker.retry import retry_task as retry_task_helper
@@ -52,8 +52,12 @@ def will_retry(failure: str) -> None:

@exampletasks.register(name="examples.simple_task")
def simple_task(*args: list[Any], **kwargs: dict[str, Any]) -> None:
    sleep(0.1)
    logger.debug("simple_task complete")
    logger.info("Starting simple task...")

    sleep_time = 0.1
    time.sleep(sleep_time)

    logger.info("Simple task complete!")


@exampletasks.register(
@@ -90,7 +94,8 @@ def timed_task(sleep_seconds: float | str, *args: list[Any], **kwargs: dict[str,
logger.debug("timed_task complete")


@exampletasks.register(name="examples.simple_task", compression_type=CompressionType.ZSTD)
def simple_task_compressed(*args: list[Any], **kwargs: dict[str, Any]) -> None:
    sleep(0.1)
    logger.debug("simple_task_compressed complete")
# @exampletasks.register(name="examples.simple_task", compression_type=CompressionType.ZSTD)
# def simple_task_compressed(*args: list[Any], **kwargs: dict[str, Any]) -> None:
#     sleep(0.1)
#     logger.debug("simple_task_compressed complete")
# logger.debug("simple_task_compressed complete")