-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfile_upload_example.py
More file actions
118 lines (97 loc) · 3.6 KB
/
file_upload_example.py
File metadata and controls
118 lines (97 loc) · 3.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
"""
File Upload Example
This example demonstrates how to handle file uploads in WebSpark:
- Handling multipart form data
- Saving uploaded files
- Returning file information
"""
import os
import uuid
from webspark.core import View, WebSpark, path
from webspark.http import Context
from webspark.utils import HTTPException
# Directory where uploaded files are written (and which the listing view reads)
UPLOAD_DIR = "/tmp/uploads"
# Create the directory at import time; exist_ok=True makes this a no-op when it already exists
os.makedirs(UPLOAD_DIR, exist_ok=True)
class FileUploadView(View):
    """Accept multipart uploads and store each file under ``UPLOAD_DIR``."""

    # Non-alphanumeric characters that are kept verbatim in on-disk names.
    _SAFE_NAME_CHARS = "._-"

    def handle_post(self, ctx: Context):
        """Save every uploaded file and respond with a JSON summary.

        Each entry of ``ctx.files`` is handled whether it holds a single
        file record or a list of them. The response (status 201) carries a
        message with the number of stored files and one metadata record per
        file.

        Raises:
            HTTPException: with status 400 when the request has no files.
        """
        if not ctx.files:
            raise HTTPException("No files uploaded", status=400)

        uploaded_files = []
        for field_name, file_list in ctx.files.items():
            # A field maps either to one file record or to a list of them;
            # normalise so both shapes share a single code path.
            entries = file_list if isinstance(file_list, list) else [file_list]
            for file_info in entries:
                uploaded_files.append(self._save_upload(field_name, file_info))

        ctx.json(
            {
                "message": f"Successfully uploaded {len(uploaded_files)} file(s)",
                "files": uploaded_files,
            },
            status=201,
        )

    def _save_upload(self, field_name, file_info):
        """Write one uploaded file to ``UPLOAD_DIR`` and return its metadata.

        ``file_info`` is read through its ``"file"``, ``"filename"`` and
        ``"content_type"`` keys. The returned dict reports the field name
        exactly as received, the client-side name, the generated on-disk
        name and the content type.
        """
        # The field name is client-controlled, so only a sanitised copy is
        # allowed into the path; the UUID prefix keeps names unique.
        filename = f"{uuid.uuid4()}_{self._safe_component(field_name)}"
        file_path = os.path.join(UPLOAD_DIR, filename)

        with open(file_path, "wb") as f:
            f.write(file_info["file"].read())

        return {
            "field_name": field_name,
            "original_name": file_info["filename"],
            "saved_name": filename,
            "content_type": file_info["content_type"],
        }

    @classmethod
    def _safe_component(cls, name):
        """Return *name* made safe for use inside a single file name.

        Every character that is neither alphanumeric nor one of
        ``_SAFE_NAME_CHARS`` (path separators, NUL, whitespace, ...) is
        replaced by an underscore, so ordinary field names pass through
        unchanged.
        """
        return "".join(
            ch if ch.isalnum() or ch in cls._SAFE_NAME_CHARS else "_"
            for ch in str(name)
        )
class FileListView(View):
    """Report the files currently stored in ``UPLOAD_DIR``."""

    def handle_get(self, ctx: Context):
        """Respond with JSON describing each regular file in ``UPLOAD_DIR``.

        Every record holds the file's name, its size in bytes and its
        modification timestamp. A missing directory yields an empty list.
        """
        listing = []
        names = os.listdir(UPLOAD_DIR) if os.path.exists(UPLOAD_DIR) else []

        for name in names:
            full_path = os.path.join(UPLOAD_DIR, name)
            # Skip sub-directories and anything else that is not a file.
            if not os.path.isfile(full_path):
                continue
            info = os.stat(full_path)
            listing.append(
                {
                    "name": name,
                    "size": info.st_size,
                    "modified": info.st_mtime,
                }
            )

        ctx.json({"files": listing})
# Create the application instance (constructed with debug=True)
app = WebSpark(debug=True)
# Register the routes: /upload is served by FileUploadView, /files by FileListView
app.add_paths(
[
path("/upload", view=FileUploadView.as_view()),
path("/files", view=FileListView.as_view()),
]
)
if __name__ == "__main__":
    # Executing this module directly only prints a hint; the ``app`` object
    # is meant to be served by a WSGI server, for example:
    #     gunicorn examples.file_upload_example:app
    print(
        "File Upload Example",
        "Run with: gunicorn examples.file_upload_example:app",
        sep="\n",
    )