Skip to content

Commit 3a43433

Browse files
authored
Implement send-example route (#976)
This change implements the `send-example` route which allows dispatching example events to a given endpoint.
1 parent ed61687 commit 3a43433

File tree

7 files changed

+271
-14
lines changed

7 files changed

+271
-14
lines changed

server/openapi.json

Lines changed: 122 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -950,6 +950,20 @@
950950
],
951951
"type": "object"
952952
},
953+
"EventExampleIn": {
954+
"properties": {
955+
"eventType": {
956+
"example": "user.signup",
957+
"maxLength": 256,
958+
"pattern": "^[a-zA-Z0-9\\-_.]+$",
959+
"type": "string"
960+
}
961+
},
962+
"required": [
963+
"eventType"
964+
],
965+
"type": "object"
966+
},
953967
"EventTypeIn": {
954968
"properties": {
955969
"archived": {
@@ -4574,8 +4588,35 @@
45744588
},
45754589
"/api/v1/app/{app_id}/endpoint/{endpoint_id}/send-example/": {
45764590
"post": {
4577-
"description": "Send an example message for event",
4591+
"description": "Send an example message for an event",
4592+
"operationId": "v1.endpoint.send-example",
45784593
"parameters": [
4594+
{
4595+
"in": "path",
4596+
"name": "app_id",
4597+
"required": true,
4598+
"schema": {
4599+
"example": "unique-app-identifier",
4600+
"maxLength": 256,
4601+
"minLength": 1,
4602+
"pattern": "^[a-zA-Z0-9\\-_.]+$",
4603+
"type": "string"
4604+
},
4605+
"style": "simple"
4606+
},
4607+
{
4608+
"in": "path",
4609+
"name": "endpoint_id",
4610+
"required": true,
4611+
"schema": {
4612+
"example": "unique-ep-identifier",
4613+
"maxLength": 256,
4614+
"minLength": 1,
4615+
"pattern": "^[a-zA-Z0-9\\-_.]+$",
4616+
"type": "string"
4617+
},
4618+
"style": "simple"
4619+
},
45794620
{
45804621
"description": "The request's idempotency key",
45814622
"in": "header",
@@ -4586,11 +4627,89 @@
45864627
"style": "simple"
45874628
}
45884629
],
4630+
"requestBody": {
4631+
"content": {
4632+
"application/json": {
4633+
"schema": {
4634+
"$ref": "#/components/schemas/EventExampleIn"
4635+
}
4636+
}
4637+
},
4638+
"required": true
4639+
},
45894640
"responses": {
4590-
"204": {
4591-
"description": "no content"
4641+
"200": {
4642+
"content": {
4643+
"application/json": {
4644+
"schema": {
4645+
"$ref": "#/components/schemas/MessageOut"
4646+
}
4647+
}
4648+
},
4649+
"description": ""
4650+
},
4651+
"401": {
4652+
"content": {
4653+
"application/json": {
4654+
"schema": {
4655+
"$ref": "#/components/schemas/HttpErrorOut"
4656+
}
4657+
}
4658+
},
4659+
"description": "Unauthorized"
4660+
},
4661+
"403": {
4662+
"content": {
4663+
"application/json": {
4664+
"schema": {
4665+
"$ref": "#/components/schemas/HttpErrorOut"
4666+
}
4667+
}
4668+
},
4669+
"description": "Forbidden"
4670+
},
4671+
"404": {
4672+
"content": {
4673+
"application/json": {
4674+
"schema": {
4675+
"$ref": "#/components/schemas/HttpErrorOut"
4676+
}
4677+
}
4678+
},
4679+
"description": "Not Found"
4680+
},
4681+
"409": {
4682+
"content": {
4683+
"application/json": {
4684+
"schema": {
4685+
"$ref": "#/components/schemas/HttpErrorOut"
4686+
}
4687+
}
4688+
},
4689+
"description": "Conflict"
4690+
},
4691+
"422": {
4692+
"content": {
4693+
"application/json": {
4694+
"schema": {
4695+
"$ref": "#/components/schemas/HTTPValidationError"
4696+
}
4697+
}
4698+
},
4699+
"description": "Validation Error"
4700+
},
4701+
"429": {
4702+
"content": {
4703+
"application/json": {
4704+
"schema": {
4705+
"$ref": "#/components/schemas/HttpErrorOut"
4706+
}
4707+
}
4708+
},
4709+
"description": "Too Many Requests"
45924710
}
45934711
},
4712+
"summary": "Send Event Type Example Message",
45944713
"tags": [
45954714
"Endpoint"
45964715
]

server/svix-server/src/db/models/eventtype.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,21 @@ pub fn schema_example() -> serde_json::Value {
102102
pub struct Schema(HashMap<String, Json>);
103103
json_wrapper!(Schema);
104104

105+
impl Schema {
106+
pub fn example(&self) -> Option<&serde_json::Value> {
107+
self.0
108+
.get("1")
109+
.and_then(|version| match version {
110+
serde_json::Value::Object(obj) => obj.get("examples"),
111+
_ => None,
112+
})
113+
.and_then(|examples| match examples {
114+
serde_json::Value::Array(arr) => arr.iter().next(),
115+
_ => None,
116+
})
117+
}
118+
}
119+
105120
impl JsonSchema for Schema {
106121
fn schema_name() -> String {
107122
stringify!(Schema).to_string()

server/svix-server/src/queue/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,18 +94,21 @@ impl MessageTask {
9494
pub struct MessageTaskBatch {
9595
pub msg_id: MessageId,
9696
pub app_id: ApplicationId,
97+
pub force_endpoint: Option<EndpointId>,
9798
pub trigger_type: MessageAttemptTriggerType,
9899
}
99100

100101
impl MessageTaskBatch {
101102
pub fn new_task(
102103
msg_id: MessageId,
103104
app_id: ApplicationId,
105+
force_endpoint: Option<EndpointId>,
104106
trigger_type: MessageAttemptTriggerType,
105107
) -> QueueTask {
106108
QueueTask::MessageBatch(Self {
107109
msg_id,
108110
app_id,
111+
force_endpoint,
109112
trigger_type,
110113
})
111114
}

server/svix-server/src/v1/endpoints/endpoint/mod.rs

Lines changed: 82 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,20 @@ use crate::{
1212
permissions,
1313
types::{
1414
metadata::Metadata, BaseId, EndpointId, EndpointSecretInternal, EndpointUid,
15-
EventChannelSet, EventTypeNameSet, MessageEndpointId, MessageStatus,
15+
EventChannelSet, EventTypeName, EventTypeNameSet, MessageEndpointId, MessageStatus,
1616
},
1717
},
1818
ctx,
19-
db::models::messagedestination,
19+
db::models::{eventtype, messagedestination},
2020
error::{self, HttpError},
2121
v1::utils::{
22-
api_not_implemented, openapi_desc, openapi_tag,
22+
openapi_tag,
2323
patch::{
2424
patch_field_non_nullable, patch_field_nullable, UnrequiredField,
2525
UnrequiredNullableField,
2626
},
2727
validate_no_control_characters, validate_no_control_characters_unrequired,
28-
validation_error, ApplicationEndpointPath, ModelIn,
28+
validation_error, ApplicationEndpointPath, ModelIn, ValidatedJson,
2929
},
3030
AppState,
3131
};
@@ -53,6 +53,8 @@ use crate::db::models::endpoint;
5353

5454
use self::secrets::generate_secret;
5555

56+
use super::message::{create_message_inner, MessageIn, MessageOut, RawPayload};
57+
5658
pub fn validate_event_types_ids(
5759
event_types_ids: &EventTypeNameSet,
5860
) -> std::result::Result<(), ValidationError> {
@@ -730,7 +732,81 @@ async fn endpoint_stats(
730732
}))
731733
}
732734

733-
const SEND_EXAMPLE_DESCRIPTION: &str = "Send an example message for event";
735+
#[derive(Deserialize, JsonSchema, Validate)]
736+
#[serde(rename_all = "camelCase")]
737+
struct EventExampleIn {
738+
event_type: EventTypeName,
739+
}
740+
741+
const SVIX_PING_EVENT_TYPE_NAME: &str = "svix.ping";
742+
const SVIX_PING_EVENT_TYPE_PAYLOAD: &str = r#"{"success": true}"#;
743+
744+
/// Send an example message for an event
745+
#[aide_annotate(
746+
op_id = "v1.endpoint.send-example",
747+
op_summary = "Send Event Type Example Message"
748+
)]
749+
async fn send_example(
750+
state: State<AppState>,
751+
Path(ApplicationEndpointPath { endpoint_id, .. }): Path<ApplicationEndpointPath>,
752+
permissions::OrganizationWithApplication { app }: permissions::OrganizationWithApplication,
753+
ValidatedJson(data): ValidatedJson<EventExampleIn>,
754+
) -> error::Result<Json<MessageOut>> {
755+
let State(AppState {
756+
ref db,
757+
queue_tx,
758+
cache,
759+
..
760+
}) = state;
761+
762+
let endpoint = ctx!(
763+
endpoint::Entity::secure_find_by_id_or_uid(app.id.clone(), endpoint_id)
764+
.one(db)
765+
.await
766+
)?
767+
.ok_or_else(|| HttpError::not_found(None, None))?;
768+
769+
let example = if data.event_type == EventTypeName(SVIX_PING_EVENT_TYPE_NAME.to_owned()) {
770+
SVIX_PING_EVENT_TYPE_PAYLOAD.to_string()
771+
} else {
772+
let event_type = ctx!(
773+
eventtype::Entity::secure_find_by_name(app.org_id.clone(), data.event_type.clone())
774+
.one(db)
775+
.await
776+
)?
777+
.ok_or_else(|| HttpError::not_found(None, None))?;
778+
779+
let example = event_type.schemas.and_then(|schema| {
780+
schema
781+
.example()
782+
.and_then(|ex| serde_json::to_string(ex).ok())
783+
});
784+
785+
match example {
786+
Some(example) => example,
787+
None => {
788+
return Err(HttpError::bad_request(
789+
Some("invalid_scheme".to_owned()),
790+
Some("Unable to generate example message from event-type schema".to_owned()),
791+
)
792+
.into());
793+
}
794+
}
795+
};
796+
797+
let msg_in = MessageIn {
798+
channels: None,
799+
event_type: data.event_type,
800+
payload: RawPayload::from_string(example).unwrap(),
801+
uid: None,
802+
payload_retention_period: 90,
803+
};
804+
805+
let create_message =
806+
create_message_inner(db, queue_tx, cache, false, Some(endpoint.id), msg_in, app).await?;
807+
808+
Ok(Json(create_message))
809+
}
734810

735811
pub fn router() -> ApiRouter<AppState> {
736812
let tag = openapi_tag("Endpoint");
@@ -772,7 +848,7 @@ pub fn router() -> ApiRouter<AppState> {
772848
)
773849
.api_route_with(
774850
"/app/:app_id/endpoint/:endpoint_id/send-example/",
775-
post_with(api_not_implemented, openapi_desc(SEND_EXAMPLE_DESCRIPTION)),
851+
post_with(send_example, send_example_operation),
776852
&tag,
777853
)
778854
.api_route_with(

server/svix-server/src/v1/endpoints/message.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,19 @@
33

44
use crate::{
55
core::{
6+
cache::Cache,
67
message_app::CreateMessageApp,
78
permissions,
89
types::{
9-
EventChannel, EventChannelSet, EventTypeName, EventTypeNameSet,
10+
EndpointId, EventChannel, EventChannelSet, EventTypeName, EventTypeNameSet,
1011
MessageAttemptTriggerType, MessageId, MessageUid,
1112
},
1213
},
13-
ctx, err_generic,
14+
ctx,
15+
db::models::application,
16+
err_generic,
1417
error::{HttpError, Result},
15-
queue::MessageTaskBatch,
18+
queue::{MessageTaskBatch, TaskQueueProducer},
1619
v1::utils::{
1720
apply_pagination_desc, iterator_from_before_or_after, openapi_tag, validation_error,
1821
ApplicationMsgPath, EventTypesQueryParams, JsonStatus, ListResponse, ModelIn, ModelOut,
@@ -305,6 +308,20 @@ async fn create_message(
305308
permissions::OrganizationWithApplication { app }: permissions::OrganizationWithApplication,
306309
ValidatedJson(data): ValidatedJson<MessageIn>,
307310
) -> Result<JsonStatus<202, MessageOut>> {
311+
Ok(JsonStatus(
312+
create_message_inner(db, queue_tx, cache, with_content, None, data, app).await?,
313+
))
314+
}
315+
316+
pub(crate) async fn create_message_inner(
317+
db: &DatabaseConnection,
318+
queue_tx: TaskQueueProducer,
319+
cache: Cache,
320+
with_content: bool,
321+
force_endpoint: Option<EndpointId>,
322+
data: MessageIn,
323+
app: application::Model,
324+
) -> Result<MessageOut> {
308325
let create_message_app = CreateMessageApp::layered_fetch(
309326
&cache,
310327
db,
@@ -334,6 +351,7 @@ async fn create_message(
334351
MessageTaskBatch::new_task(
335352
msg.id.clone(),
336353
app.id.clone(),
354+
force_endpoint,
337355
MessageAttemptTriggerType::Scheduled,
338356
),
339357
None,
@@ -347,7 +365,7 @@ async fn create_message(
347365
MessageOut::without_payload(msg)
348366
};
349367

350-
Ok(JsonStatus(msg_out))
368+
Ok(msg_out)
351369
}
352370

353371
#[derive(Debug, Deserialize, Validate, JsonSchema)]

server/svix-server/src/worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,7 @@ async fn process_queue_task_inner(
748748
QueueTask::MessageBatch(task) => {
749749
let msg = ctx!(message::Entity::find_by_id(task.msg_id).one(db).await)?
750750
.ok_or_else(|| err_generic!("Unexpected: message doesn't exist"))?;
751-
(msg, None, None, task.trigger_type, 0)
751+
(msg, task.force_endpoint, None, task.trigger_type, 0)
752752
}
753753
};
754754

0 commit comments

Comments
 (0)