Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cloudflare/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ thiserror = "2"
url = "2.2"
urlencoding = "2.1.3"
uuid = { version = "1.0", features = ["serde"] }
paste = "1.0.15"

[dev-dependencies]
mockito = { version = "1.6.1" }
Expand Down
83 changes: 83 additions & 0 deletions cloudflare/src/endpoints/api_endpoint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#[macro_export]
macro_rules! api_results {
($name:ident {
$(
$(#[$field_meta:meta])*
$field:ident : $ty:ty
),* $(,)?
}) => {
paste! {
#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)]
pub struct [<$name Results>] {
$(
$(#[$field_meta])*
pub $field: Option<$ty>,
)*
}
}
};
}
#[macro_export]
macro_rules! api_gen {
($st:ty) => {
impl ApiResult for $st {}
};
}
#[macro_export]
macro_rules! api_endpoint {
// Pattern 1: With params (explicit method and params marker)
($(#[$docs:meta])*
$method:ident, $name:ident => $response:ty , $path:expr; $($field:ident),+; params: $params:ty) => {
$(#[$docs])*
#[derive(Debug)]
pub struct $name<'a> {
$(pub $field: &'a str,)+
pub params: $params,
}

impl<'a> EndpointSpec for $name<'a> {
type JsonResponse = $response;
type ResponseType = ApiSuccess<Self::JsonResponse>;

fn method(&self) -> Method {
Method::$method
}

fn path(&self) -> String {
format!($path, $(self.$field),+)
}

#[inline]
fn body(&self) -> Option<RequestBody> {
Some(RequestBody::Json(
serde_json::to_string(&self.params)
.expect("Failed to serialize request body")
))
}
}
};

// Pattern 2: Without params (explicit method) and return type
($(#[$docs:meta])* $method:ident, $name:ident => $response:ty , $path:expr; $($field:ident),+) => {
$(#[$docs])*
#[derive(Debug)]
pub struct $name<'a> {
$(pub $field: &'a str,)+
}

impl<'a> EndpointSpec for $name<'a> {
type JsonResponse = $response;
type ResponseType = ApiSuccess<Self::JsonResponse>;

fn method(&self) -> Method {
Method::$method
}

fn path(&self) -> String {
format!($path, $(self.$field),+)
}

}
};

}
3 changes: 3 additions & 0 deletions cloudflare/src/endpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ pub mod argo_tunnel;
pub mod cfd_tunnel;
pub mod dns;
pub mod load_balancing;
#[macro_use]
mod api_endpoint;
pub mod queue;
pub mod r2;
pub mod workers;
pub mod workerskv;
Expand Down
58 changes: 58 additions & 0 deletions cloudflare/src/endpoints/queue/consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use crate::api_endpoint;
use crate::framework::endpoint::{EndpointSpec, Method, RequestBody};
use crate::framework::response::{ApiResult, ApiSuccess};

use super::consumer_types::Consumer;

// INFO: ref. https://developers.cloudflare.com/api/resources/queues/subresources/consumers/

api_gen!(Consumer);
api_endpoint!(
/// Create A Queue Consumer
/// Creates a new consumer for a Queue.
/// <https://developers.cloudflare.com/api/resources/queues/subresources/consumers/methods/create/>
POST,
CreateQueueConsumer => Consumer,
"accounts/{}/queues/{}/consumers";
account_id,
queue_id;
params: Consumer
);

api_endpoint!(
/// Delete Queue Consumer
/// Deletes the consumer for a queue.
/// <https://developers.cloudflare.com/api/resources/queues/subresources/consumers/methods/delete/>
DELETE,
DeleteQueueConsumer => (),
"accounts/{}/queues/{}/consumers/{}";
account_id,
queue_id,
consumer_id
);

api_gen!(Vec<Consumer>);

api_endpoint!(
/// List Queue Consumers
/// Returns the consumers for a Queue.
/// <https://developers.cloudflare.com/api/resources/queues/subresources/consumers/methods/get/>
GET,
ListQueueConsumer => Vec<Consumer>,
"accounts/{}/queues/{}/consumers";
account_id,
queue_id
);

api_endpoint!(
/// Update Queue Consumer
/// Updates the consumer for a queue, or creates one if it does not exist..
/// <https://developers.cloudflare.com/api/resources/queues/subresources/consumers/methods/update/>
PUT,
UpdateQueueConsumer => Consumer,
"accounts/{}/queues/{}/consumers/{}";
account_id,
queue_id,
consumer_id;
params: Consumer
);
114 changes: 114 additions & 0 deletions cloudflare/src/endpoints/queue/consumer_types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum Consumer {
Worker(MqWorkerConsumer),
HttpPull(MqHttpConsumer),
}

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct MqWorkerConsumer {
/// A Resource identifier.
#[serde(skip_serializing_if = "Option::is_none")]
pub consumer_id: Option<String>, // Optional (maxLength: 32)

#[serde(skip_serializing_if = "Option::is_none")]
pub created_on: Option<String>,

#[serde(skip_serializing_if = "Option::is_none")]
pub dead_letter_queue: Option<String>,

/// A Resource identifier.
/// Optional (maxLength: 32)
#[serde(skip_serializing_if = "Option::is_none")]
pub queue_id: Option<String>, // Optional (maxLength: 32)

/// Name of a Worker
#[serde(skip_serializing_if = "Option::is_none")]
pub script: Option<String>,

/// Name of a Worker
#[serde(skip_serializing_if = "Option::is_none")]
pub script_name: Option<String>,

#[serde(skip_serializing_if = "Option::is_none")]
pub settings: Option<WorkerSettings>,

#[serde(rename = "type")]
pub type_: Option<ConsumerType>, // "worker"
}

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ConsumerType {
Worker,
HttpPull,
}

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)]
pub struct WorkerSettings {
/// The maximum number of messages to include in a batch.
#[serde(skip_serializing_if = "Option::is_none")]
pub batch_size: Option<u32>,

/// Maximum number of concurrent consumers that may consume from this Queue. Set to null to automatically opt in to the platform's maximum (recommended).
#[serde(skip_serializing_if = "Option::is_none")]
pub max_concurrency: Option<u32>,

/// The maximum number of retries
#[serde(skip_serializing_if = "Option::is_none")]
pub max_retries: Option<u32>,

/// The number of milliseconds to wait for a batch to fill up before attempting to deliver it
#[serde(skip_serializing_if = "Option::is_none")]
pub max_wait_time_ms: Option<u64>,

/// The number of seconds to delay before making the message available for another attempt.
#[serde(skip_serializing_if = "Option::is_none")]
pub retry_delay: Option<u32>,
}

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct MqHttpConsumer {
/// A Resource identifier.
#[serde(skip_serializing_if = "Option::is_none")]
pub consumer_id: Option<String>, // Optional (maxLength: 32)

#[serde(skip_serializing_if = "Option::is_none")]
pub created_on: Option<String>,

#[serde(skip_serializing_if = "Option::is_none")]
pub dead_letter_queue: Option<String>,

/// A Resource identifier.
#[serde(skip_serializing_if = "Option::is_none")]
pub queue_id: Option<String>, // Optional (maxLength: 32)

#[serde(skip_serializing_if = "Option::is_none")]
pub settings: Option<HttpSettings>,

#[serde(rename = "type")]
pub type_: Option<ConsumerType>, // "http_pull"
}

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)]
pub struct HttpSettings {
/// The maximum number of messages to include in a batch.
#[serde(skip_serializing_if = "Option::is_none")]
pub batch_size: Option<u32>,

/// The maximum number of retries
#[serde(skip_serializing_if = "Option::is_none")]
pub max_retries: Option<u32>,

/// The number of seconds to delay before making the message available for another attempt.
#[serde(skip_serializing_if = "Option::is_none")]
pub retry_delay: Option<u32>,

/// The number of milliseconds that a message is exclusively leased. After the timeout, the message becomes available for another attempt.
#[serde(skip_serializing_if = "Option::is_none")]
pub visibility_timeout_ms: Option<u64>,
}
30 changes: 30 additions & 0 deletions cloudflare/src/endpoints/queue/messages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use crate::api_endpoint;
use crate::framework::endpoint::{EndpointSpec, Method, RequestBody};
use crate::framework::response::{ApiResult, ApiSuccess};

use super::messages_types::{AckQueueResults, ActionMessage, PullQueueResults};

// INFO: ref. https://developers.cloudflare.com/api/resources/queues/subresources/messages/

api_gen!(AckQueueResults);
api_endpoint!(
/// Acknowledge + Retry messages from a Queue.
/// <https://developers.cloudflare.com/api/resources/queues/subresources/messages/methods/ack/>
POST,
AckRetry => AckQueueResults,
"accounts/{}/queues/{}/messages/ack";
account_id,
queue_id;
params: ActionMessage
);

api_gen!(PullQueueResults);
api_endpoint!(
/// Pull a batch of messages from a Queue.
/// <https://developers.cloudflare.com/api/resources/queues/subresources/messages/methods/pull/>
POST,
PullQueue => PullQueueResults,
"/accounts/{}/queues/{}/messages/pull";
account_id,
queue_id
);
40 changes: 40 additions & 0 deletions cloudflare/src/endpoints/queue/messages_types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)]
pub struct ActionMessage {
#[serde(skip_serializing_if = "Option::is_none")]
acks: Option<Vec<AcksRequest>>,
#[serde(skip_serializing_if = "Option::is_none")]
retries: Option<Vec<RetriesRequest>>,
}

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)]
pub struct AcksRequest {
/// An ID that represents an "in-flight" message that has been pulled from a Queue. You must hold on to this ID and use it to acknowledge this message.
lease_id: String,
}
#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)]
pub struct RetriesRequest {
/// The number of seconds to delay before making the message available for another attempt.
#[serde(skip_serializing_if = "Option::is_none")]
delay_seconds: Option<i64>,
/// An ID that represents an "in-flight" message that has been pulled from a Queue. You must hold on to this ID and use it to acknowledge this message.
#[serde(skip_serializing_if = "Option::is_none")]
lease_id: Option<String>,
}

api_results!(AckQueue {
/// The number of messages that were succesfully acknowledged.
ackCount: u64,
/// The number of messages that were succesfully retried.
retryCount: u64,
warnings: Vec<String>,
});

api_results!(PullQueue {
id: String,
attempts: i64,
body: String,
/// An ID that represents an "in-flight" message that has been pulled from a Queue. You must hold on to this ID and use it to acknowledge this message.
lease_id: String,
});
Loading