Conversation
+just lint
There's a lot of at-protocol-specific knowledge that I'm unsure about (e.g. the different string formats for time, etc.), but overall the implementation looks great!
Aside from the request to move the config var out, just a couple of other high-level things:
- Needs an e2e test before we can merge
- Please pull in changes from wit repo instead of baking in here
- More of a question than request for changes, but we shouldn't merge until we land on an answer (raised elsewhere too): I see there's some logic for kinda dynamically filling in some data like record... maybe we should only use the sequence as the EventIdSalt, and then if they have different content it will break consensus rather than be treated as different events?
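To make the trade-off in the last bullet concrete, here is a minimal sketch of the two ways an event id could be derived. Both function names are hypothetical, and std's `DefaultHasher` only stands in for whatever hashing the real `EventIdSalt` code uses; this is an illustration of the question, not the PR's implementation.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical: derive an event id from the sequence number only.
/// Two events with the same sequence always get the same id,
/// regardless of their record content.
fn event_id_from_sequence(sequence: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    sequence.hash(&mut hasher);
    hasher.finish()
}

/// Hypothetical: derive an event id from the sequence number and the record content.
/// The same sequence with different content yields a different id.
fn event_id_from_sequence_and_content(sequence: u64, record: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    sequence.hash(&mut hasher);
    record.hash(&mut hasher);
    hasher.finish()
}
```

With the first variant, operators that saw different content for the same sequence still agree on the event id, so the content mismatch would surface as a consensus failure. With the second, they would produce different ids and the payloads would be treated as separate events.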
packages/wavs/src/subsystems/trigger/streams/atproto_jetstream.rs
1.) 2.) 3.) |
improve uniqueness for event id
Not really, I missed it in the simulated trigger, nice solution! BUT - it would be better if we could treat it like our chain triggers, i.e. spin up a local server. Otherwise it's not really an "end to end" test, i.e. there's no testing that it really works in the wild. I'll open a separate issue for that though, not a blocker for this PR: #1101
dakom
left a comment
Really nice work! You covered a ton of ground, even remembering things like to remove triggers, comments, etc.
Left a bunch of comments but I think it's close to getting in :)
packages/wavs/src/subsystems/trigger/streams/atproto_jetstream.rs
    wanted_dids: None, // Listen to all repos
    cursor: None,
    compression: false,
    max_message_size: self.config.jetstream_max_message_size,
How does this relate to the message truncation (previous comment, to 512 bytes)?
This is jetstream config https://github.com/bluesky-social/jetstream
maxMessageSizeBytes - The maximum size of a payload that this client would like to receive. Zero means no limit, negative values are treated as zero. (Default "0" or empty = no maximum size)
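As a rough sketch of the semantics quoted above (zero or negative means no limit), the setting could be interpreted like this. The helper names are hypothetical and are not part of Jetstream or of this PR; the filtering itself happens on the Jetstream side.

```rust
/// Hypothetical helper: interpret a `maxMessageSizeBytes`-style value.
/// Zero and negative values both mean "no maximum size".
fn effective_max_message_size(raw: i64) -> Option<u64> {
    if raw <= 0 {
        None
    } else {
        Some(raw as u64)
    }
}

/// Hypothetical helper: would a payload of `payload_len` bytes be delivered
/// under the given raw limit?
fn within_limit(payload_len: usize, raw_limit: i64) -> bool {
    match effective_max_message_size(raw_limit) {
        None => true,
        Some(limit) => payload_len as u64 <= limit,
    }
}
```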
    for ((collection_pattern, repo_did_filter, action_filter), lookup_ids) in
        triggers_by_atproto_lock.iter()
    {
        // Skip if already matched by exact lookup
wait, so why are we iterating on the same collection if we're just going to exit for hits we just saw?
this could even be a performance problem if there's lots of triggers... comment isn't quite right, we're "skipping" but only skipping the more expensive pattern-match test, the intersection is itself not free
can you please change it to two completely separate collections:
- Exact match (strategy 1, fast path)
- Pattern match (like strategy 2, slower path, but will only need to iterate over the triggers in this collection)
?
You can do this in a follow-up PR if you prefer, since it's a performance issue only at scale and will affect this file quite a bit. If you do, just open an issue to track it.
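One possible shape for the two-collection split, as a minimal sketch. `TriggerIndex` and its methods are hypothetical names, lookup ids are simplified to `u64`, and the key is reduced to the collection string only (the real key also carries the repo DID and action filters).

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical trigger index with a fast exact path and a slower pattern path.
#[derive(Default)]
struct TriggerIndex {
    /// Exact collection NSID -> lookup ids (strategy 1: a single hash lookup).
    exact: HashMap<String, Vec<u64>>,
    /// Wildcard prefix (the pattern without its trailing `*`) -> lookup ids
    /// (strategy 2: iterate, but only over wildcard triggers).
    patterns: BTreeMap<String, Vec<u64>>,
}

impl TriggerIndex {
    /// Register a trigger in exactly one of the two collections.
    fn insert(&mut self, collection: &str, lookup_id: u64) {
        match collection.strip_suffix('*') {
            Some(prefix) => self
                .patterns
                .entry(prefix.to_string())
                .or_default()
                .push(lookup_id),
            None => self
                .exact
                .entry(collection.to_string())
                .or_default()
                .push(lookup_id),
        }
    }

    /// Collect all lookup ids whose trigger matches the event's collection.
    fn matches(&self, collection: &str) -> Vec<u64> {
        let mut hits = Vec::new();
        // Fast path: exact match.
        if let Some(ids) = self.exact.get(collection) {
            hits.extend_from_slice(ids);
        }
        // Slow path: prefix test against wildcard triggers only.
        for (prefix, ids) in &self.patterns {
            if collection.starts_with(prefix.as_str()) {
                hits.extend_from_slice(ids);
            }
        }
        hits
    }
}
```

Because each trigger lives in exactly one collection, the pattern loop never revisits entries already handled by the exact lookup, so no "skip if already matched" check is needed.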
    StreamTriggers::AtProto { event } => {
        // Convert CommitAction to AtProtoAction
        let action_enum = match event.action {
            streams::atproto_jetstream::CommitAction::Create => AtProtoAction::Create,
Why not just get rid of CommitAction and use AtProtoAction everywhere?
    {
        let stream = async_stream::stream! {
            let mut reconnect_count = 0;
            let max_reconnects = 10;
Same as David's comment: this should be configurable via JetstreamConfig.
ref comment above
    let delay = std::cmp::min(
        base_delay * 2_u32.pow(reconnect_count),
        max_delay
    ) + Duration::from_millis(rand::random::<u64>() % 1000);
nit:
    - ) + Duration::from_millis(rand::random::<u64>() % 1000);
    + ) + Duration::from_millis(rand::thread_rng().gen_range(0..1000));
IMO more readable plus random::<u64> can generate big numbers, on the other hand my suggestion requires an extra import. 🤷
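For reference, a minimal sketch of capped exponential backoff with jitter where the reconnect limit comes from config, as requested in the comments above. `ReconnectConfig` and `reconnect_delay` are hypothetical names, not the PR's code. The jitter is passed in as a parameter so the sketch does not depend on the `rand` crate, and the exponent uses checked/saturating arithmetic so a large configured limit cannot overflow.

```rust
use std::time::Duration;

/// Hypothetical reconnect settings that could live on `JetstreamConfig`.
struct ReconnectConfig {
    max_reconnects: u32,
    base_delay: Duration,
    max_delay: Duration,
}

/// Returns the delay before the next reconnect attempt,
/// or `None` once `max_reconnects` has been reached.
fn reconnect_delay(
    cfg: &ReconnectConfig,
    reconnect_count: u32,
    jitter_ms: u64,
) -> Option<Duration> {
    if reconnect_count >= cfg.max_reconnects {
        return None;
    }
    // 2^reconnect_count, clamped instead of panicking on overflow.
    let factor = 2_u32.checked_pow(reconnect_count).unwrap_or(u32::MAX);
    // Exponential backoff, capped at the configured maximum.
    let backoff = cfg.base_delay.saturating_mul(factor).min(cfg.max_delay);
    // Add up to one second of caller-supplied jitter.
    Some(backoff + Duration::from_millis(jitter_ms % 1000))
}
```

In the real stream loop the caller would supply the jitter from a random source, for example either of the two expressions in the suggestion above.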
packages/wavs/src/subsystems/trigger/streams/atproto_jetstream.rs
    /// Set an ATProto Jetstream event trigger for a workflow
    SetAtProtocol {
        /// Collection NSID to filter for (e.g., "app.bsky.feed.post")
        /// Supports wildcards with prefix matching (e.g., "app.bsky.feed.*")
    - /// Supports wildcards with prefix matching (e.g., "app.bsky.feed.*")
    + /// Supports wildcards with suffix matching (e.g., "app.bsky.feed.*")
the function matches at .*.
imo correct as is, because the prefix is what's matched when using wildcard.
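A small sketch of why "prefix matching" fits: the `*` sits at the end of the pattern, but what gets compared is the part before it. `collection_matches` is a hypothetical helper and may differ from the function in the PR.

```rust
/// Hypothetical matcher: a trailing `*` turns the pattern into a prefix match;
/// without it, the pattern must equal the collection exactly.
fn collection_matches(pattern: &str, collection: &str) -> bool {
    match pattern.strip_suffix('*') {
        // "app.bsky.feed.*" -> the collection must start with "app.bsky.feed."
        Some(prefix) => collection.starts_with(prefix),
        None => pattern == collection,
    }
}
```

Since the dot before `*` is part of the prefix, "app.bsky.feed.*" matches "app.bsky.feed.post" but not "app.bsky.feedback.post".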
also adds comment on max message size (0 = no max size)
dakom
left a comment
Thanks for fixing up all those comments, LGTM!
I also had two suggestions in the base comment. #1100 (review)
Working locally with events coming (e2e set up but with a simulated trigger)
TODO: