Skip to content

chore: example that streams bsky firehose to an s2 stream#112

Merged
shikhar merged 6 commits intomainfrom
bsky-ex
Jan 28, 2026
Merged

chore: example that streams bsky firehose to an s2 stream#112
shikhar merged 6 commits intomainfrom
bsky-ex

Conversation

@shikhar
Copy link
Member

@shikhar shikhar commented Jan 28, 2026

No description provided.

@shikhar shikhar requested a review from a team as a code owner January 28, 2026 16:37
@greptile-apps
Copy link

greptile-apps bot commented Jan 28, 2026

Greptile Overview

Greptile Summary

Adds a new example demonstrating how to stream Bluesky posts from the Jetstream firehose to an S2 stream using WebSocket and the Producer API.

Key changes:

  • Connects to Bluesky's Jetstream WebSocket API to receive real-time post events
  • Filters for app.bsky.feed.post creation events
  • Uses S2's Producer with BatchTransform to efficiently batch and append records
  • Includes metrics tracking (submitted, acked, inflight) with periodic logging
  • Properly handles stream creation (with 409 conflict handling) and cleanup

Issues found:

  • Critical: logInterval starts immediately at script initialization but is only cleared in onclose, causing a resource leak if the WebSocket connection fails during setup or errors occur before successful connection
  • The interval should be started in onopen and defensively cleared in both onerror and onclose handlers

Confidence Score: 3/5

  • Safe to merge after fixing the interval cleanup issue to prevent resource leaks
  • The example demonstrates good use of the S2 SDK patterns and includes proper error handling for stream creation, but has a critical resource leak where the logging interval starts before WebSocket connection and won't be cleared if connection fails
  • The interval initialization and cleanup logic in examples/bluesky-firehose-to-s2.ts needs attention

Important Files Changed

Filename Overview
examples/bluesky-firehose-to-s2.ts New example streaming Bluesky posts to S2 via WebSocket - has interval cleanup issue that can cause resource leak if connection fails

Sequence Diagram

sequenceDiagram
    participant User
    participant Example as bluesky-firehose-to-s2.ts
    participant S2 as S2 Client
    participant Basin
    participant Stream
    participant Producer
    participant WS as Bluesky Jetstream WebSocket
    
    User->>Example: Run script
    Example->>Example: Validate env vars (S2_ACCESS_TOKEN, S2_BASIN)
    Example->>S2: new S2(config)
    Example->>Basin: s2.basin(basinName)
    Example->>Basin: streams.create(streamName)
    alt Stream exists
        Basin-->>Example: 409 Conflict
    else Stream created
        Basin-->>Example: Stream created
    end
    
    Example->>Stream: basin.stream(streamName)
    Example->>Stream: appendSession()
    Stream-->>Example: AppendSession
    Example->>Producer: new Producer(BatchTransform, appendSession)
    
    Example->>WS: new WebSocket(jetstreamUrl)
    Example->>Example: Start logInterval (every 1s)
    
    WS->>Example: onopen
    Example->>Example: Log connection status
    
    loop Firehose streaming
        WS->>Example: onmessage(event)
        Example->>Example: Parse JetstreamEvent
        alt Not a create post
            Example->>Example: Skip event
        else Valid post with text
            Example->>Producer: submit(AppendRecord)
            Producer-->>Example: RecordSubmitTicket
            Example->>Example: submitted++
            Example->>Example: ticket.ack().then()
            Note over Example: Async ack handler
            Producer-->>Example: Ack resolved
            Example->>Example: acked++
        end
    end
    
    alt User presses Ctrl+C
        User->>Example: SIGINT
        Example->>WS: close()
    else WebSocket error
        WS->>Example: onerror
    else WebSocket closed
        WS->>Example: onclose
    end
    
    Example->>Example: clearInterval(logInterval)
    Example->>Producer: close()
    Example->>Stream: close()
    Example->>Example: Log final stats
Loading

Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 file reviewed, 3 comments

Edit Code Review Agent Settings | Greptile

@shikhar shikhar merged commit 54486da into main Jan 28, 2026
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants

Comments