diff --git a/cloudflare/Cargo.toml b/cloudflare/Cargo.toml index c7a6fbc..f10b1b5 100644 --- a/cloudflare/Cargo.toml +++ b/cloudflare/Cargo.toml @@ -36,6 +36,7 @@ thiserror = "2" url = "2.2" urlencoding = "2.1.3" uuid = { version = "1.0", features = ["serde"] } +paste = "1.0.15" [dev-dependencies] mockito = { version = "1.6.1" } diff --git a/cloudflare/src/endpoints/api_endpoint.rs b/cloudflare/src/endpoints/api_endpoint.rs new file mode 100644 index 0000000..cd5adc5 --- /dev/null +++ b/cloudflare/src/endpoints/api_endpoint.rs @@ -0,0 +1,83 @@ +#[macro_export] +macro_rules! api_results { + ($name:ident { + $( + $(#[$field_meta:meta])* + $field:ident : $ty:ty + ),* $(,)? + }) => { + paste! { + #[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] + pub struct [<$name Results>] { + $( + $(#[$field_meta])* + pub $field: Option<$ty>, + )* + } + } + }; +} +#[macro_export] +macro_rules! api_gen { + ($st:ty) => { + impl ApiResult for $st {} + }; +} +#[macro_export] +macro_rules! api_endpoint { + // Pattern 1: With params (explicit method and params marker) + ($(#[$docs:meta])* + $method:ident, $name:ident => $response:ty , $path:expr; $($field:ident),+; params: $params:ty) => { + $(#[$docs])* + #[derive(Debug)] + pub struct $name<'a> { + $(pub $field: &'a str,)+ + pub params: $params, + } + + impl<'a> EndpointSpec for $name<'a> { + type JsonResponse = $response; + type ResponseType = ApiSuccess; + + fn method(&self) -> Method { + Method::$method + } + + fn path(&self) -> String { + format!($path, $(self.$field),+) + } + + #[inline] + fn body(&self) -> Option { + Some(RequestBody::Json( + serde_json::to_string(&self.params) + .expect("Failed to serialize request body") + )) + } + } + }; + + // Pattern 2: Without params (explicit method) and return type + ($(#[$docs:meta])* $method:ident, $name:ident => $response:ty , $path:expr; $($field:ident),+) => { + $(#[$docs])* + #[derive(Debug)] + pub struct $name<'a> { + $(pub $field: &'a str,)+ + } + + impl<'a> EndpointSpec for $name<'a> { + type JsonResponse = $response; + type ResponseType = ApiSuccess; + + fn method(&self) -> Method { + Method::$method + } + + fn path(&self) -> String { + format!($path, $(self.$field),+) + } + + } + }; + +} diff --git a/cloudflare/src/endpoints/mod.rs b/cloudflare/src/endpoints/mod.rs index f590cc9..aae7ac2 100644 --- a/cloudflare/src/endpoints/mod.rs +++ b/cloudflare/src/endpoints/mod.rs @@ -9,6 +9,9 @@ pub mod argo_tunnel; pub mod cfd_tunnel; pub mod dns; pub mod load_balancing; +#[macro_use] +mod api_endpoint; +pub mod queue; pub mod r2; pub mod workers; pub mod workerskv; diff --git a/cloudflare/src/endpoints/queue/consumer.rs b/cloudflare/src/endpoints/queue/consumer.rs new file mode 100644 index 0000000..b8e6b3a --- /dev/null +++ b/cloudflare/src/endpoints/queue/consumer.rs @@ -0,0 +1,58 @@ +use crate::api_endpoint; +use crate::framework::endpoint::{EndpointSpec, Method, RequestBody}; +use crate::framework::response::{ApiResult, ApiSuccess}; + +use super::consumer_types::Consumer; + +// INFO: ref. https://developers.cloudflare.com/api/resources/queues/subresources/consumers/ + +api_gen!(Consumer); +api_endpoint!( + /// Create A Queue Consumer + /// Creates a new consumer for a Queue. + /// + POST, + CreateQueueConsumer => Consumer, + "accounts/{}/queues/{}/consumers"; + account_id, + queue_id; + params: Consumer +); + +api_endpoint!( + /// Delete Queue Consumer + /// Deletes the consumer for a queue. + /// + DELETE, + DeleteQueueConsumer => (), + "accounts/{}/queues/{}/consumers/{}"; + account_id, + queue_id, + consumer_id +); + +api_gen!(Vec); + +api_endpoint!( + /// List Queue Consumers + /// Returns the consumers for a Queue. + /// + GET, + ListQueueConsumer => Vec, + "accounts/{}/queues/{}/consumers"; + account_id, + queue_id +); + +api_endpoint!( + /// Update Queue Consumer + /// Updates the consumer for a queue, or creates one if it does not exist.. + /// + PUT, + UpdateQueueConsumer => Consumer, + "accounts/{}/queues/{}/consumers/{}"; + account_id, + queue_id, + consumer_id; + params: Consumer +); diff --git a/cloudflare/src/endpoints/queue/consumer_types.rs b/cloudflare/src/endpoints/queue/consumer_types.rs new file mode 100644 index 0000000..b74158a --- /dev/null +++ b/cloudflare/src/endpoints/queue/consumer_types.rs @@ -0,0 +1,114 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +#[serde(untagged)] +pub enum Consumer { + Worker(MqWorkerConsumer), + HttpPull(MqHttpConsumer), +} + +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub struct MqWorkerConsumer { + /// A Resource identifier. + #[serde(skip_serializing_if = "Option::is_none")] + pub consumer_id: Option, // Optional (maxLength: 32) + + #[serde(skip_serializing_if = "Option::is_none")] + pub created_on: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub dead_letter_queue: Option, + + /// A Resource identifier. + /// Optional (maxLength: 32) + #[serde(skip_serializing_if = "Option::is_none")] + pub queue_id: Option, // Optional (maxLength: 32) + + /// Name of a Worker + #[serde(skip_serializing_if = "Option::is_none")] + pub script: Option, + + /// Name of a Worker + #[serde(skip_serializing_if = "Option::is_none")] + pub script_name: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub settings: Option, + + #[serde(rename = "type")] + pub type_: Option, // "worker" +} + +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ConsumerType { + Worker, + HttpPull, +} + +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +pub struct WorkerSettings { + /// The maximum number of messages to include in a batch. + #[serde(skip_serializing_if = "Option::is_none")] + pub batch_size: Option, + + /// Maximum number of concurrent consumers that may consume from this Queue. Set to null to automatically opt in to the platform's maximum (recommended). + #[serde(skip_serializing_if = "Option::is_none")] + pub max_concurrency: Option, + + /// The maximum number of retries + #[serde(skip_serializing_if = "Option::is_none")] + pub max_retries: Option, + + /// The number of milliseconds to wait for a batch to fill up before attempting to deliver it + #[serde(skip_serializing_if = "Option::is_none")] + pub max_wait_time_ms: Option, + + /// The number of seconds to delay before making the message available for another attempt. + #[serde(skip_serializing_if = "Option::is_none")] + pub retry_delay: Option, +} + +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub struct MqHttpConsumer { + /// A Resource identifier. + #[serde(skip_serializing_if = "Option::is_none")] + pub consumer_id: Option, // Optional (maxLength: 32) + + #[serde(skip_serializing_if = "Option::is_none")] + pub created_on: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub dead_letter_queue: Option, + + /// A Resource identifier. + #[serde(skip_serializing_if = "Option::is_none")] + pub queue_id: Option, // Optional (maxLength: 32) + + #[serde(skip_serializing_if = "Option::is_none")] + pub settings: Option, + + #[serde(rename = "type")] + pub type_: Option, // "http_pull" +} + +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +pub struct HttpSettings { + /// The maximum number of messages to include in a batch. + #[serde(skip_serializing_if = "Option::is_none")] + pub batch_size: Option, + + /// The maximum number of retries + #[serde(skip_serializing_if = "Option::is_none")] + pub max_retries: Option, + + /// The number of seconds to delay before making the message available for another attempt. + #[serde(skip_serializing_if = "Option::is_none")] + pub retry_delay: Option, + + /// The number of milliseconds that a message is exclusively leased. After the timeout, the message becomes available for another attempt. + #[serde(skip_serializing_if = "Option::is_none")] + pub visibility_timeout_ms: Option, +} diff --git a/cloudflare/src/endpoints/queue/messages.rs b/cloudflare/src/endpoints/queue/messages.rs new file mode 100644 index 0000000..028a033 --- /dev/null +++ b/cloudflare/src/endpoints/queue/messages.rs @@ -0,0 +1,30 @@ +use crate::api_endpoint; +use crate::framework::endpoint::{EndpointSpec, Method, RequestBody}; +use crate::framework::response::{ApiResult, ApiSuccess}; + +use super::messages_types::{AckQueueResults, ActionMessage, PullQueueResults}; + +// INFO: ref. https://developers.cloudflare.com/api/resources/queues/subresources/messages/ + +api_gen!(AckQueueResults); +api_endpoint!( + /// Acknowledge + Retry messages from a Queue. + /// + POST, + AckRetry => AckQueueResults, + "accounts/{}/queues/{}/messages/ack"; + account_id, + queue_id; + params: ActionMessage +); + +api_gen!(PullQueueResults); +api_endpoint!( + /// Pull a batch of messages from a Queue. + /// + POST, + PullQueue => PullQueueResults, + "/accounts/{}/queues/{}/messages/pull"; + account_id, + queue_id +); diff --git a/cloudflare/src/endpoints/queue/messages_types.rs b/cloudflare/src/endpoints/queue/messages_types.rs new file mode 100644 index 0000000..d8cde37 --- /dev/null +++ b/cloudflare/src/endpoints/queue/messages_types.rs @@ -0,0 +1,40 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +pub struct ActionMessage { + #[serde(skip_serializing_if = "Option::is_none")] + acks: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + retries: Option>, +} + +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +pub struct AcksRequest { + /// An ID that represents an "in-flight" message that has been pulled from a Queue. You must hold on to this ID and use it to acknowledge this message. + lease_id: String, +} +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +pub struct RetriesRequest { + /// The number of seconds to delay before making the message available for another attempt. + #[serde(skip_serializing_if = "Option::is_none")] + delay_seconds: Option, + /// An ID that represents an "in-flight" message that has been pulled from a Queue. You must hold on to this ID and use it to acknowledge this message. + #[serde(skip_serializing_if = "Option::is_none")] + lease_id: Option, +} + +api_results!(AckQueue { + /// The number of messages that were succesfully acknowledged. + ackCount: u64, + /// The number of messages that were succesfully retried. + retryCount: u64, + warnings: Vec, +}); + +api_results!(PullQueue { + id: String, + attempts: i64, + body: String, + /// An ID that represents an "in-flight" message that has been pulled from a Queue. You must hold on to this ID and use it to acknowledge this message. + lease_id: String, +}); diff --git a/cloudflare/src/endpoints/queue/mod.rs b/cloudflare/src/endpoints/queue/mod.rs new file mode 100644 index 0000000..335ce8c --- /dev/null +++ b/cloudflare/src/endpoints/queue/mod.rs @@ -0,0 +1,63 @@ +use crate::api_endpoint; +use crate::framework::endpoint::{EndpointSpec, Method, RequestBody}; +use crate::framework::response::{ApiResult, ApiSuccess}; +pub mod consumer; +pub mod consumer_types; +pub mod messages; +pub mod messages_types; +pub mod purge; +pub mod types; +use types::*; + +api_gen!(Queue); +api_endpoint!( + /// Create a new queue. + /// + POST, + CreateQueue => Queue, + "accounts/{}/queues"; + account_id; + params: Queue +); + +api_endpoint!( + /// Deletes a queue. + /// + DELETE, + DeleteQueue => (), + "accounts/{}/queues/{}"; + account_id, + queue_id +); + +api_endpoint!( + /// Edit a Queue. + /// + PATCH, + UpdateQueue => Queue, + "accounts/{}/queues/{}"; + account_id, + queue_id; + params: Queue +); + +api_endpoint!( + /// Get details about a specific queue.. + /// + GET, + GetQueue => Queue, + "accounts/{}/queues/{}"; + account_id, + queue_id +); + +api_gen!(Vec); + +api_endpoint!( + /// Returns the queues owned by an account.. + /// + GET, + ListQueue => Vec, + "accounts/{}/queues"; + account_id +); diff --git a/cloudflare/src/endpoints/queue/purge.rs b/cloudflare/src/endpoints/queue/purge.rs new file mode 100644 index 0000000..41d6b5e --- /dev/null +++ b/cloudflare/src/endpoints/queue/purge.rs @@ -0,0 +1,35 @@ +use crate::api_endpoint; +use crate::framework::endpoint::{EndpointSpec, Method, RequestBody}; +use crate::framework::response::{ApiResult, ApiSuccess}; +use serde::{Deserialize, Serialize}; + +use super::types::Queue; + +// INFO: ref: https://developers.cloudflare.com/api/resources/queues/subresources/purge/ + +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +pub struct ConfirmDelete { + /// Confimation that all messages will be deleted permanently. + delete_messages_permanently: bool, +} + +api_gen!(ConfirmDelete); + +api_endpoint! ( + /// Purge Queue + /// Deletes all messages from the Queue. + /// https://developers.cloudflare.com/api/resources/queues/subresources/purge/ + POST, + PurgeQueue => Queue, + "accounts/{}/queues/{}/purge"; + account_id, + queue_id; + params: ConfirmDelete +); + +api_results!(QueuePurgeStatus { + /// Indicates if the last purge operation completed successfully. + completed: String, + /// Timestamp when the last purge operation started. + started_at: String +}); diff --git a/cloudflare/src/endpoints/queue/types.rs b/cloudflare/src/endpoints/queue/types.rs new file mode 100644 index 0000000..6e3e50e --- /dev/null +++ b/cloudflare/src/endpoints/queue/types.rs @@ -0,0 +1,64 @@ +use chrono::DateTime; +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ProducerType { + Worker, + R2Bucket, +} + +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +pub struct MqWorkerProducer { + #[serde(skip_serializing_if = "Option::is_none")] + pub script: Option, + #[serde(rename = "type")] + pub type_: ProducerType, +} + +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +pub struct MqR2Producer { + #[serde(skip_serializing_if = "Option::is_none")] + pub bucket_name: Option, + #[serde(rename = "type")] + pub type_: ProducerType, +} + +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +#[serde(untagged)] +pub enum Producer { + Worker(MqWorkerProducer), + R2Bucket(MqR2Producer), +} + +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +pub struct QueueSettings { + #[serde(skip_serializing_if = "Option::is_none")] + pub delivery_delay: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub delivery_paused: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub message_retention_period: Option, +} + +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +pub struct Queue { + #[serde(skip_serializing_if = "Option::is_none")] + pub consumers: Option>, // Assuming Consumer is just a string for this example + #[serde(skip_serializing_if = "Option::is_none")] + pub consumers_total_count: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub created_on: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub modified_on: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub producers: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub producers_total_count: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub queue_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub queue_name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub settings: Option, +} diff --git a/cloudflare/src/lib.rs b/cloudflare/src/lib.rs index 6518aa8..7714cd1 100644 --- a/cloudflare/src/lib.rs +++ b/cloudflare/src/lib.rs @@ -1,4 +1,6 @@ #![forbid(unsafe_code)] +#[macro_use] +extern crate paste; pub mod endpoints; pub mod framework;