-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathschema_plugin_example.py
More file actions
87 lines (67 loc) · 2.33 KB
/
schema_plugin_example.py
File metadata and controls
87 lines (67 loc) · 2.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
"""
Schema Validation Example
This example demonstrates WebSpark's schema validation capabilities:
- Using Schema for request body validation
- Field validation with different data types
- Automatic error responses for invalid data
"""
from webspark.contrib.plugins.schema import SchemaPlugin
from webspark.core import View, WebSpark, path
from webspark.http import Context
from webspark.utils import HTTPException
from webspark.utils.decorators import apply
from webspark.validation import Schema, fields
# Define a schema for user data
class UserSchema(Schema):
name = fields.StringField(required=True, max_length=100)
email = fields.EmailField(required=True)
age = fields.IntegerField(min_value=1, max_value=120)
is_active = fields.BooleanField(default=True)
# In-memory storage
users = []
next_id = 1
class UserView(View):
"""Handle user operations with schema validation."""
def handle_get(self, ctx: Context):
"""Return all users."""
ctx.json({"users": users})
@apply(
SchemaPlugin(UserSchema, prop="body"),
)
def handle_post(self, ctx: Context, body: dict):
"""Create a new user with validation."""
global next_id
# Create new user with validated data
new_user = {
"id": next_id,
"name": body["name"],
"age": body["age"],
"email": body["email"],
"is_active": body["is_active"],
}
users.append(new_user)
next_id += 1
ctx.json(new_user, status=201)
class UserDetailView(View):
"""Handle operations on a single user."""
def handle_get(self, ctx: Context):
"""Return a specific user by ID."""
user_id = int(ctx.path_params["id"])
user = next((user for user in users if user["id"] == user_id), None)
if not user:
raise HTTPException("User not found", status=404)
ctx.json(user)
# Create the app
app = WebSpark(debug=True)
# Add routes
app.add_paths(
[
path("/users", view=UserView.as_view()),
path("/users/:id", view=UserDetailView.as_view()),
]
)
if __name__ == "__main__":
# For development purposes, you can run this with a WSGI server like:
# gunicorn examples.schema_example:app
print("Schema Plugin Example")
print("Run with: gunicorn examples.schema_plugin_example:app")