Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
04d9eaa
Progress on at triggers
ismellike Dec 2, 2025
ac593d7
has_start_atproto_stream
ismellike Dec 2, 2025
91b9333
Update events.wit
ismellike Dec 2, 2025
08cecf9
add config
ismellike Dec 2, 2025
793184d
wkg wit fetch
ismellike Dec 4, 2025
cf42c3b
Update bindings
ismellike Dec 4, 2025
7514aef
at proto support in echo data
ismellike Dec 4, 2025
6d75ba8
just wasi-build-native
ismellike Dec 4, 2025
67a8147
atproto echo data test
ismellike Dec 4, 2025
0c2add3
atproto-echo
ismellike Dec 5, 2025
cf3d09a
no jetstream compression
ismellike Dec 5, 2025
bc701d9
Working e2e
ismellike Dec 5, 2025
c020429
Update atproto_jetstream.rs
ismellike Dec 5, 2025
958ecae
Update atproto_jetstream.rs
ismellike Dec 5, 2025
63ea938
Merge branch 'main' into at-protocol-triggers
ismellike Dec 5, 2025
7e8fbbc
Remove default jetstream config
ismellike Dec 8, 2025
92f0ea3
Merge branch 'at-protocol-triggers' of https://github.com/Lay3rLabs/W…
ismellike Dec 8, 2025
644e5a2
Update atproto_jetstream.rs
ismellike Dec 8, 2025
35376d5
at proto action + cli
ismellike Dec 8, 2025
5b693c0
Update trigger.rs
ismellike Dec 8, 2025
93f9427
just download-wit at-protocol-triggers
ismellike Dec 8, 2025
2d5fc45
Ignore sequence and timestamp from eventIdSalt creation
ismellike Dec 8, 2025
4e722c8
Add rev + op_index
ismellike Dec 8, 2025
c207ae5
all tests
ismellike Dec 10, 2025
4e973c3
Improve at proto eventidsalt comment
ismellike Dec 10, 2025
f7c633b
Remove redundant TryFrom atprotoaction
ismellike Dec 10, 2025
1d8a56d
Remove jetstream truncation
ismellike Dec 10, 2025
ba65bc9
Use strongly-typed JetStreamEvent
ismellike Dec 10, 2025
d740547
just wasi-build-native
ismellike Dec 10, 2025
0151331
Improve jetstream types
ismellike Dec 10, 2025
fe307d8
Improve connection state handling
ismellike Dec 10, 2025
030bcda
Split at_proto triggers into exact and pattern
ismellike Dec 10, 2025
026f287
Improve jetstream url creation
ismellike Dec 10, 2025
291258f
Add metrics for jetstream
ismellike Dec 10, 2025
fc3829a
bump version
ismellike Dec 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 11 additions & 11 deletions checksums.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
49f13595653139b26ce6d4906068cc9ebf7653388d17db9ac842cdaeb4b8947c ./examples/build/components/chain_trigger_lookup.wasm
66b0d53dc12e7b129e595aa7833e30fa2d3a8c83b3784b94b8b53372e2d01e70 ./examples/build/components/cosmos_query.wasm
2f1ed9ec91bdfa67ae8a10ec91851a446a3edf328349b2ab60ed942e753005c3 ./examples/build/components/echo_block_interval.wasm
a832577cef71c406e3a2b2e62399b76a2cc21c8ca275049d4511312cc612709a ./examples/build/components/echo_cron_interval.wasm
232c4e66312d17d8b1d7b1b0b69b16720d5a61a84ebd7343eecff873c5ba0128 ./examples/build/components/echo_data.wasm
0fd4871e78fca7a4bb8c5673820217989bfe71ca932e990cf07511a65d842393 ./examples/build/components/example_helpers.wasm
c057412a98072cc525d4904f2ce69e9b3db6828127c0d0e74fc5d27bb9e2f6c0 ./examples/build/components/chain_trigger_lookup.wasm
f64db958702d02165d01f649b835de27fb26604458cd944aa402d2a64a9abea6 ./examples/build/components/cosmos_query.wasm
0bae464a586b193ce50351739149b7cf1628a349ef496d865d945d35a0fff83d ./examples/build/components/echo_block_interval.wasm
63911898c815d989511631cfb9cb78c2aa9c69c116909f22a28157aad0e015b3 ./examples/build/components/echo_cron_interval.wasm
ddbf07e22e4a93e640e70a2580c8dbf15cef2223116ce5abcfdfbe41a5de152b ./examples/build/components/echo_data.wasm
f5669a2536858efb2e1f83793b6c36e8743e3c21f4ddb2dfa42cc4200f40429c ./examples/build/components/example_helpers.wasm
980288228c1f56076eff41eff796fcbdf0d400890d4322f043e843559620fae2 ./examples/build/components/example_types.wasm
59f8d70572b29620807a4e9a2ec85c6069d1148bd6021277372478d8cedc1fed ./examples/build/components/kv_store.wasm
8d7a5c7e2b8e141d72fa04aa4d699417d69afddd9b6ffb840ce68da8d9c2424f ./examples/build/components/permissions.wasm
fe469ca930b46b071a599ed7bb25ae72d4c1dd25a3ed456386c1c7a7a302c5fa ./examples/build/components/simple_aggregator.wasm
780b81942931cc4793279e04f41fa44ef0c4ab98bf10d08e65d4215634583be7 ./examples/build/components/square.wasm
4705544270cbb69ac37ced36305f15dc9edf3fb390dfa172c01aae90f88fdc8d ./examples/build/components/timer_aggregator.wasm
de29003bff691d534240cad4d161099587c33a8afd095dbe071323cbb4e94ffb ./examples/build/components/kv_store.wasm
d1d1feb85561f1dcefd44c251be170b6c861018bfad4f9711f8f84741fc5f784 ./examples/build/components/permissions.wasm
d5839a45389557485143c2fd47f0964ef3cfdd4afcf92359d2a61e6337ee6cef ./examples/build/components/simple_aggregator.wasm
696500314f1a6f2be557ea32c11691e0cddc33ded0e06b3070c154eb002459dc ./examples/build/components/square.wasm
e36abf968346149fb588601c03b1952bb11885446706f4b64cb55a39e74973bd ./examples/build/components/timer_aggregator.wasm
Binary file modified examples/build/components/chain_trigger_lookup.wasm
Binary file not shown.
Binary file modified examples/build/components/cosmos_query.wasm
Binary file not shown.
Binary file modified examples/build/components/echo_block_interval.wasm
Binary file not shown.
Binary file modified examples/build/components/echo_cron_interval.wasm
Binary file not shown.
Binary file modified examples/build/components/echo_data.wasm
Binary file not shown.
Binary file modified examples/build/components/example_helpers.wasm
Binary file not shown.
Binary file modified examples/build/components/kv_store.wasm
Binary file not shown.
Binary file modified examples/build/components/permissions.wasm
Binary file not shown.
Binary file modified examples/build/components/simple_aggregator.wasm
Binary file not shown.
Binary file modified examples/build/components/square.wasm
Binary file not shown.
Binary file modified examples/build/components/timer_aggregator.wasm
Binary file not shown.
15 changes: 14 additions & 1 deletion examples/components/_helpers/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
use crate::bindings::world::wavs::{
operator::{input as component_input, output as component_output},
types::{
events::{TriggerDataCosmosContractEvent, TriggerDataEvmContractEvent},
events::{
TriggerDataAtprotoEvent, TriggerDataCosmosContractEvent, TriggerDataEvmContractEvent,
},
service::ServiceManager,
},
};
Expand Down Expand Up @@ -35,6 +37,17 @@ pub fn decode_trigger_event(trigger_data: component_input::TriggerData) -> Resul
Ok((trigger_info.triggerId, trigger_info.data.to_vec()))
}
component_input::TriggerData::Raw(bytes) => Ok((0, bytes)),
component_input::TriggerData::AtprotoEvent(TriggerDataAtprotoEvent {
record_data,
sequence,
..
}) => Ok((
sequence.try_into().expect("Expected sequence to be u64"),
record_data
.expect("Record data was not provided")
.as_bytes()
.to_vec(),
)),
_ => Err(anyhow::anyhow!("Unsupported trigger data type")),
}
}
Expand Down
4 changes: 3 additions & 1 deletion examples/components/echo-data/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ impl Guest for Component {
}

let (maybe_trigger_id, data) = match trigger_action.data {
TriggerData::EvmContractEvent(_) | TriggerData::CosmosContractEvent(_) => {
TriggerData::EvmContractEvent(_)
| TriggerData::CosmosContractEvent(_)
| TriggerData::AtprotoEvent(_) => {
let (trigger_id, data) =
decode_trigger_event(trigger_action.data).map_err(|e| e.to_string())?;

Expand Down
22 changes: 21 additions & 1 deletion packages/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use utils::{
serde::deserialize_vec_string,
};
use wasm_pkg_client::{PackageRef, Version};
use wavs_types::{ChainKey, ComponentDigest, Credential, ServiceStatus, Timestamp, WorkflowId};
use wavs_types::{
AtProtoAction, ChainKey, ComponentDigest, Credential, ServiceStatus, Timestamp, WorkflowId,
};

use crate::config::Config;

Expand Down Expand Up @@ -386,6 +388,24 @@ pub enum TriggerCommand {
#[clap(long)]
end_time: Option<Timestamp>,
},

/// Set an ATProto Jetstream event trigger for a workflow
SetAtProtocol {
/// Collection NSID to filter for (e.g., "app.bsky.feed.post")
/// Supports wildcards with prefix matching (e.g., "app.bsky.feed.*")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Supports wildcards with prefix matching (e.g., "app.bsky.feed.*")
/// Supports wildcards with suffix matching (e.g., "app.bsky.feed.*")

the function matches at .*.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo correct as is, because the prefix is what's matched when using wildcard.

#[clap(long)]
collection: String,

/// Optional DID to filter for specific repositories
/// If not provided, will match events from any repository
#[clap(long)]
repo_did: Option<String>,

/// Action type to filter for (create, update, delete)
/// If not provided, will match all action types
#[clap(long)]
action: Option<AtProtoAction>,
},
}

impl Command {
Expand Down
23 changes: 23 additions & 0 deletions packages/cli/src/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,29 @@ impl HttpClient {
Ok(response.digest)
}

pub async fn simulate_trigger(&self, req: wavs_types::SimulatedTriggerRequest) -> Result<()> {
let url = format!("{}/dev/triggers", self.endpoint);

let response = self
.inner
.post(&url)
.json(&req)
.send()
.await
.context("Failed to send simulated trigger request")?;

if response.status().is_success() {
Ok(())
} else {
let status = response.status();
let body = response
.text()
.await
.unwrap_or_else(|_| "<failed to read body>".to_string());
anyhow::bail!("{} from {}: {}", status, url, body);
}
}

pub async fn register_aggregator_service(
&self,
service_manager: &ServiceManager,
Expand Down
65 changes: 61 additions & 4 deletions packages/cli/src/command/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ use std::{
use utils::{config::WAVS_ENV_PREFIX, service::fetch_bytes, wkg::WkgClient};
use uuid::Uuid;
use wavs_types::{
AggregatorBuilder, AllowedHostPermission, AnyChainConfig, ByteArray, ChainKey, Component,
ComponentBuilder, ComponentDigest, ComponentSource, Registry, ServiceBuilder, ServiceManager,
ServiceManagerBuilder, ServiceStatus, SignatureKind, Submit, SubmitBuilder, Timestamp, Trigger,
TriggerBuilder, WorkflowBuilder, WorkflowId,
AggregatorBuilder, AllowedHostPermission, AnyChainConfig, AtProtoAction, ByteArray, ChainKey,
Component, ComponentBuilder, ComponentDigest, ComponentSource, Registry, ServiceBuilder,
ServiceManager, ServiceManagerBuilder, ServiceStatus, SignatureKind, Submit, SubmitBuilder,
Timestamp, Trigger, TriggerBuilder, WorkflowBuilder, WorkflowId,
};

use crate::{
Expand Down Expand Up @@ -116,6 +116,14 @@ pub async fn handle_service_command(
let result = set_cron_trigger(&file, id, schedule, start_time, end_time)?;
display_result(ctx, result, json)?;
}
TriggerCommand::SetAtProtocol {
collection,
repo_did,
action,
} => {
let result = set_atproto_trigger(&file, id, collection, repo_did, action)?;
display_result(ctx, result, json)?;
}
},
WorkflowCommand::Submit { id, command } => match command {
SubmitCommand::SetAggregator { url } => {
Expand Down Expand Up @@ -813,6 +821,55 @@ pub fn set_cron_trigger(
})
}

/// Set an ATProto Jetstream event trigger for a workflow
pub fn set_atproto_trigger(
file_path: &Path,
workflow_id: WorkflowId,
collection: String,
repo_did: Option<String>,
action: Option<AtProtoAction>,
) -> Result<WorkflowTriggerResult> {
modify_service_file(file_path, |mut service| {
let workflow = service.workflows.get_mut(&workflow_id).ok_or_else(|| {
anyhow::anyhow!("Workflow with ID '{}' not found in service", workflow_id)
})?;

// Validate collection format (basic NSID validation)
if !collection.contains('.') {
return Err(anyhow!(
"Invalid collection format '{}'. Expected NSID format like 'app.bsky.feed.post'",
collection
));
}

// Validate DID format if provided
if let Some(ref did) = repo_did {
if !did.starts_with("did:") {
return Err(anyhow!(
"Invalid DID format '{}'. Must start with 'did:'",
did
));
}
}

let trigger = Trigger::AtProtoEvent {
collection,
repo_did,
action,
};
workflow.trigger = TriggerBuilder::Trigger(trigger.clone());

Ok((
service,
WorkflowTriggerResult {
workflow_id,
trigger,
file_path: file_path.to_path_buf(),
},
))
})
}

/// Update workflow component using unified logic
pub async fn update_workflow_component(
ipfs_gateway: &str,
Expand Down
18 changes: 18 additions & 0 deletions packages/cli/src/command/service/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,24 @@ impl std::fmt::Display for WorkflowTriggerResult {
writeln!(f, " End Time: None")?;
}
}
Trigger::AtProtoEvent {
collection,
repo_did,
action,
} => {
writeln!(f, " Trigger Type: ATProto Event")?;
writeln!(f, " Collection: {}", collection)?;
if let Some(did) = repo_did {
writeln!(f, " Repo DID: {}", did)?;
} else {
writeln!(f, " Repo DID: None")?;
}
if let Some(act) = action {
writeln!(f, " Action: {}", act)?;
} else {
writeln!(f, " Action: None")?;
}
}
}

writeln!(f, " Updated: {}", self.file_path.display())
Expand Down
5 changes: 4 additions & 1 deletion packages/cli/src/command/service/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ pub async fn validate_contracts_exist(
}
}
// Other trigger types don't need contract validation
Trigger::Cron { .. } | Trigger::Manual | Trigger::BlockInterval { .. } => {}
Trigger::Cron { .. }
| Trigger::Manual
| Trigger::BlockInterval { .. }
| Trigger::AtProtoEvent { .. } => {}
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/service_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl ServiceJsonExt for ServiceBuilder {
));
}
}
Trigger::Manual => {}
Trigger::Manual | Trigger::AtProtoEvent { .. } => {}
},
}

Expand Down
5 changes: 5 additions & 0 deletions packages/engine/src/bindings/types/component_to_wavs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ impl TryFrom<component_service::Trigger> for wavs_types::Trigger {
start_time: source.start_time.map(Into::into),
end_time: source.end_time.map(Into::into),
},
component_service::Trigger::AtprotoEvent(source) => wavs_types::Trigger::AtProtoEvent {
collection: source.collection,
repo_did: source.repo_did,
action: source.action.map(TryInto::try_into).transpose()?,
},
})
}
}
Expand Down
82 changes: 82 additions & 0 deletions packages/engine/src/bindings/types/wavs_to_component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ impl TryFrom<wavs_types::Trigger> for component_service::Trigger {
start_time: start_time.map(Into::into),
end_time: end_time.map(Into::into),
}),
wavs_types::Trigger::AtProtoEvent {
collection,
repo_did,
action,
} => component_service::Trigger::AtprotoEvent(component_service::TriggerAtprotoEvent {
collection,
repo_did,
action: action.map(|a| a.to_string()),
}),
})
}
}
Expand Down Expand Up @@ -432,6 +441,37 @@ impl TryFrom<wavs_types::TriggerData> for component_input::TriggerData {
trigger_time: trigger_time.into(),
}),
),
wavs_types::TriggerData::AtProtoEvent {
sequence,
timestamp,
repo,
collection,
rkey,
action,
cid,
record,
rev,
op_index,
} => {
let record_data = record
.map(|value| serde_json::to_string(&value))
.transpose()?;

Ok(component_input::TriggerData::AtprotoEvent(
component_events::TriggerDataAtprotoEvent {
sequence,
timestamp,
repo,
collection,
rkey,
action: action.to_string(),
cid,
record_data,
rev,
op_index,
},
))
}
wavs_types::TriggerData::Raw(data) => Ok(component_input::TriggerData::Raw(data)),
}
}
Expand Down Expand Up @@ -633,6 +673,37 @@ impl TryFrom<wavs_types::TriggerData> for aggregator_types::TriggerData {
trigger_time: trigger_time.into(),
}),
),
wavs_types::TriggerData::AtProtoEvent {
sequence,
timestamp,
repo,
collection,
rkey,
action,
cid,
record,
rev,
op_index,
} => {
let record_data = record
.map(|value| serde_json::to_string(&value))
.transpose()?;

Ok(aggregator_types::TriggerData::AtprotoEvent(
aggregator_events::TriggerDataAtprotoEvent {
sequence,
timestamp,
repo,
collection,
rkey,
action: action.to_string(),
cid,
record_data,
rev,
op_index,
},
))
}
wavs_types::TriggerData::Raw(data) => Ok(aggregator_types::TriggerData::Raw(data)),
}
}
Expand Down Expand Up @@ -804,6 +875,17 @@ impl TryFrom<wavs_types::Trigger> for aggregator_service::Trigger {
start_time: start_time.map(Into::into),
end_time: end_time.map(Into::into),
}),
wavs_types::Trigger::AtProtoEvent {
collection,
repo_did,
action,
} => {
aggregator_service::Trigger::AtprotoEvent(aggregator_service::TriggerAtprotoEvent {
collection,
repo_did,
action: action.map(|a| a.to_string()),
})
}
})
}
}
Expand Down
Loading
Loading