@@ -1,10 +1,18 @@
 use agent_twitter_client::{models::Tweet, scraper::Scraper, search::SearchMode};
-use anda_core::{Agent, BoxError, CacheExpiry, CacheFeatures, CompletionFeatures, StateFeatures};
+use anda_core::{
+    Agent, BoxError, CacheFeatures, CompletionFeatures, Path, PutMode, StateFeatures,
+    StoreFeatures,
+};
 use anda_engine::{
-    context::AgentCtx, engine::Engine, extension::character::CharacterAgent, rand_number,
+    context::AgentCtx,
+    engine::Engine,
+    extension::character::CharacterAgent,
+    rand_number,
 };
 use anda_lancedb::knowledge::KnowledgeStore;
-use std::{collections::BTreeSet, sync::Arc};
+use ciborium::from_reader;
+use ic_cose_types::to_cbor_bytes;
+use std::sync::Arc;
 use tokio::{
     sync::RwLock,
     time::{sleep, Duration},
@@ -13,8 +21,8 @@ use tokio_util::sync::CancellationToken;

 use crate::handler::ServiceStatus;

-const MAX_TWEET_LENGTH: usize = 280;
 const MAX_HISTORY_TWEETS: i64 = 21;
+const MAX_SEEN_TWEET_IDS: usize = 1000;

 static LOG_TARGET: &str = "twitter";

@@ -40,7 +48,50 @@ impl TwitterDaemon {
         }
     }

+    async fn init_seen_tweet_ids<F>(&self, ctx: &F)
+    where
+        F: CacheFeatures + StoreFeatures,
+    {
+        // load seen_tweet_ids from store
+        let seen_tweet_ids: Vec<String> = ctx
+            .store_get(&Path::from("seen_tweet_ids"))
+            .await
+            .map(|(v, _)| from_reader(&v[..]).unwrap_or_default())
+            .unwrap_or_default();
+
+        ctx.cache_set("seen_tweet_ids", (seen_tweet_ids, None))
+            .await;
+    }
+
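+    // Return the cached seen_tweet_ids list; empty if the cache has not been initialized.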
+    async fn get_seen_tweet_ids<F>(&self, ctx: &F) -> Vec<String>
+    where
+        F: CacheFeatures + StoreFeatures,
+    {
+        ctx.cache_get("seen_tweet_ids").await.unwrap_or_default()
+    }
+
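+    // Update the cached list and persist a copy to the store in the background.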
+    async fn set_seen_tweet_ids<F>(&self, ctx: F, val: Vec<String>)
+    where
+        F: CacheFeatures + StoreFeatures + Send + Sync + 'static,
+    {
+        ctx.cache_set("seen_tweet_ids", (val.clone(), None)).await;
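+        // Write-behind to the store; persistence errors are ignored.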
+        tokio::spawn(async move {
+            let _ = ctx
+                .store_put(
+                    &Path::from("seen_tweet_ids"),
+                    PutMode::Overwrite,
+                    to_cbor_bytes(&val).into(),
+                )
+                .await;
+        });
+    }
+
     pub async fn run(&self, cancel_token: CancellationToken) -> Result<(), BoxError> {
+        let ctx = self.engine.ctx_with(self.agent.as_ref(), None, None)?;
+
+        // load seen_tweet_ids from store
+        self.init_seen_tweet_ids(&ctx).await;
+
         log::info!(target: LOG_TARGET, "starting Twitter bot");

         loop {
@@ -89,7 +140,7 @@ impl TwitterDaemon {
                 }
             }

-            if rand_number(0..=7) == 0 {
+            if rand_number(0..=5) == 0 {
                 if let Err(err) = self.handle_home_timeline().await {
                     log::error!(target: LOG_TARGET, "handle_home_timeline error: {err:?}");
                 }
@@ -157,10 +208,14 @@ impl TwitterDaemon {
             None,
         )?;

-        let mut seen_tweet_ids: BTreeSet<String> =
-            ctx.cache_get("seen_tweet_ids").await.unwrap_or_default();
-        let seen: Vec<String> = seen_tweet_ids.iter().cloned().collect();
-        let tweets = self.scraper.get_home_timeline(1, seen).await?;
+        let mut seen_tweet_ids: Vec<String> = self.get_seen_tweet_ids(&ctx).await;
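+        // Cap the list so it does not grow without bound: drop the oldest half.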
+        if seen_tweet_ids.len() >= MAX_SEEN_TWEET_IDS {
+            seen_tweet_ids.drain(0..MAX_SEEN_TWEET_IDS / 2);
+        }
+        let tweets = self
+            .scraper
+            .get_home_timeline(1, seen_tweet_ids.clone())
+            .await?;
         log::info!(target: LOG_TARGET, "process home timeline, {} tweets", tweets.len());

         let mut likes = 0;
@@ -191,7 +246,7 @@ impl TwitterDaemon {
             if seen_tweet_ids.contains(&tweet_id) {
                 continue;
             }
-            seen_tweet_ids.insert(tweet_id.clone());
+            seen_tweet_ids.push(tweet_id.clone());

             let res: Result<(), BoxError> = async {
                 if self.handle_like(&ctx, &tweet_content, &tweet_id).await? {
@@ -215,8 +270,7 @@ impl TwitterDaemon {
             sleep(Duration::from_secs(rand_number(3..=10))).await;
         }

-        ctx.cache_set("seen_tweet_ids", (seen_tweet_ids, None))
-            .await;
+        self.set_seen_tweet_ids(ctx, seen_tweet_ids).await;
         log::info!(target: LOG_TARGET, "home timeline: likes {}, retweets {}, quotes {}", likes, retweets, quotes);
         Ok(())
     }
@@ -235,20 +289,13 @@ impl TwitterDaemon {
         let ctx = self
             .engine
             .ctx_with(self.agent.as_ref(), Some(tweet_user.clone()), None)?;
+        let mut seen_tweet_ids: Vec<String> = self.get_seen_tweet_ids(&ctx).await;

-        let handle_key = format!("D_{}", tweet_id);
-        if ctx.cache_contains(&handle_key) {
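+        // Skip mentions that have already been handled.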
+        if seen_tweet_ids.contains(&tweet_id) {
             return Ok(());
         }

-        ctx.cache_set(
-            &handle_key,
-            (
-                true,
-                Some(CacheExpiry::TTL(Duration::from_secs(3600 * 24 * 3))),
-            ),
-        )
-        .await;
+        seen_tweet_ids.push(tweet_id.clone());

         let thread = self.build_conversation_thread(&tweet).await?;
         let messages: Vec<String> = thread
@@ -268,29 +315,17 @@ impl TwitterDaemon {
             messages.join("\n")
         };

-        let res = self.agent.run(ctx, tweet_text, None).await?;
+        let res = self.agent.run(ctx.clone(), tweet_text, None).await?;
+        if res.failed_reason.is_none() {
+            // Reply to the original tweet
+            let tweet: Option<&str> = tweet.id.as_deref();
+            let _ = self.scraper.send_tweet(&res.content, tweet, None).await?;

-        if res.failed_reason.is_some() {
-            return Ok(());
+            log::info!(target: LOG_TARGET, "handle mention: {} - {}, {} chars", tweet_user, tweet_id, res.content.chars().count());
         }

-        // Split response into tweet-sized chunks if necessary
-        let chunks: Vec<String> = res
-            .content
-            .chars()
-            .collect::<Vec<char>>()
-            .chunks(MAX_TWEET_LENGTH)
-            .map(|chunk| chunk.iter().collect::<String>())
-            .collect();
-
-        // Reply to the original tweet
-        let tweet: Option<&str> = tweet.id.as_deref();
-        for chunk in &chunks {
-            let _ = self.scraper.send_tweet(chunk.as_str(), tweet, None).await?;
-            sleep(Duration::from_secs(rand_number(1..=3))).await;
-        }
+        self.set_seen_tweet_ids(ctx, seen_tweet_ids.clone()).await;

-        log::info!(target: LOG_TARGET, "handle mention: {} - {}, {} chunks", tweet_user, tweet_id, chunks.len());
         Ok(())
     }
