-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexternal_session_feedback.py
More file actions
114 lines (107 loc) · 4.76 KB
/
external_session_feedback.py
File metadata and controls
114 lines (107 loc) · 4.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# -*- coding: utf-8 -*-
from datetime import datetime
from imio.esign import logger
from imio.esign.utils import get_session_annotation
from imio.helpers.ws import verify_auth_token
from plone.restapi.deserializer import json_body
from plone.restapi.services import Service
class ExternalSessionFeedbackPost(Service):
def reply(self): # noqa C901
"""Handle the external session feedback.
Needs json body with:
* "app_session_id": int 1234560001, app_session_id
* "code": int some_code, feedback identification code
* "session_state": microservice session state
* "value": json dict contaaining sign URL or signer/refused emails
* "message": "some message", optional message with feedback
"""
if not self.authorized():
self.request.response.setStatus(403)
return {"message": "Unauthorized access"}
data = json_body(self.request)
app_session_id = data.get("app_session_id")
logger.info("External session feedback received: {}".format(data))
if not app_session_id:
self.request.response.setStatus(400)
return {"message": "app_session_id is required"}
code = int(data.get("code"))
if not code:
self.request.response.setStatus(400)
return {"message": "code is required"}
value = data.get("value") or {}
db_state = data.get("session_state")
try:
annot = get_session_annotation()
session_id = int(app_session_id[7:])
if session_id not in annot["sessions"]:
self.request.response.setStatus(400)
return {"message": "Session ID {} not found".format(session_id)}
session = annot["sessions"][session_id]
session_update = {"returns": session["returns"]}
session_update["returns"].append((code, db_state, data.get("value", ""), data.get("message", ""),
datetime.now()))
if code == 21:
# sign_session_confirmed
session_update["state"] = "to_sign"
if value and "sign_session_url" in value and not session["sign_url"]:
session_update["sign_url"] = value["sign_session_url"]
elif code == 22:
# one_signer_accepted
if value and "signed_users" in value:
session_update["signers"] = session["signers"]
for i, d in enumerate(session["signers"]):
if d["status"] in ("signed", "refused"):
continue
if d["email"] in value["signed_users"]:
session_update["signers"][i]["status"] = "signed"
elif code == 23:
# upload_success (files returned)
session_update["state"] = "returned"
elif code == 52:
# one_signer_refused
session_update["state"] = "refused"
if value and "user" in value:
session_update["signers"] = session["signers"]
for i, d in enumerate(session["signers"]):
if d["email"] == value["user"]:
session_update["signers"][i]["status"] = "refused"
break
elif code == 53:
# upload_failed
session_update["state"] = "signed"
elif code in (50, 40, 51, 41):
# seal_creation_error, seal_creation_not_available, sign_creation_error, sign_creation_not_available
session_update["state"] = "errored"
if session_update:
session.update(session_update)
session["last_update"] = datetime.now()
except Exception as e:
self.request.response.setStatus(500)
logger.error(str(e))
return {"message": str(e)}
return {"message": "Information correctly handled"}
""" microservice session state
to_create_session = "to_create_session"
session_creation_failed = "session_creation_failed"
to_sign = "to_sign"
refused = "refused"
to_upload = "to_upload"
to_notify_ged_upload = "to_notify_ged_upload"
completed = "completed"
"""
def authorized(self):
"""Check if the user is authorized to access this service."""
auth_header = self.request.getHeader("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return False
token = auth_header[7:] # len("Bearer ") == 7
if not token:
return False
return verify_auth_token(token, groups=["access_imio-apps-docs"])
"""
State:
to_create_session
to_sign
to_upload
refused
"""