diff --git a/src/sentry/api/urls.py b/src/sentry/api/urls.py index a2d07a482ccc3a..b53a8ebcf352cd 100644 --- a/src/sentry/api/urls.py +++ b/src/sentry/api/urls.py @@ -2612,7 +2612,7 @@ def create_group_urls(name_prefix: str) -> list[URLPattern | URLResolver]: name="sentry-api-0-organization-conduit-demo", ), re_path( - r"^(?P[^/]+)/objectstore/$", + r"^(?P[^/]+)/objectstore/(?P.*)$", OrganizationObjectstoreEndpoint.as_view(), name="sentry-api-0-organization-objectstore", ), diff --git a/src/sentry/hybridcloud/apigateway/proxy.py b/src/sentry/hybridcloud/apigateway/proxy.py index 69a88919a49ea7..f0f4aacde53260 100644 --- a/src/sentry/hybridcloud/apigateway/proxy.py +++ b/src/sentry/hybridcloud/apigateway/proxy.py @@ -47,6 +47,7 @@ "sentry-api-0-installable-preprod-artifact-download": 90.0, "sentry-api-0-project-preprod-artifact-download": 90.0, "sentry-api-0-project-preprod-artifact-size-analysis-download": 90.0, + "sentry-api-0-organization-objectstore": 90.0, } # stream 0.5 MB at a time diff --git a/src/sentry/objectstore/endpoints/organization.py b/src/sentry/objectstore/endpoints/organization.py index 9af76fb6adf9af..ccc48bbb4c39c4 100644 --- a/src/sentry/objectstore/endpoints/organization.py +++ b/src/sentry/objectstore/endpoints/organization.py @@ -1,37 +1,174 @@ +from collections.abc import Callable, Generator +from typing import Literal +from urllib.parse import urljoin +from wsgiref.util import is_hop_by_hop + +import requests +from django.http import StreamingHttpResponse +from requests import Response as ExternalResponse from rest_framework.request import Request from rest_framework.response import Response -from sentry import features +from sentry import features, options from sentry.api.api_owners import ApiOwner from sentry.api.api_publish_status import ApiPublishStatus from sentry.api.base import region_silo_endpoint from sentry.api.bases import OrganizationEndpoint from sentry.models.organization import Organization +CHUNK_SIZE = 512 * 1024 + 
@region_silo_endpoint
class OrganizationObjectstoreEndpoint(OrganizationEndpoint):
    """Feature-flagged streaming proxy from the organization API to objectstore.

    Every supported verb is relayed to ``<objectstore base_url>/<path>`` and the
    upstream response (status, end-to-end headers, raw body) is streamed back
    unchanged. Organizations without the ``organizations:objectstore-endpoint``
    feature receive a 404.
    """

    publish_status = {
        "GET": ApiPublishStatus.EXPERIMENTAL,
        "PUT": ApiPublishStatus.EXPERIMENTAL,
        "POST": ApiPublishStatus.EXPERIMENTAL,
        "DELETE": ApiPublishStatus.EXPERIMENTAL,
    }
    owner = ApiOwner.FOUNDATIONAL_STORAGE
    parser_classes = []  # accept arbitrary data and don't attempt to parse it

    # (connect, read) timeouts in seconds for the upstream call. The read value
    # mirrors the 90 s API-gateway budget registered for this route.
    upstream_timeout = (5.0, 90.0)

    # Lower-cased client headers that are never forwarded upstream: ``Host``
    # and ``Content-Length`` describe the inbound hop, while ``Authorization``
    # and ``Cookie`` carry Sentry credentials that the backend has no use for.
    # NOTE(review): confirm objectstore does not expect a client-supplied
    # Authorization header before relying on this in production.
    unforwarded_request_headers = frozenset(
        {"host", "content-length", "authorization", "cookie"}
    )

    def get(
        self, request: Request, organization: Organization, path: str
    ) -> Response | StreamingHttpResponse:
        """Relay a GET for ``path`` to objectstore."""
        return self._handle("GET", request, organization, path)

    def put(
        self, request: Request, organization: Organization, path: str
    ) -> Response | StreamingHttpResponse:
        """Relay a chunked PUT for ``path`` to objectstore."""
        return self._handle("PUT", request, organization, path)

    def post(
        self, request: Request, organization: Organization, path: str
    ) -> Response | StreamingHttpResponse:
        """Relay a chunked POST for ``path`` to objectstore."""
        return self._handle("POST", request, organization, path)

    def delete(
        self, request: Request, organization: Organization, path: str
    ) -> Response | StreamingHttpResponse:
        """Relay a DELETE for ``path`` to objectstore."""
        return self._handle("DELETE", request, organization, path)

    def _handle(
        self,
        method: Literal["GET", "PUT", "POST", "DELETE"],
        request: Request,
        organization: Organization,
        path: str,
    ) -> Response | StreamingHttpResponse:
        """Apply the feature gate shared by all verbs, then proxy the request."""
        if not features.has("organizations:objectstore-endpoint", organization, actor=request.user):
            return Response(status=404)
        return self._proxy(method, path, request)

    @staticmethod
    def _resolve_target_url(base_url: str, path: str) -> str | None:
        """Join ``path`` onto ``base_url``; return ``None`` if it escapes the base.

        ``path`` is attacker-controlled. ``urljoin`` lets an absolute URL, a
        ``//host`` reference or ``..`` segments replace parts of the base, so the
        joined result is accepted only when it still points at the configured
        scheme/host and lives underneath the configured base path.
        """
        from urllib.parse import urlsplit

        # The trailing slash makes urljoin append to the base path instead of
        # replacing its last segment.
        base_dir = base_url.rstrip("/") + "/"
        joined = urljoin(base_dir, path.lstrip("/"))

        base_parts = urlsplit(base_dir)
        joined_parts = urlsplit(joined)
        same_origin = (joined_parts.scheme, joined_parts.netloc) == (
            base_parts.scheme,
            base_parts.netloc,
        )
        if not same_origin or not joined_parts.path.startswith(base_parts.path):
            return None
        return joined

    def _proxy(
        self,
        method: Literal["GET", "PUT", "POST", "DELETE"],
        path: str,
        request: Request,
    ) -> Response | StreamingHttpResponse:
        """Forward the request to objectstore and stream the answer back.

        Returns:
            A ``StreamingHttpResponse`` mirroring the upstream response (any
            status code, including 4xx/5xx), or a plain ``Response`` with
            400 (bad path, non-chunked or malformed upload),
            500 (request body stream unavailable),
            502 (upstream unreachable) or 504 (upstream timed out).
        """
        target_url = self._resolve_target_url(options.get("objectstore.config")["base_url"], path)
        if target_url is None:
            return Response("Invalid objectstore path", status=400)

        has_body = method in ("PUT", "POST")
        transfer_encoding = request.headers.get("Transfer-Encoding", "").strip().lower()
        if has_body and transfer_encoding != "chunked":
            return Response("Only Transfer-Encoding: chunked is supported", status=400)

        # Hop-by-hop headers (which include Transfer-Encoding) belong to the
        # inbound connection; requests sets its own framing for the outbound one.
        headers = {
            name: value
            for name, value in request.headers.items()
            if name.lower() not in self.unforwarded_request_headers
            and not is_hop_by_hop(name)
        }

        body_stream = None
        if has_body:
            wsgi_input = request._request.META.get("wsgi.input")
            # NOTE(review): ``_read`` is a private attribute of the server's
            # input wrapper, used here to reach the still-chunk-encoded bytes;
            # confirm every deployed WSGI server exposes it.
            raw_read = getattr(wsgi_input, "_read", None)
            if raw_read is None:
                return Response("Request body stream is unavailable", status=500)
            body_stream = ChunkedStreamDecoder(raw_read)

        try:
            upstream = requests.request(
                method,
                url=target_url,
                headers=headers,
                data=body_stream,
                params=dict(request.GET) if request.GET else None,
                stream=True,
                allow_redirects=False,
                timeout=self.upstream_timeout,
            )
        except ChunkedDecodeError:
            # Raised from ChunkedStreamDecoder.read() while requests was
            # uploading; the outbound request has been aborted.
            return Response("Malformed chunked request body", status=400)
        except requests.Timeout:
            return Response("Objectstore timed out", status=504)
        except requests.RequestException:
            return Response("Objectstore is unreachable", status=502)

        # Deliberately no raise_for_status(): upstream errors such as a 404 for
        # a missing object are relayed to the client as-is.
        return stream_response(upstream)


class ChunkedDecodeError(ValueError):
    """The inbound chunked request body was malformed or ended prematurely."""


class ChunkedStreamDecoder:
    """
    Decodes HTTP chunked transfer encoding on-the-fly without buffering.
    Implements file-like interface for streaming to requests library.

    Malformed framing or a stream that ends before the terminating zero-size
    chunk raises ``ChunkedDecodeError`` rather than being reported as a normal
    end of data, so a truncated upload is never forwarded as a complete one.
    """

    _HEX_DIGITS = b"0123456789abcdefABCDEF"
    # Upper bound for a chunk-size or trailer line, to keep memory bounded.
    _MAX_LINE_LENGTH = 4096

    def __init__(self, read_func: Callable[[int], bytes]):
        """
        Args:
            read_func: Callable returning up to ``n`` raw (still chunk-encoded)
                bytes, and ``b""`` at end of stream.
        """
        self._read = read_func
        self._done = False
        self._current_chunk_remaining = 0

    def read(self, size: int = -1) -> bytes:
        """Return up to ``size`` decoded payload bytes; ``b""`` once finished.

        A non-positive ``size`` reads at most 8192 bytes (it does not drain the
        stream), so callers must keep reading until ``b""`` is returned.

        Raises:
            ChunkedDecodeError: on malformed framing or premature end of input.
        """
        if self._done:
            return b""

        target_size = size if size > 0 else 8192
        pieces: list[bytes] = []
        bytes_read = 0

        while bytes_read < target_size:
            if self._current_chunk_remaining == 0:
                chunk_size = self._read_chunk_size()
                if chunk_size == 0:
                    self._skip_trailers()
                    self._done = True
                    break
                self._current_chunk_remaining = chunk_size

            wanted = min(self._current_chunk_remaining, target_size - bytes_read)
            data = self._read(wanted)
            if not data:
                raise ChunkedDecodeError("request body ended in the middle of a chunk")
            pieces.append(data)
            bytes_read += len(data)
            self._current_chunk_remaining -= len(data)

            if self._current_chunk_remaining == 0:
                self._expect_crlf()

        return b"".join(pieces)

    def _read_line(self, *, eof_ok: bool = False) -> bytes:
        """Read one CRLF-terminated line and return it without the CRLF.

        Reads byte by byte so nothing beyond the line is consumed. At end of
        input returns ``b""`` when ``eof_ok`` is set, otherwise raises.
        """
        line = bytearray()
        while not line.endswith(b"\r\n"):
            if len(line) >= self._MAX_LINE_LENGTH:
                raise ChunkedDecodeError("chunk header line is too long")
            byte = self._read(1)
            if not byte:
                if eof_ok:
                    return b""
                raise ChunkedDecodeError("request body ended in the middle of a chunk header")
            line += byte
        return bytes(line[:-2])

    def _read_chunk_size(self) -> int:
        """Parse the next chunk-size line, ignoring any ``;extension`` suffix."""
        size_token = self._read_line().split(b";", 1)[0].strip()
        # int(..., 16) alone would also accept signs, "0x" prefixes and
        # underscores, none of which are valid chunk sizes.
        if not size_token or any(digit not in self._HEX_DIGITS for digit in size_token):
            raise ChunkedDecodeError("invalid chunk size")
        return int(size_token, 16)

    def _expect_crlf(self) -> None:
        """Consume the CRLF that terminates a chunk's data."""
        terminator = b""
        while len(terminator) < 2:
            piece = self._read(2 - len(terminator))
            if not piece:
                raise ChunkedDecodeError("request body ended before the chunk terminator")
            terminator += piece
        if terminator != b"\r\n":
            raise ChunkedDecodeError("chunk data is not terminated by CRLF")

    def _skip_trailers(self) -> None:
        """Consume optional trailer lines up to the blank line ending the body.

        End of input is tolerated here: the zero-size chunk has already been
        seen, so the payload itself is complete.
        """
        while self._read_line(eof_ok=True):
            pass


def stream_response(response: ExternalResponse) -> StreamingHttpResponse:
    """Wrap an upstream ``requests`` response in a ``StreamingHttpResponse``.

    The body is relayed byte-for-byte (content decoding is disabled so
    ``Content-Encoding`` and ``Content-Length`` stay accurate) in ``CHUNK_SIZE``
    pieces, the status code is copied, and all headers except hop-by-hop ones
    are forwarded. The upstream connection is released when streaming ends or
    the generator is closed.
    """

    def stream() -> Generator[bytes]:
        try:
            response.raw.decode_content = False
            while chunk := response.raw.read(CHUNK_SIZE):
                yield chunk
        finally:
            response.close()

    streamed_response = StreamingHttpResponse(
        streaming_content=stream(),
        status=response.status_code,
    )

    for header, value in response.headers.items():
        if not is_hop_by_hop(header):
            streamed_response[header] = value

    return streamed_response
import pytest
import requests
from django.urls import reverse
from objectstore_client import Client, RequestError, Session, Usecase
from pytest_django.live_server_helper import LiveServer

from sentry.testutils.cases import TransactionTestCase
from sentry.testutils.helpers.features import with_feature
from sentry.testutils.silo import region_silo_test
from sentry.testutils.skips import requires_objectstore


@pytest.fixture(scope="function")
def local_live_server(request: pytest.FixtureRequest, live_server: LiveServer) -> None:
    """Expose pytest-django's ``live_server`` to unittest-style test classes."""
    # ``request.cls`` exists on every FixtureRequest but is None when the
    # requesting test is not defined on a class, so test the value itself.
    if request.cls is not None:
        request.cls.live_server = live_server
    request.node.live_server = live_server


@region_silo_test
@requires_objectstore
@pytest.mark.usefixtures("local_live_server")
class OrganizationObjectstoreEndpointTest(TransactionTestCase):
    """End-to-end tests that drive the objectstore proxy endpoint over real HTTP."""

    endpoint = "sentry-api-0-organization-objectstore"
    live_server: LiveServer

    # Seconds to wait on direct HTTP calls so a stuck server fails the test
    # instead of hanging the run.
    request_timeout = 10

    def setUp(self) -> None:
        super().setUp()
        self.login_as(user=self.user)
        self.organization = self.create_organization(owner=self.user)
        self.api_key = self.create_api_key(
            organization=self.organization,
            scope_list=["org:admin"],
        )

    def get_endpoint_url(self) -> str:
        """Return the live-server URL of the endpoint with an empty ``path``."""
        path = reverse(
            self.endpoint,
            kwargs={
                "organization_id_or_slug": self.organization.id,
                "path": "",
            },
        )
        return f"{self.live_server.url}{path}"

    def get_auth_headers(self) -> dict[str, str]:
        """Return a basic-auth ``Authorization`` header for the test API key."""
        auth_header = self.create_basic_auth_header(self.api_key.key)
        return {"Authorization": auth_header.decode()}

    def get_session(self) -> Session:
        """Return an objectstore client session that talks through the endpoint."""
        client = Client(
            self.get_endpoint_url(), connection_kwargs={"headers": self.get_auth_headers()}
        )
        return client.session(Usecase("test"), org=self.organization.id)

    @with_feature("organizations:objectstore-endpoint")
    def test_health(self) -> None:
        url = self.get_endpoint_url() + "health"
        res = requests.get(url, headers=self.get_auth_headers(), timeout=self.request_timeout)
        res.raise_for_status()

    @with_feature("organizations:objectstore-endpoint")
    def test_full_cycle(self) -> None:
        session = self.get_session()

        object_key = session.put(b"test data")
        assert object_key is not None

        retrieved = session.get(object_key)
        assert retrieved.payload.read() == b"test data"

        # Writing to an existing key overwrites it and keeps the key.
        new_key = session.put(b"new data", key=object_key)
        assert new_key == object_key

        retrieved = session.get(object_key)
        assert retrieved.payload.read() == b"new data"

        session.delete(object_key)

        with pytest.raises(RequestError):
            session.get(object_key)

    @with_feature("organizations:objectstore-endpoint")
    def test_uncompressed(self) -> None:
        session = self.get_session()

        object_key = session.put(b"test data", compression="none")
        assert object_key is not None

        retrieved = session.get(object_key)
        assert retrieved.payload.read() == b"test data"

    @with_feature("organizations:objectstore-endpoint")
    def test_large_payload(self) -> None:
        session = self.get_session()
        data = b"A" * 1_000_000

        object_key = session.put(data)
        assert object_key is not None

        retrieved = session.get(object_key)
        assert retrieved.payload.read() == data