Skip to content

Commit 12e6127

Browse files
committed
pipeliney
1 parent 10bc2fb commit 12e6127

File tree

1 file changed

+65
-50
lines changed

1 file changed

+65
-50
lines changed

examples/bluesky-firehose-to-s2.ts

Lines changed: 65 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -51,16 +51,13 @@ const s2 = new S2({
5151
});
5252

5353
const basin = s2.basin(basinName);
54-
try {
55-
await basin.streams.create({ stream: streamName });
56-
console.log("Created stream:", streamName);
57-
} catch (error: unknown) {
54+
await basin.streams.create({ stream: streamName }).catch((error: unknown) => {
5855
if (error instanceof S2Error && error.status === 409) {
5956
console.log("Stream already exists:", streamName);
60-
} else {
61-
throw error;
57+
return;
6258
}
63-
}
59+
throw error;
60+
});
6461

6562
const stream = basin.stream(streamName);
6663

@@ -72,56 +69,52 @@ const producer = new Producer(
7269
await stream.appendSession(),
7370
);
7471

72+
// Wrap WebSocket in a ReadableStream.
73+
function websocketToReadable(ws: WebSocket): ReadableStream<JetstreamEvent> {
74+
return new ReadableStream({
75+
start(controller) {
76+
ws.onmessage = (event) => {
77+
controller.enqueue(JSON.parse(event.data as string));
78+
};
79+
ws.onerror = (e) => controller.error(e);
80+
ws.onclose = () => controller.close();
81+
},
82+
cancel() {
83+
ws.close();
84+
},
85+
});
86+
}
87+
7588
const jetstreamUrl = new URL("wss://jetstream2.us-east.bsky.network/subscribe");
7689
jetstreamUrl.searchParams.set("wantedCollections", "app.bsky.feed.post");
7790

7891
console.log("Connecting to Bluesky Jetstream...");
7992
const ws = new WebSocket(jetstreamUrl.toString());
93+
await new Promise<void>((resolve, reject) => {
94+
ws.onopen = () => resolve();
95+
ws.onerror = (e) => reject(e);
96+
});
97+
98+
console.log("Connected to Bluesky Jetstream");
99+
console.log("Streaming new posts to S2...");
100+
console.log("Press Ctrl+C to stop.\n");
80101

81102
let submitted = 0;
82103
let acked = 0;
83-
let logInterval: Timer | null = null;
84-
85-
ws.onopen = () => {
86-
console.log("Connected to Bluesky Jetstream");
87-
console.log("Streaming new posts to S2...");
88-
console.log("Press Ctrl+C to stop.\n");
89-
90-
logInterval = setInterval(() => {
91-
const inflight = submitted - acked;
92-
console.log(`submitted=${submitted} acked=${acked} inflight=${inflight}`);
93-
}, 1000);
94-
};
95-
96-
ws.onerror = (error) => {
97-
console.error("WebSocket error:", error);
98-
if (logInterval) clearInterval(logInterval);
99-
};
100-
101-
ws.onclose = async () => {
102-
console.log("\nConnection closed, draining...");
103-
if (logInterval) clearInterval(logInterval);
104-
await producer.close();
105-
await stream.close();
106-
console.log(`Final: submitted=${submitted} acked=${acked}`);
107-
};
108-
109-
ws.onmessage = async (event) => {
110-
try {
111-
const data: JetstreamEvent = JSON.parse(event.data as string);
112104

105+
// Transform that filters posts and converts to AppendRecord.
106+
const toAppendRecord = new TransformStream<JetstreamEvent, AppendRecord>({
107+
transform(data, controller) {
113108
if (
114109
data.kind !== "commit" ||
115110
data.commit?.operation !== "create" ||
116-
data.commit?.collection !== "app.bsky.feed.post"
111+
data.commit?.collection !== "app.bsky.feed.post" ||
112+
!data.commit?.record?.text
117113
) {
118114
return;
119115
}
120-
121-
const post = data.commit.record;
122-
if (!post?.text) return;
123-
124-
const ticket = await producer.submit(
116+
submitted++;
117+
controller.enqueue(
125118
AppendRecord.string({
126119
body: JSON.stringify(data),
127120
headers: [
@@ -132,17 +125,39 @@ ws.onmessage = async (event) => {
132125
],
133126
}),
134127
);
135-
submitted++;
128+
},
129+
});
136130

137-
ticket.ack().then(() => {
138-
acked++;
139-
});
140-
} catch (err) {
141-
console.error("Error processing message:", err);
131+
// Ack loop runs concurrently, tracking acked count.
132+
const ackLoop = (async () => {
133+
const reader = producer.readable.getReader();
134+
while (true) {
135+
const { done } = await reader.read();
136+
if (done) break;
137+
acked++;
142138
}
143-
};
139+
})();
140+
141+
const logInterval = setInterval(() => {
142+
console.log(
143+
`submitted=${submitted} acked=${acked} inflight=${submitted - acked}`,
144+
);
145+
}, 1000);
144146

145-
process.on("SIGINT", async () => {
147+
// Graceful shutdown on Ctrl+C.
148+
process.on("SIGINT", () => {
146149
console.log("\nShutting down...");
147150
ws.close();
148151
});
152+
153+
// Pipe WebSocket -> filter/transform -> S2 producer.
154+
try {
155+
await websocketToReadable(ws)
156+
.pipeThrough(toAppendRecord)
157+
.pipeTo(producer.writable);
158+
await ackLoop;
159+
} finally {
160+
clearInterval(logInterval);
161+
await stream.close();
162+
console.log(`Final: submitted=${submitted} acked=${acked}`);
163+
}

0 commit comments

Comments
 (0)