@@ -10,6 +10,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
1010use tokio:: runtime:: Runtime ;
1111use twmq:: error:: TwmqError ;
1212use twmq:: job:: JobError ;
13+ use twmq:: { BorrowedJob , UserCancellable } ;
1314
1415use twmq:: {
1516 DurableExecution , Queue ,
@@ -44,6 +45,14 @@ impl From<TwmqError> for BenchmarkErrorData {
4445 }
4546}
4647
48+ impl UserCancellable for BenchmarkErrorData {
49+ fn user_cancelled ( ) -> Self {
50+ BenchmarkErrorData {
51+ reason : "Transaction cancelled by user" . to_string ( ) ,
52+ }
53+ }
54+ }
55+
4756// Shared metrics across all benchmark jobs
4857#[ derive( Clone ) ]
4958pub struct BenchmarkMetrics {
@@ -100,7 +109,10 @@ impl DurableExecution for BenchmarkJobHandler {
100109 type ErrorData = BenchmarkErrorData ;
101110 type JobData = BenchmarkJobData ;
102111
103- async fn process ( & self , job : & Job < Self :: JobData > ) -> JobResult < Self :: Output , Self :: ErrorData > {
112+ async fn process (
113+ & self ,
114+ job : & BorrowedJob < Self :: JobData > ,
115+ ) -> JobResult < Self :: Output , Self :: ErrorData > {
104116 let start_time = SystemTime :: now ( )
105117 . duration_since ( UNIX_EPOCH )
106118 . unwrap ( )
@@ -119,7 +131,7 @@ impl DurableExecution for BenchmarkJobHandler {
119131 . fetch_add ( processing_time, Ordering :: SeqCst ) ;
120132
121133 // Fresh random decision each processing attempt
122- if rand:: thread_rng ( ) . gen_bool ( job. data . nack_probability ) {
134+ if rand:: thread_rng ( ) . gen_bool ( job. job . data . nack_probability ) {
123135 self . metrics . jobs_nacked . fetch_add ( 1 , Ordering :: SeqCst ) ;
124136
125137 // Random position for nacks as requested
@@ -140,7 +152,7 @@ impl DurableExecution for BenchmarkJobHandler {
140152 self . metrics . jobs_succeeded . fetch_add ( 1 , Ordering :: SeqCst ) ;
141153
142154 Ok ( BenchmarkOutput {
143- job_id : job. id . clone ( ) ,
155+ job_id : job. id ( ) . to_string ( ) ,
144156 processed_at : end_time,
145157 } )
146158 }
0 commit comments