
Conversation

Member

@manveerxyz manveerxyz commented Jan 20, 2026

Note

Implements presigned URL flow for sample uploads, replacing direct samples POSTs with an async, fire-and-forget R2 upload plus confirmation.

  • Adds _prepare_samples to serialize rollout data and minimize payload fields
  • New async path: request presigned URL (/samples/presign), PUT JSON to R2 with exponential backoff, then confirm (/samples/confirm)
  • Updates log_samples to use the new flow; logs and guards improved (empty-sample check, timing)
  • Minor: import json and add helper methods for presign, upload, and confirmation
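The three-step flow above can be sketched roughly as follows. This is an illustrative reconstruction, not the PR's actual code: the endpoint paths (`/samples/presign`, `/samples/confirm`) and the exponential-backoff PUT come from the description, while `upload_samples`, its parameters, and the response/request field names are assumptions. `client` is expected to behave like an httpx-style async client.

```python
import asyncio
import json

async def upload_samples(client, base_url, api_key, run_id, samples, step, max_retries=3):
    """Sketch of the presigned upload flow: presign -> PUT to R2 -> confirm.

    `client` is assumed to expose async `post(...)` / `put(...)` returning
    responses with `.json()` and `.raise_for_status()` (httpx-style).
    """
    headers = {"x-api-key": api_key, "Content-Type": "application/json"}

    # 1. Ask the backend for a presigned R2 URL.
    resp = await client.post(f"{base_url}/samples/presign", headers=headers,
                             json={"run_id": run_id, "step": step})
    resp.raise_for_status()
    data = resp.json()
    presigned_url, s3_key = data["presigned_url"], data["s3_key"]

    # 2. PUT the serialized samples to R2, retrying with exponential backoff.
    body = json.dumps(samples).encode("utf-8")
    for attempt in range(max_retries):
        try:
            put_resp = await client.put(presigned_url, content=body,
                                        headers={"Content-Type": "application/json"})
            put_resp.raise_for_status()
            break
        except Exception:
            if attempt == max_retries - 1:
                return False
            await asyncio.sleep(2 ** attempt)  # 1s, 2s, 4s, ...

    # 3. Tell the backend the object now exists.
    confirm = await client.post(f"{base_url}/samples/confirm", headers=headers,
                                json={"run_id": run_id, "step": step,
                                      "s3_key": s3_key, "sample_count": len(samples)})
    confirm.raise_for_status()
    return True
```

In the PR this runs fire-and-forget on a background event loop, so the caller does not await the result.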

Written by Cursor Bugbot for commit 7b0e043.

@manveerxyz manveerxyz requested a review from Jackmin801 January 20, 2026 03:41
@JannikSt
Member

@codex review


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 7b0e04302e


Member

@Jackmin801 Jackmin801 left a comment


nice! lgtm

Contributor

Copilot AI left a comment


Pull request overview

This PR implements presigned URL flow for uploading training samples to R2 storage, replacing the previous direct POST approach to improve scalability and reduce backend load.

Changes:

  • Refactored sample upload to use a three-step async flow: request presigned URL, upload JSON to R2, confirm upload with backend
  • Extracted sample preparation logic into _prepare_samples method to separate serialization from upload
  • Implemented exponential backoff retry logic for R2 uploads
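The exponential-backoff retry mentioned in the last bullet can be sketched as a small standalone helper. This is illustrative only; the PR inlines equivalent logic in its R2 upload path, and the names `retry_with_backoff`, `op`, and `base_delay` are hypothetical.

```python
import asyncio

async def retry_with_backoff(op, max_retries=3, base_delay=1.0):
    """Retry async `op()` with exponential backoff (base_delay * 2**attempt).

    Returns True on success, False once retries are exhausted.
    Hypothetical helper; the PR inlines this logic rather than factoring it out.
    """
    for attempt in range(max_retries):
        try:
            await op()
            return True
        except Exception:
            if attempt < max_retries - 1:
                await asyncio.sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
    return False
```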



self.last_log_samples_step = step
self.logger.debug(
    f"Logged samples at step {step} to Prime Intellect API in {time.perf_counter() - start_time:.2f}s"

Copilot AI Jan 20, 2026


The timing log is misleading. Since the upload is now fire-and-forget async, the logged time only reflects the time to prepare samples and queue the upload task, not the actual upload duration. The log message claims "Logged samples at step {step}" but the upload hasn't actually completed yet. Consider either removing this timing log or clarifying that it only measures the synchronous preparation phase.

Suggested change
f"Logged samples at step {step} to Prime Intellect API in {time.perf_counter() - start_time:.2f}s"
f"Prepared and queued sample upload for step {step} in "
f"{time.perf_counter() - start_time:.2f}s (synchronous phase only)"

# Use presigned URL flow for uploading samples
self._upload_samples_via_presigned_url(samples, step)

self.last_log_samples_step = step

Copilot AI Jan 20, 2026


Setting last_log_samples_step before the async upload completes creates a race condition. If log_samples is called again quickly or the upload fails, this step is already marked as logged even though the upload hasn't completed. Consider either moving this update to after the confirmation succeeds, or using a different tracking mechanism that distinguishes between initiated and completed uploads.
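The reviewer's suggested "different tracking mechanism" could look like the sketch below, which keeps separate high-water marks for initiated and confirmed uploads. The class and attribute names are hypothetical; the PR itself keeps only a single `last_log_samples_step` attribute.

```python
class SampleUploadTracker:
    """Separates 'upload initiated' from 'upload confirmed' per step.

    Hypothetical sketch of the reviewer's suggestion, not code from the PR.
    """

    def __init__(self) -> None:
        self.last_initiated_step: int | None = None
        self.last_completed_step: int | None = None

    def mark_initiated(self, step: int) -> None:
        # Called synchronously when the upload task is queued.
        self.last_initiated_step = step

    def mark_completed(self, step: int) -> None:
        # Called from the async path after /samples/confirm succeeds.
        # Ignore out-of-order confirmations so the mark never regresses.
        if self.last_completed_step is None or step > self.last_completed_step:
            self.last_completed_step = step
```

With this split, a step is only treated as durably logged once `mark_completed` has run for it.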

Comment on lines +203 to +204
presigned_url = presign_data["presigned_url"]
s3_key = presign_data["s3_key"]

Copilot AI Jan 20, 2026


Missing validation for the presigned URL response structure. If the backend response doesn't include the expected "presigned_url" or "s3_key" fields, the code will raise a KeyError instead of a more helpful error. Consider validating the response structure and providing a clearer error message if required fields are missing.

Suggested change
presigned_url = presign_data["presigned_url"]
s3_key = presign_data["s3_key"]
presigned_url = presign_data.get("presigned_url")
s3_key = presign_data.get("s3_key")
if not presigned_url or not s3_key:
    self.logger.warning(
        f"Presigned URL response missing required fields for samples at step {step}: "
        f"presigned_url_present={presigned_url is not None}, s3_key_present={s3_key is not None}"
    )
    return

Comment on lines +216 to +217
except Exception as e:
self.logger.warning(f"Failed to upload samples via presigned URL at step {step}: {type(e).__name__}: {e}")

Copilot AI Jan 20, 2026


The broad Exception catch on line 216 may mask the KeyError that would be raised on lines 203-204 if the presigned URL response is missing required fields. Consider moving the KeyError-prone code outside the try/except block or handling it specifically to provide better error diagnostics.

Comment on lines +188 to 192
asyncio.run_coroutine_threadsafe(
self._upload_samples_via_presigned_url_async(samples, step),
self._loop,
)


Copilot AI Jan 20, 2026


The fire-and-forget pattern with asyncio.run_coroutine_threadsafe doesn't check if the Future completes successfully. If the event loop thread has died or the loop is stopped, the upload will silently fail without any error being logged. Consider storing the Future and adding error callbacks or periodic health checks on the event loop.

Suggested change
asyncio.run_coroutine_threadsafe(
self._upload_samples_via_presigned_url_async(samples, step),
self._loop,
)
try:
    future = asyncio.run_coroutine_threadsafe(
        self._upload_samples_via_presigned_url_async(samples, step),
        self._loop,
    )
except RuntimeError as e:
    # This typically indicates that the event loop is not running or has been closed.
    self.logger.warning(
        f"Failed to schedule async samples upload at step {step}: {type(e).__name__}: {e}"
    )
    return

def _log_result(fut) -> None:
    try:
        exc = fut.exception()
    except Exception as callback_exc:
        self.logger.warning(
            f"Error while checking async samples upload result at step {step}: "
            f"{type(callback_exc).__name__}: {callback_exc}"
        )
    else:
        if exc is not None:
            self.logger.warning(
                f"Async samples upload failed at step {step}: {type(exc).__name__}: {exc}"
            )

future.add_done_callback(_log_result)

    delay = 2**attempt
    self.logger.debug(f"Retrying R2 upload in {delay}s (attempt {attempt + 1}/{max_retries})")
    await asyncio.sleep(delay)
return False

Copilot AI Jan 20, 2026


The return statement on line 250 is unreachable. The loop only completes when all retries have been exhausted, and in that case, the function already returns False on line 246. This line can be removed.

Suggested change
return False

Comment on lines +252 to +263
async def _confirm_samples_upload(self, step: int, s3_key: str, sample_count: int) -> None:
    """Confirm samples upload with the backend."""
    headers = {"x-api-key": self.api_key, "Content-Type": "application/json"}
    try:
        response = await self._client.post(
            f"{self.base_url}/samples/confirm",
            headers=headers,
            json={"run_id": self.run_id, "step": step, "s3_key": s3_key, "sample_count": sample_count},
        )
        response.raise_for_status()
    except Exception as e:
        self.logger.warning(f"Failed to confirm samples upload: {type(e).__name__}: {e}")

Copilot AI Jan 20, 2026


The confirmation endpoint call lacks retry logic, unlike the presigned URL request and R2 upload. If the confirmation fails due to a transient network issue after a successful R2 upload, the backend won't know the upload completed. Consider adding retry logic similar to _upload_to_r2 or using _make_request_async which already has retry logic.

Suggested change
async def _confirm_samples_upload(self, step: int, s3_key: str, sample_count: int) -> None:
    """Confirm samples upload with the backend."""
    headers = {"x-api-key": self.api_key, "Content-Type": "application/json"}
    try:
        response = await self._client.post(
            f"{self.base_url}/samples/confirm",
            headers=headers,
            json={"run_id": self.run_id, "step": step, "s3_key": s3_key, "sample_count": sample_count},
        )
        response.raise_for_status()
    except Exception as e:
        self.logger.warning(f"Failed to confirm samples upload: {type(e).__name__}: {e}")
async def _confirm_samples_upload(
    self, step: int, s3_key: str, sample_count: int, max_retries: int = 3
) -> None:
    """Confirm samples upload with the backend."""
    headers = {"x-api-key": self.api_key, "Content-Type": "application/json"}
    for attempt in range(max_retries):
        try:
            response = await self._client.post(
                f"{self.base_url}/samples/confirm",
                headers=headers,
                json={
                    "run_id": self.run_id,
                    "step": step,
                    "s3_key": s3_key,
                    "sample_count": sample_count,
                },
            )
            response.raise_for_status()
            return
        except Exception as e:
            # Log and retry on transient errors, up to max_retries
            self.logger.warning(
                f"Failed to confirm samples upload (attempt {attempt + 1}/{max_retries}): "
                f"{type(e).__name__}: {e}"
            )
            if attempt < max_retries - 1:
                # Simple linear backoff: 1s, 2s, ...
                await asyncio.sleep(1 * (attempt + 1))
    self.logger.warning(
        f"Giving up confirming samples upload after {max_retries} failed attempt(s) for step {step}, s3_key={s3_key}."
    )


presigned_url = presign_data["presigned_url"]
s3_key = presign_data["s3_key"]
json_bytes = json.dumps(samples).encode("utf-8")

Copilot AI Jan 20, 2026


The json.dumps call could raise a TypeError if samples contain non-JSON-serializable objects. Since this happens inside the try-except block, it will be caught and logged as a generic warning. Consider validating that samples are JSON-serializable or handling JSONEncodeError specifically to provide a clearer error message.

Suggested change
json_bytes = json.dumps(samples).encode("utf-8")
try:
    json_bytes = json.dumps(samples).encode("utf-8")
except TypeError as e:
    self.logger.warning(
        f"Samples at step {step} are not JSON-serializable: {type(e).__name__}: {e}"
    )
    return
