-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi.py
More file actions
65 lines (50 loc) · 1.78 KB
/
api.py
File metadata and controls
65 lines (50 loc) · 1.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from devtools import pformat
from fastapi import APIRouter, Request
from app.auth import AuthContext
from app.client import ExtensionClient, InstallationClient
from app.config import settings
from app.schema import Event, EventResponse
# Every route registered on this router is served under the "/api/v1" prefix.
router = APIRouter(prefix="/api/v1")
@router.post("/events/orders")
async def process_order(
    extension_client: ExtensionClient,
    installation_client: InstallationClient,
    event: Event,
):
    """Handle an order event posted to ``/events/orders``.

    The task referenced by ``event.task.id`` is started first. If a task is
    returned and its ``parameters.accountId`` differs from
    ``settings.my_account_id``, a "skipped" note is appended to the task
    description, the task is completed, and ``EventResponse.ok()`` is
    returned.

    In every other case (including a falsy result from ``start_task``) the
    order referenced by ``event.object.id`` is fetched and printed, and
    ``EventResponse.cancel()`` is returned.
    """
    print(f"received event: {pformat(event, highlight=True)}")

    task = await extension_client.start_task(event.task.id)
    if task and task["parameters"]["accountId"] != settings.my_account_id:
        await extension_client.update_task(
            event.task.id,
            {
                "description": (
                    # Single quotes inside the replacement field: reusing the
                    # outer quote character only parses on Python 3.12+.
                    f"{task['description']}<br/>"
                    "The task has been skipped by the extension"
                ),
            },
        )
        await extension_client.complete_task(event.task.id)
        return EventResponse.ok()

    order = await installation_client.get_order(event.object.id)
    print(order)
    return EventResponse.cancel()
@router.post("/validation/orders")
async def validate_order():
    """Placeholder endpoint: performs no validation and returns ``None``."""
    return None
@router.get("/tasks/synchronize")
async def synchronize(request: Request):
    """Print the headers of the incoming request; nothing is returned."""
    incoming_headers = request.headers
    print(incoming_headers)
@router.get("/management/whoami")
async def whoami(ctx: AuthContext, client: InstallationClient):
    """Look up the account, user and token referenced by ``ctx``.

    The account is always fetched via ``ctx.account_id``. The user and the
    token are fetched only when ``ctx.user_id`` / ``ctx.token_id`` are
    truthy; otherwise the corresponding entry is ``None``.

    Returns a dict with the keys ``"account"``, ``"token"`` and ``"user"``.
    """
    account = await client.get_account(ctx.account_id)

    user = None
    if ctx.user_id:
        user = await client.get_user(ctx.user_id)

    token = None
    if ctx.token_id:
        token = await client.get_token(ctx.token_id)

    return {"account": account, "token": token, "user": user}
@router.post("/cron/dojob")
async def cron_job(ctx: AuthContext):
    """Print the received auth context and reply with ``EventResponse.ok()``."""
    print(ctx)
    response = EventResponse.ok()
    return response