Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions .github/workflows/build-pr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
name: Build Jetstream Turbo PR
on:
  pull_request:
    types:
      - opened
      - reopened
      - labeled
      - synchronize

jobs:
  build_jetstream_turbo_pr:
    name: build_jetstream_turbo
    outputs:
      version_tag: ${{ steps.version_tag.outputs.VERSION_TAG }}
    # Only build PRs that carry the `auto-build` label.
    if: contains(github.event.pull_request.labels.*.name, 'auto-build')
    runs-on: ubuntu-latest
    steps:
      - name: checkout
        uses: actions/checkout@v3

      # Derive an image tag unique to this PR, commit, and run:
      # pr-<PR number>-<short sha>-<run number>.
      - name: version_tag
        id: version_tag
        run: |
          short_sha=$(git rev-parse --short HEAD)
          echo "_SHORT_SHA=$short_sha" >> $GITHUB_ENV
          echo "VERSION_TAG=pr-${{ github.event.pull_request.number }}-$short_sha-${{ github.run_number }}" >> $GITHUB_ENV
          echo "VERSION_TAG=pr-${{ github.event.pull_request.number }}-$short_sha-${{ github.run_number }}" >> $GITHUB_OUTPUT

      - name: configure_aws_credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_KEY }}
          role-to-assume: ${{ secrets.AWS_ASSUME_ROLE }}
          role-external-id: grazebuilder
          role-duration-seconds: 1200
          # Typo fix: was "jestream-turbo-builder".
          role-session-name: jetstream-turbo-builder
          role-skip-session-tagging: true
          aws-region: ${{ secrets.ECR_REGION }}

      - name: Set up Docker CLI
        # Bug fix: this step had no `id`, but the build step below referenced
        # `steps.setup_buildx` — which therefore never resolved.
        id: setup_buildx
        uses: docker/setup-buildx-action@v3

      - name: login_to_ecr
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v2
        with:
          mask-password: "true"

      - name: build_and_push
        uses: docker/build-push-action@v5
        with:
          file: Dockerfile
          context: .
          cache-from: type=registry,ref=${{ secrets.ECR_REPO }}:cache
          cache-to: type=registry,mode=max,image-manifest=true,oci-mediatypes=true,ref=${{ secrets.ECR_REPO }}:cache
          push: true
          # NOTE(review): pushing `latest` from every labeled PR build can
          # clobber whatever `latest` currently points at — confirm intended.
          tags: |
            ${{ secrets.ECR_REPO }}:${{ env.VERSION_TAG }}
            ${{ secrets.ECR_REPO }}:latest
          # Bug fix: `steps.setup_buildx.name` is not a valid steps-context
          # property; the builder name is exposed as the step *output* `name`.
          builder: ${{ steps.setup_buildx.outputs.name }}
          platforms: linux/amd64

      - name: post_to_eng_alerts
        uses: slackapi/slack-github-action@v2.1.1
        with:
          webhook: ${{ secrets.SLACK_WEBHOOK_ENG_ALERTS }}
          webhook-type: incoming-webhook
          payload: |
            text: "*`Jetstream Turbo Image`* ${{ env.VERSION_TAG }} Created"
            blocks:
              - type: "section"
                text:
                  type: "mrkdwn"
                  text: "*`Jetstream Turbo` Image Created*"
              - type: "divider"
              - type: "section"
                text:
                  type: "mrkdwn"
                  text: "Download:\n```${{ secrets.ECR_REPO }}:${{ env.VERSION_TAG }}```"
1 change: 0 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ RUN pdm install \
--fail-fast \
--lockfile ${LOCKFILE}


# Copy the rest of the project files (excluding `.venv`)
COPY src ./src/

Expand Down
8 changes: 4 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ services:
jetstream_turbo:
build: .
container_name: jetstream_turbo_service
restart: always
volumes:
- .:/app # Mount everything
- /app/.venv # But create an empty volume for `.venv` so it's not overwritten
#restart: always
# volumes:
# - .:/app # Mount everything
# - /app/.venv # But create an empty volume for `.venv` so it's not overwritten
env_file:
- .env # Load environment variables from .env
1,404 changes: 687 additions & 717 deletions pdm.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ classifiers = [
]

dependencies = [
"aiohttp>=3.11.11",
"jinja2>=3.1.5",
"aiohttp-jinja2>=1.6",
"aioboto3>=14.1.0",
"pydantic-settings>=2.7.1",
"python-json-logger>=3.2.1",
"redis>=5.2.1",
"aio-statsd>=0.2.9",
"httpx>=0.28.1",
"websockets>=13.0",
"atproto>=0.0.59",
"aiorwlock>=1.5.0",
"sqsi>=0.1.9",
"types-aioboto3[essential]>=15.1.0",
"aiobotocore>=2.24.0",
"prometheus-client>=0.21.0",
]
readme = "README.md"

Expand Down
50 changes: 35 additions & 15 deletions src/social/graze/jetstream_turbo/app/__main__.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,40 @@
import os
from aiohttp import web
import logging
from logging.config import dictConfig
import argparse
import asyncio
import json
import logging
import os

from social.graze.jetstream_turbo.app.turbocharger import start_turbo_charger
from social.graze.jetstream_turbo.app.config import Settings
from social.graze.jetstream_turbo.app.metrics import start_metrics_server


def configure_logging():
    """
    Configure logging for the process.

    If the ``LOGGING_CONFIG_FILE`` environment variable is set, treat it as
    the path to a JSON file in :func:`logging.config.dictConfig` format and
    apply it. Otherwise fall back to ``basicConfig`` at INFO level with a
    timestamped format, and quiet the chatty ``httpx`` logger to WARNING.
    """
    logging_config_file = os.getenv("LOGGING_CONFIG_FILE", "")

    # Idiom fix: truthiness test instead of `len(...) > 0`.
    if logging_config_file:
        with open(logging_config_file, encoding="utf-8") as fl:
            dictConfig(json.load(fl))
        return

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    logging.getLogger("httpx").setLevel(logging.WARNING)


async def start():
"""
Parse arguments and start turbocharger.
"""

def main():
configure_logging()

from social.graze.jetstream_turbo.app.turbocharger import start_turbo_charger

parser = argparse.ArgumentParser(prog="turbocharger")
parser.add_argument(
"--modulo",
Expand All @@ -37,14 +49,22 @@ def main():
help="Shard to test against",
)
args = parser.parse_args()
web.run_app(
start_turbo_charger(
None,
modulo=args.modulo,
shard=args.shard,
)

# Start Prometheus metrics server
settings = Settings()
start_metrics_server(settings.metrics_port)

await start_turbo_charger(
settings,
modulo=args.modulo,
shard=args.shard,
)

def main():
"""
The entrypoint for the program, called if this module is invoked and also in `pyproject.toml`.
"""
asyncio.run(start())

if __name__ == "__main__":
main()
10 changes: 0 additions & 10 deletions src/social/graze/jetstream_turbo/app/bluesky_api.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import asyncio
import random
from typing import List, Dict, TypeVar, Callable, Awaitable

from atproto import AsyncClient, models
from atproto_client.exceptions import (
BadRequestError,
RequestException,
InvokeTimeoutError,
)

T = TypeVar("T")
Expand Down Expand Up @@ -47,14 +45,6 @@ async def load_sessions(cls, session_strings: List[str]) -> List["BlueskyAPI"]:
bads.append(ss)
return apis

async def _login_client(self, client: AsyncClient, session_string: str):
"""Logs a given AsyncClient in with a particular session string."""
try:
await client.login(session_string=session_string)
await client.user_timeline()
except (BadRequestError, RequestException, ValueError) as e:
raise RuntimeError(f"Failed to login with session string: {e}")

async def _chunked_map(
self,
items: List[T],
Expand Down
68 changes: 58 additions & 10 deletions src/social/graze/jetstream_turbo/app/client.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,73 @@
import asyncio
import json
import logging
import websockets
import websockets.exceptions

logger = logging.getLogger(__name__)

class JetstreamClient:
"""
Subscribes to a Bluesky jetstream (firehose) and yields parsed messages.
"""

def __init__(self, endpoint: str, wanted_collections: str = "app.bsky.feed.post"):
def __init__(
self,
endpoint: str,
wanted_collections: str = "app.bsky.feed.post",
max_retries: int = 5,
retry_window: float = 300
):
self.endpoint = endpoint
self.wanted_collections = wanted_collections
self.max_retries = max_retries
self.retry_window = retry_window

def jetstream_url(self):
return f"wss://{self.endpoint}/subscribe?wantedCollections={self.wanted_collections}"
def jetstream_url(self, time_us: int | None = None):
url = f"wss://{self.endpoint}/subscribe?wantedCollections={self.wanted_collections}"
if time_us:
url += f"&cursor={time_us}"
return url

async def run_stream(self):
async with websockets.connect(self.jetstream_url()) as ws:
async for message in ws:
try:
data = json.loads(message)
yield data
except (json.JSONDecodeError, KeyError):
continue
""" Run a Jetstream iterator. """

retry_times: list[float] = []
time_us: int | None = None

while True:
try:
async with websockets.connect(self.jetstream_url(time_us)) as ws:
async for message in ws:
try:
message = json.loads(message)
try:
time_us = int(message.get("time_us"))
except (TypeError, ValueError, AttributeError):
pass
yield message
except (json.JSONDecodeError, KeyError):
continue
except websockets.exceptions.ConnectionClosedError as e:
logger.error("Exception when reading from jetstream: %s", e, exc_info=True)

now = asyncio.get_event_loop().time()
retry_times = [t for t in retry_times if now - t < self.retry_window]
retry_times.append(now)
if len(retry_times) > self.max_retries:
logger.error(
"Ending `run_stream()` iterator due to hitting max retries (%d) in window (%.1f s).",
self.max_retries,
self.retry_window,
)

logger.error(
"Continuing processing (%d/%d retries in the last %.1f s).",
len(retry_times),
self.max_retries,
self.retry_window,
)
continue

# If we're not terminating due to a known exception, break and die.
break
63 changes: 25 additions & 38 deletions src/social/graze/jetstream_turbo/app/config.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,36 @@
import os
import asyncio
from typing import Annotated, Final, List
from typing import List, Optional
import logging
from aio_statsd import TelegrafStatsdClient
from pydantic import (
field_validator,
PostgresDsn,
RedisDsn,
)
import base64
from pydantic_settings import BaseSettings, NoDecode
from aiohttp import web
from cryptography.fernet import Fernet
from aiohttp import ClientSession
from redis import asyncio as redis
from pydantic_settings import BaseSettings


logger = logging.getLogger(__name__)


class Settings(BaseSettings):
""" Settings for the app. """

graze_api_base_url: str = "https://api.graze.social"
stream_name: str = os.getenv("STREAM_NAME")
turbo_credential_secret: str = os.getenv("TURBO_CREDENTIAL_SECRET")
stream_name: Optional[str] = os.getenv("STREAM_NAME")
turbo_credential_secret: Optional[str] = os.getenv("TURBO_CREDENTIAL_SECRET")

s3_bucket: Optional[str] = os.getenv("S3_BUCKET")
s3_region: Optional[str] = os.getenv("S3_REGION")

input_mode: Optional[str] = (
"sqs" if os.getenv("INPUT_MODE", "").lower() == "sqs" else "websocket"
)

input_queue_url: Optional[str] = os.getenv("INPUT_QUEUE_URL")

output_to_s3: Optional[bool] = bool(os.getenv("OUTPUT_TO_S3"))
output_to_sqs: Optional[bool] = bool(os.getenv("OUTPUT_TO_SQS"))
output_to_redis: Optional[bool] = bool(os.getenv("OUTPUT_TO_REDIS"))

output_queue_url: Optional[str] = os.getenv("OUTPUT_QUEUE_URL")
media_output_queue_url: Optional[str] = os.getenv("MEDIA_OUTPUT_QUEUE_URL")

redis_url: Optional[str] = os.getenv("REDIS_URL")

jetstream_hosts: List[str] = [
"jetstream1.us-east.bsky.network",
Expand All @@ -31,26 +39,5 @@ class Settings(BaseSettings):
"jetstream2.us-west.bsky.network",
]
db_dir: str = "jetstream-messages"
s3_bucket: str = "graze-turbo-01"
s3_region: str = "us-east-1"
debug: bool = False

http_port: int = 5100

external_hostname: str = "localhost:5100"

redis_dsn: RedisDsn = RedisDsn("redis://valkey:6379/1?decode_responses=True")
worker_id: str = "worker"

statsd_host: str = "telegraf"
statsd_port: int = 8125
statsd_prefix: str = "aip"


SettingsAppKey: Final = web.AppKey("settings", Settings)
SessionAppKey: Final = web.AppKey("http_session", ClientSession)
RedisPoolAppKey: Final = web.AppKey("redis_pool", redis.ConnectionPool)
RedisClientAppKey: Final = web.AppKey("redis_client", redis.Redis)
TelegrafStatsdClientAppKey: Final = web.AppKey(
"telegraf_statsd_client", TelegrafStatsdClient
)
metrics_port: int = int(os.getenv("METRICS_PORT", "8000"))
Loading