-
Notifications
You must be signed in to change notification settings - Fork 0
Use AT protocol triggers #9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
1fe0e4b
476bff8
d5b5dbe
6a0a9bb
df9b304
ef0eb99
c093002
0012e02
0731d17
79431bd
23ce962
9c4c511
75ddbcf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,7 +11,10 @@ use crate::{ | |
| attestation::{build_attestation_payload, ReplyData}, | ||
| host::LogLevel, | ||
| wasi::keyvalue::store::{self, Bucket}, | ||
| wavs::{operator::input::TriggerData, types::events::TriggerDataCron}, | ||
| wavs::{ | ||
| operator::input::TriggerData, | ||
| types::events::{TriggerDataAtprotoEvent, TriggerDataCron}, | ||
| }, | ||
| }; | ||
|
|
||
| wit_bindgen::generate!({ | ||
|
|
@@ -28,7 +31,7 @@ const POSTS_KEY: &str = "posts"; | |
| const DID_KEY: &str = "did"; | ||
| const REPLIES_KEY_PREFIX: &str = "replies"; | ||
| const POST_TTL_NANOS: u64 = 1_209_600_000_000_000; // 2 weeks in nanos | ||
| const POST_LIMT: u32 = 20; | ||
| const POST_LIMIT: u32 = 20; | ||
|
|
||
| #[derive(Serialize, Deserialize)] | ||
| struct PostConfig { | ||
|
|
@@ -45,9 +48,8 @@ struct Component; | |
|
|
||
| impl Guest for Component { | ||
| fn run(action: TriggerAction) -> std::result::Result<Vec<WasmResponse>, String> { | ||
| let target_handle = std::env::var("WAVS_ENV_OPERATOR_TARGET_HANDLE").map_err(|_| { | ||
| "No target_handle provided (set WAVS_ENV_OPERATOR_TARGET_HANDLE)".to_string() | ||
| })?; | ||
| let target_handle = | ||
| host::config_var("TARGET_HANDLE").ok_or("No target_handle configured".to_string())?; | ||
|
|
||
| match action.data { | ||
| TriggerData::Cron(TriggerDataCron { trigger_time }) => { | ||
|
|
@@ -65,6 +67,36 @@ impl Guest for Component { | |
| .map_err(|e| e.to_string()) | ||
| }) | ||
| } | ||
| TriggerData::AtprotoEvent(TriggerDataAtprotoEvent { | ||
| sequence: _, | ||
| timestamp, | ||
| repo, | ||
| collection, | ||
| rkey, | ||
| action, | ||
| cid: _, | ||
| record_data, | ||
| }) => { | ||
| let client = AtProtocolClient::new(); | ||
| let timestamp: u64 = timestamp | ||
| .try_into() | ||
| .map_err(|_| "Invalid timestamp for atproto event".to_string())?; | ||
|
|
||
| block_on(async move { | ||
| handle_atproto_event( | ||
| client, | ||
| target_handle, | ||
| timestamp, | ||
| repo, | ||
| collection, | ||
| rkey, | ||
| action, | ||
| record_data, | ||
| ) | ||
| .await | ||
| .map_err(|e| e.to_string()) | ||
| }) | ||
| } | ||
| _ => unimplemented!("Trigger data variant not implemented"), | ||
| } | ||
| } | ||
|
|
@@ -90,7 +122,7 @@ pub async fn generate_payload( | |
| &format!("Resolved target handle {} to DID {did}", &target_handle), | ||
| ); | ||
|
|
||
| let posts = client.fetch_posts(&did, POST_LIMT).await?; | ||
| let posts = client.fetch_posts(&did, POST_LIMIT).await?; | ||
|
|
||
| let mut responses = vec![]; | ||
| let schema_uid = get_schema_uid()?; | ||
|
|
@@ -179,9 +211,157 @@ pub async fn generate_payload( | |
| Ok(responses) | ||
| } | ||
|
|
||
| #[allow(clippy::too_many_arguments)] | ||
| pub async fn handle_atproto_event( | ||
| client: AtProtocolClient, | ||
| target_handle: String, | ||
| timestamp: u64, | ||
| repo: String, | ||
| collection: String, | ||
| rkey: String, | ||
| action: String, | ||
| record_data: Option<String>, | ||
| ) -> anyhow::Result<Vec<WasmResponse>> { | ||
| let did_bucket = store::open(DID_KEY)?; | ||
|
|
||
| // Resolve and cache the target handle's DID for quick comparisons | ||
| let target_did = if let Some(cached_did) = did_bucket.get(&target_handle)? { | ||
| String::from_utf8(cached_did)? | ||
| } else { | ||
| let did = client.fetch_did(&target_handle).await?; | ||
| did_bucket.set(&target_handle, did.as_bytes())?; | ||
| did | ||
| }; | ||
|
|
||
| // Only process post creation events | ||
| if collection != "app.bsky.feed.post" || action != "create" { | ||
| host::log( | ||
| LogLevel::Info, | ||
| &format!( | ||
| "Ignoring event: collection={}, action={}", | ||
| collection, action | ||
| ), | ||
| ); | ||
| return Ok(vec![]); | ||
| } | ||
|
|
||
| // Construct the post URI from the event data (repo already includes full DID prefix) | ||
| let post_uri = format!("at://{}/app.bsky.feed.post/{}", repo, rkey); | ||
|
Comment on lines
+249
to
+250
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. repo == did no need to validate if we get to this point I think
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line is called just at the beginning of handle event, where that field is decoded into a string. |
||
|
|
||
| // Parse the included record; if absent or invalid, skip without fetching | ||
| let current_post_record = match record_data.as_ref() { | ||
| Some(raw) => match serde_json::from_str::<post::Record>(raw) { | ||
| Ok(record) => record, | ||
| Err(e) => { | ||
| host::log( | ||
| LogLevel::Error, | ||
| &format!("Failed to parse record_data: {}", e), | ||
| ); | ||
| return Ok(vec![]); | ||
| } | ||
ueco-jb marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| }, | ||
| None => { | ||
| host::log( | ||
| LogLevel::Error, | ||
| "No record_data supplied on atproto event; skipping", | ||
| ); | ||
| return Ok(vec![]); | ||
| } | ||
| }; | ||
|
|
||
| // Check if this post has a parent (is a reply) | ||
| let parent_uri = match current_post_record.reply.as_ref() { | ||
| Some(reply) => { | ||
| host::log( | ||
| LogLevel::Debug, | ||
| &format!("Post is a reply to parent: {}", reply.parent.uri), | ||
| ); | ||
| reply.parent.uri.clone() | ||
| } | ||
| None => { | ||
| host::log(LogLevel::Debug, "Post is not a reply, skipping"); | ||
| return Ok(vec![]); | ||
| } | ||
| }; | ||
|
|
||
| // Extract the parent author's DID from the parent post URI | ||
| // The URI format is: at://did:plc:.../app.bsky.feed.post/... | ||
| let parent_did = match extract_did_from_uri(&parent_uri) { | ||
| Ok(did) => did, | ||
| Err(e) => { | ||
| host::log( | ||
| LogLevel::Error, | ||
| &format!("Error extracting DID from parent URI: {}", e), | ||
| ); | ||
| return Ok(vec![]); | ||
| } | ||
| }; | ||
|
|
||
| // Check if the parent post is by the targeted DID (faster than handle lookups) | ||
| if parent_did != target_did { | ||
| return Ok(vec![]); | ||
| } | ||
|
|
||
| host::log( | ||
| LogLevel::Info, | ||
| "Parent post is by targeted handle! Creating response for reply", | ||
| ); | ||
|
|
||
| // Extract the author's DID from the current post URI | ||
| let author_did = match extract_did_from_uri(&post_uri) { | ||
ueco-jb marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Ok(did) => did, | ||
| Err(e) => { | ||
| host::log( | ||
| LogLevel::Error, | ||
| &format!("Error extracting DID from post URI: {}", e), | ||
| ); | ||
| return Ok(vec![]); | ||
| } | ||
| }; | ||
|
|
||
| // Create the reply data | ||
| let reply_data = ReplyData { | ||
| author_did, | ||
| reply_text: current_post_record.text.clone(), | ||
| reply_uri: post_uri.clone(), | ||
| parent_post_uri: parent_uri.clone(), | ||
| timestamp, | ||
| }; | ||
|
|
||
| // Build and return the attestation | ||
| let schema_uid = get_schema_uid()?; | ||
| match build_attestation_payload(reply_data, schema_uid) { | ||
| Ok(response) => { | ||
| host::log( | ||
| LogLevel::Info, | ||
| &format!("Created attestation for reply to {}", target_handle), | ||
| ); | ||
| Ok(vec![response]) | ||
| } | ||
| Err(e) => { | ||
| host::log( | ||
| LogLevel::Error, | ||
| &format!("Failed to build attestation: {}", e), | ||
| ); | ||
| Ok(vec![]) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Extract DID from an ATProto URI | ||
| /// URI format: at://did:.../collection/rkey | ||
| fn extract_did_from_uri(uri: &str) -> anyhow::Result<String> { | ||
| let parts: Vec<&str> = uri.split('/').collect(); | ||
| if parts.len() >= 3 && parts[0] == "at:" { | ||
| Ok(parts[2].to_string()) | ||
| } else { | ||
| Err(anyhow::anyhow!("Invalid ATProto URI format: {}", uri)) | ||
| } | ||
| } | ||
ueco-jb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| fn get_schema_uid() -> anyhow::Result<FixedBytes<32>> { | ||
| let schema_str = std::env::var("WAVS_ENV_EAS_SCHEMA_UID") | ||
| .map_err(|_| anyhow::anyhow!("WAVS_ENV_EAS_SCHEMA_UID not set"))?; | ||
| let schema_str = | ||
| host::config_var("EAS_SCHEMA_UID").ok_or(anyhow::anyhow!("EAS_SCHEMA_UID not set"))?; | ||
|
|
||
| schema_str | ||
| .parse() | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.