diff --git a/.github/workflows/self-hosted.yml b/.github/workflows/self-hosted.yml index 2fcf4b90fb4fc5..e21bac1ecfc86c 100644 --- a/.github/workflows/self-hosted.yml +++ b/.github/workflows/self-hosted.yml @@ -91,6 +91,7 @@ jobs: SOURCE_COMMIT=${{ github.sha }} TARGETARCH=${{ matrix.platform }} ghcr: true + publish_on_pr: true tag_nightly: false tag_latest: false diff --git a/build-local.sh b/build-local.sh new file mode 100755 index 00000000000000..764fe1f873a65e --- /dev/null +++ b/build-local.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Accept image name as first argument +if [ $# -eq 0 ]; then + echo "Usage: $0 <image-name>" >&2 + exit 1 +fi + +IMAGE_NAME="$1" + +echo "Building: ${IMAGE_NAME}" >&2 + +# Build frontend assets +pnpm install --frozen-lockfile --production +python3 -m tools.fast_editable --path . +python3 -m sentry.build.main + +# Build Docker image +docker build \ + -f self-hosted/Dockerfile \ + -t "${IMAGE_NAME}" \ + --platform linux/amd64 \ + --build-arg SOURCE_COMMIT="$(git rev-parse HEAD)" \ + --build-arg TARGETARCH=amd64 \ + . 
+ +# Output the image name for use in other scripts +echo "${IMAGE_NAME}" diff --git a/pyproject.toml b/pyproject.toml index 99989711042109..0d32abb3558f97 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -85,7 +85,7 @@ dependencies = [ "sentry-forked-email-reply-parser>=0.5.12.post1", "sentry-kafka-schemas>=2.1.16", "sentry-ophio>=1.1.3", - "sentry-protos>=0.4.10", + "sentry-protos @ git+https://github.com/getsentry/sentry-protos.git@george/push-broker-worker#subdirectory=py", # "sentry-protos>=0.4.10", "sentry-redis-tools>=0.5.0", "sentry-relay>=0.9.22", "sentry-sdk[http2]>=2.47.0", diff --git a/self-hosted/Dockerfile b/self-hosted/Dockerfile index 2d14e98e41ef2f..4eea48f56f9548 100644 --- a/self-hosted/Dockerfile +++ b/self-hosted/Dockerfile @@ -46,6 +46,7 @@ RUN set -x \ # uwsgi-dogstatsd && buildDeps=" \ gcc \ + git \ libpcre2-dev \ wget \ zlib1g-dev \ diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 8d4396231d65fe..f5f35932d5f49e 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -2630,6 +2630,7 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]: "taskworker-usage-dlq": "default", "taskworker-workflows-engine": "default", "taskworker-workflows-engine-dlq": "default", + "test-topic": "default", } diff --git a/src/sentry/conf/types/kafka_definition.py b/src/sentry/conf/types/kafka_definition.py index 151e1e0f83bb8b..33b2f5b21fabe0 100644 --- a/src/sentry/conf/types/kafka_definition.py +++ b/src/sentry/conf/types/kafka_definition.py @@ -109,6 +109,7 @@ class Topic(Enum): TASKWORKER_USAGE_DLQ = "taskworker-usage-dlq" TASKWORKER_WORKFLOWS_ENGINE = "taskworker-workflows-engine" TASKWORKER_WORKFLOWS_ENGINE_DLQ = "taskworker-workflows-engine-dlq" + TEST_TOPIC = "test-topic" class ConsumerDefinition(TypedDict, total=False): diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index 4f4c859a67f818..1cc4d0604181d6 100644 --- a/src/sentry/runner/commands/run.py +++ 
b/src/sentry/runner/commands/run.py @@ -300,6 +300,12 @@ def worker(ignore_unknown_queues: bool, **options: Any) -> None: help="The number of seconds before touching the health check file", default=taskworker_constants.DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH, ) +@click.option( + "--grpc-port", + help="Port for the gRPC server to listen on. Will try subsequent ports if unavailable.", + default=50052, + type=int, +) @log_options() @configuration def taskworker(**options: Any) -> None: @@ -324,6 +330,7 @@ def run_taskworker( processing_pool_name: str, health_check_file_path: str | None, health_check_sec_per_touch: float, + grpc_port: int, **options: Any, ) -> None: """ @@ -347,6 +354,7 @@ def run_taskworker( processing_pool_name=processing_pool_name, health_check_file_path=health_check_file_path, health_check_sec_per_touch=health_check_sec_per_touch, + grpc_port=grpc_port, **options, ) exitcode = worker.start() @@ -419,6 +427,7 @@ def taskbroker_send_tasks( KAFKA_CLUSTERS["default"]["common"]["bootstrap.servers"] = bootstrap_servers if kafka_topic and namespace: + print(f"overriding {namespace} to route to {kafka_topic}") options.set("taskworker.route.overrides", {namespace: kafka_topic}) try: diff --git a/src/sentry/taskworker/client/client.py b/src/sentry/taskworker/client/client.py index dd34684d473e79..1c57fbd71869e1 100644 --- a/src/sentry/taskworker/client/client.py +++ b/src/sentry/taskworker/client/client.py @@ -12,8 +12,10 @@ import grpc from google.protobuf.message import Message from sentry_protos.taskbroker.v1.taskbroker_pb2 import ( + AddWorkerRequest, FetchNextTask, GetTaskRequest, + RemoveWorkerRequest, SetTaskStatusRequest, ) from sentry_protos.taskbroker.v1.taskbroker_pb2_grpc import ConsumerServiceStub @@ -137,10 +139,12 @@ def __init__( health_check_settings: HealthCheckSettings | None = None, rpc_secret: str | None = None, grpc_config: str | None = None, + port: int = 50052, ) -> None: assert len(hosts) > 0, "You must provide at least one RPC 
host to connect to" self._hosts = hosts self._rpc_secret = rpc_secret + self._port = port self._grpc_options: list[tuple[str, Any]] = [ ("grpc.max_receive_message_length", MAX_ACTIVATION_SIZE) @@ -304,6 +308,7 @@ def update_task( id=processing_result.task_id, status=processing_result.status, fetch_next_task=fetch_next_task, + address=f"http://127.0.0.1:{self._port}", ) try: @@ -317,8 +322,24 @@ def update_task( ) with metrics.timer("taskworker.update_task.rpc", tags={"host": processing_result.host}): + logger.debug( + "Calling set task status", + extra={ + "task_id": processing_result.task_id, + "status": processing_result.status, + "host": processing_result.host, + "receive_timestamp": processing_result.receive_timestamp, + }, + ) + start_time = time.time() response = self._host_to_stubs[processing_result.host].SetTaskStatus(request) + duration_ms = (time.time() - start_time) * 1000 + logger.debug( + "Done setting task status", + extra={"duration_ms": duration_ms}, + ) except grpc.RpcError as err: + logger.warning("Failed to perform RPC - %s", err) metrics.incr( "taskworker.client.rpc_error", tags={"method": "SetTaskStatus", "status": err.code().name}, @@ -342,3 +363,68 @@ def update_task( receive_timestamp=time.monotonic(), ) return None + + def add_worker(self, host: str, address: str) -> None: + """ + Register this worker with a taskbroker. + + Sends an AddWorker message to notify the broker that this worker + is available to receive tasks. 
+ """ + # Ensure we have a connection to this host + if host not in self._host_to_stubs: + self._host_to_stubs[host] = self._connect_to_host(host) + + request = AddWorkerRequest(address=address) + + try: + with metrics.timer("taskworker.add_worker.rpc", tags={"host": host}): + self._host_to_stubs[host].AddWorker(request) + logger.info( + "taskworker.client.add_worker.success", + extra={"host": host, "address": address}, + ) + metrics.incr("taskworker.client.add_worker.success", tags={"host": host}) + except grpc.RpcError as err: + logger.warning( + "taskworker.client.add_worker.failed", + extra={"host": host, "error": str(err), "status": err.code().name}, + ) + metrics.incr( + "taskworker.client.rpc_error", + tags={"method": "AddWorker", "status": err.code().name}, + ) + + def remove_worker(self, host: str, address: str) -> None: + """ + Unregister this worker from a taskbroker. + + Sends a RemoveWorker message to notify the broker that this worker + is shutting down and should no longer receive tasks. 
+ """ + if host not in self._host_to_stubs: + logger.warning( + "taskworker.client.remove_worker.unknown_host", + extra={"host": host}, + ) + return + + request = RemoveWorkerRequest(address=address) + + try: + with metrics.timer("taskworker.remove_worker.rpc", tags={"host": host}): + self._host_to_stubs[host].RemoveWorker(request) + logger.info( + "taskworker.client.remove_worker.success", + extra={"host": host, "address": address}, + ) + metrics.incr("taskworker.client.remove_worker.success", tags={"host": host}) + except grpc.RpcError as err: + logger.warning( + "taskworker.client.remove_worker.failed", + extra={"host": host, "error": str(err), "status": err.code().name}, + ) + metrics.incr( + "taskworker.client.rpc_error", + tags={"method": "RemoveWorker", "status": err.code().name}, + ) diff --git a/src/sentry/taskworker/registry.py b/src/sentry/taskworker/registry.py index 9cca426baf46bf..ea73bb7d69551e 100644 --- a/src/sentry/taskworker/registry.py +++ b/src/sentry/taskworker/registry.py @@ -153,6 +153,7 @@ def _handle_produce_future(self, future: ProducerFuture, tags: dict[str, str]) - def send_task(self, activation: TaskActivation, wait_for_delivery: bool = False) -> None: topic = self.router.route_namespace(self.name) + print(f"sending task to topic {topic} in namespace {self.name}") with sentry_sdk.start_span( op=OP.QUEUE_PUBLISH, diff --git a/src/sentry/taskworker/tasks/examples.py b/src/sentry/taskworker/tasks/examples.py index 285035bf9acae2..ecba2734350734 100644 --- a/src/sentry/taskworker/tasks/examples.py +++ b/src/sentry/taskworker/tasks/examples.py @@ -1,10 +1,10 @@ from __future__ import annotations import logging +import time from time import sleep from typing import Any -from sentry.taskworker.constants import CompressionType from sentry.taskworker.namespaces import exampletasks from sentry.taskworker.retry import LastAction, NoRetriesRemainingError, Retry, RetryTaskError from sentry.taskworker.retry import retry_task as retry_task_helper @@ 
-52,8 +52,12 @@ def will_retry(failure: str) -> None: @exampletasks.register(name="examples.simple_task") def simple_task(*args: list[Any], **kwargs: dict[str, Any]) -> None: - sleep(0.1) - logger.debug("simple_task complete") + logger.info("Starting simple task...") + + sleep_time = 0.1 + time.sleep(sleep_time) + + logger.info("Simple task complete!") @exampletasks.register( @@ -90,7 +94,8 @@ def timed_task(sleep_seconds: float | str, *args: list[Any], **kwargs: dict[str, logger.debug("timed_task complete") -@exampletasks.register(name="examples.simple_task", compression_type=CompressionType.ZSTD) -def simple_task_compressed(*args: list[Any], **kwargs: dict[str, Any]) -> None: - sleep(0.1) - logger.debug("simple_task_compressed complete") +# @exampletasks.register(name="examples.simple_task", compression_type=CompressionType.ZSTD) +# def simple_task_compressed(*args: list[Any], **kwargs: dict[str, Any]) -> None: +# sleep(0.1) +# logger.debug("simple_task_compressed complete") +# logger.debug("simple_task_compressed complete") diff --git a/src/sentry/taskworker/worker.py b/src/sentry/taskworker/worker.py index 16e13255e21610..b02dcbf0125d8e 100644 --- a/src/sentry/taskworker/worker.py +++ b/src/sentry/taskworker/worker.py @@ -2,6 +2,7 @@ import logging import multiprocessing +import os import queue import signal import threading @@ -14,6 +15,7 @@ import grpc from sentry_protos.taskbroker.v1.taskbroker_pb2 import FetchNextTask +from sentry_protos.taskworker.v1 import taskworker_pb2, taskworker_pb2_grpc from sentry import options from sentry.taskworker.app import import_app @@ -36,6 +38,41 @@ logger = logging.getLogger("sentry.taskworker.worker") +class WorkerServicer(taskworker_pb2_grpc.WorkerServiceServicer): + """ + gRPC servicer that receives task activations pushed from the broker + """ + + def __init__(self, worker: TaskWorker) -> None: + self.worker = worker + + def PushTask( + self, + request: taskworker_pb2.PushTaskRequest, + context: grpc.ServicerContext, + 
) -> taskworker_pb2.PushTaskResponse: + """Handle incoming task activation.""" + # Create `InflightTaskActivation` from the pushed task + inflight = InflightTaskActivation( + activation=request.task, + host=request.callback_url, + receive_timestamp=time.monotonic(), + ) + + # Push the task to the worker queue + added = self.worker._push_task(inflight) + + # Read the shared counter + queue_size = self.worker._child_tasks.qsize() + + return taskworker_pb2.PushTaskResponse(added=added, queue_size=queue_size) + + +def get_host() -> str: + pod_ip = os.environ.get("POD_IP") + return pod_ip if pod_ip else "localhost" + + class TaskWorker: """ A TaskWorker fetches tasks from a taskworker RPC host and handles executing task activations. @@ -63,6 +100,7 @@ def __init__( process_type: str = "spawn", health_check_file_path: str | None = None, health_check_sec_per_touch: float = DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH, + grpc_port: int = 50052, **kwargs: dict[str, Any], ) -> None: self.options = kwargs @@ -82,6 +120,7 @@ def __init__( ), rpc_secret=app.config["rpc_secret"], grpc_config=options.get("taskworker.grpc_service_config"), + port=grpc_port, ) if process_type == "fork": self.mp_context = multiprocessing.get_context("fork") @@ -106,13 +145,14 @@ def __init__( self._setstatus_backoff_seconds = 0 self._processing_pool_name: str = processing_pool_name or "unknown" + self._grpc_port: int = grpc_port def start(self) -> int: """ - Run the worker main loop + Run the worker gRPC server - Once started a Worker will loop until it is killed, or - completes its max_task_count when it shuts down. + Once started a Worker will run a gRPC server that receives task activations + until it is killed or shuts down. 
""" self.start_result_thread() self.start_spawn_children_thread() @@ -126,22 +166,73 @@ def signal_handler(*args: Any) -> None: signal.signal(signal.SIGTERM, signal_handler) try: - while True: - self.run_once() + # Start gRPC server + server = grpc.server(ThreadPoolExecutor(max_workers=10)) + taskworker_pb2_grpc.add_WorkerServiceServicer_to_server(WorkerServicer(self), server) + server.add_insecure_port(f"[::]:{self._grpc_port}") + server.start() + logger.info("taskworker.grpc_server.started", extra={"port": self._grpc_port}) + + # Register this worker with all connected taskbrokers + self._register_with_brokers() + + # Wait for shutdown signal + server.wait_for_termination() + except KeyboardInterrupt: + server.stop(grace=5) self.shutdown() - raise + + return 0 def run_once(self) -> None: """Access point for tests to run a single worker loop""" self._add_task() + def _register_with_brokers(self) -> None: + """ + Register this worker with all connected taskbrokers. + + Sends an AddWorker message to each broker to notify them that + this worker is available to receive tasks. + """ + address = f"http://{get_host()}:{self._grpc_port}" + + logger.info( + "taskworker.worker.registering_with_brokers", + extra={"broker_count": len(self.client._hosts), "address": address}, + ) + + for host in self.client._hosts: + self.client.add_worker(host, address) + + def _unregister_from_brokers(self) -> None: + """ + Unregister this worker from all connected taskbrokers. + + Sends a RemoveWorker message to each broker to notify them that + this worker is shutting down and should no longer receive tasks. 
+ """ + address = f"http://{get_host()}:{self._grpc_port}" + + logger.info( + "taskworker.worker.unregistering_from_brokers", + extra={"broker_count": len(self.client._hosts), "address": address}, + ) + + for host in self.client._hosts: + self.client.remove_worker(host, address) + def shutdown(self) -> None: """ Shutdown cleanly Activate the shutdown event and drain results before terminating children. """ logger.info("taskworker.worker.shutdown.start") + + # Unregister this worker from all connected taskbrokers + self._unregister_from_brokers() + self._shutdown_event.set() logger.info("taskworker.worker.shutdown.spawn_children") @@ -169,6 +260,33 @@ def shutdown(self) -> None: logger.info("taskworker.worker.shutdown.complete") + def _push_task(self, inflight: InflightTaskActivation) -> bool: + """ + Push a task to child tasks queue. Returns False if the task could not be added. + """ + try: + start_time = time.monotonic() + self._child_tasks.put(inflight, block=False) + metrics.distribution( + "taskworker.worker.child_task.put.duration", + time.monotonic() - start_time, + tags={"processing_pool": self._processing_pool_name}, + ) + except queue.Full: + metrics.incr( + "taskworker.worker.child_tasks.put.full", + tags={"processing_pool": self._processing_pool_name}, + ) + logger.warning( + "taskworker.add_task.child_task_queue_full", + extra={ + "task_id": inflight.activation.id, + "processing_pool": self._processing_pool_name, + }, + ) + return False + return True + def _add_task(self) -> bool: """ Add a task to child tasks queue. Returns False if no new task was fetched. 
diff --git a/uv.lock b/uv.lock index ad7fc05460da50..47701151298688 100644 --- a/uv.lock +++ b/uv.lock @@ -273,7 +273,7 @@ wheels = [ [[package]] name = "devservices" -version = "1.2.3" +version = "1.2.4" source = { registry = "https://pypi.devinfra.sentry.io/simple" } dependencies = [ { name = "packaging", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -283,7 +283,7 @@ dependencies = [ { name = "supervisor", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/devservices-1.2.3-py3-none-any.whl", hash = "sha256:19beb1dabb533c5dcbd021d6a34e3f357e5c868670f0dfe8945911d3965a6494" }, + { url = "https://pypi.devinfra.sentry.io/wheels/devservices-1.2.4-py3-none-any.whl", hash = "sha256:637055d5dae3dd01899ba066d511aa14537d791ed7f997292bc6a72dd5ddf416" }, ] [[package]] @@ -2196,7 +2196,7 @@ requires-dist = [ { name = "sentry-forked-email-reply-parser", specifier = ">=0.5.12.post1" }, { name = "sentry-kafka-schemas", specifier = ">=2.1.16" }, { name = "sentry-ophio", specifier = ">=1.1.3" }, - { name = "sentry-protos", specifier = ">=0.4.10" }, + { name = "sentry-protos", git = "https://github.com/getsentry/sentry-protos.git?subdirectory=py&rev=george%2Fpush-broker-worker" }, { name = "sentry-redis-tools", specifier = ">=0.5.0" }, { name = "sentry-relay", specifier = ">=0.9.22" }, { name = "sentry-sdk", extras = ["http2"], specifier = ">=2.47.0" }, @@ -2227,7 +2227,7 @@ requires-dist = [ dev = [ { name = "black", specifier = ">=25.1.0" }, { name = "covdefaults", specifier = ">=2.3.0" }, - { name = "devservices", specifier = ">=1.2.3" }, + { name = "devservices", specifier = ">=1.2.4" }, { name = "docker", specifier = ">=7.1.0" }, { name = "ephemeral-port-reserve", specifier = ">=1.1.4" }, { name = "flake8", specifier = ">=7.3.0" }, @@ -2402,15 +2402,12 @@ wheels = [ [[package]] name = "sentry-protos" version = "0.4.10" -source = { registry = 
"https://pypi.devinfra.sentry.io/simple" } +source = { git = "https://github.com/getsentry/sentry-protos.git?subdirectory=py&rev=george%2Fpush-broker-worker#50006eba5f52efdca34d83a0096543aab90223aa" } dependencies = [ { name = "grpc-stubs", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "grpcio", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "protobuf", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] -wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/sentry_protos-0.4.10-py3-none-any.whl", hash = "sha256:fe32f66f6d074978fb3b72be20932cb8354a49a119fac9861c4a876b9f476b2e" }, -] [[package]] name = "sentry-redis-tools"