1
+ import logging
1
2
import os
2
3
import time
3
4
from typing import Any , Dict , Union
4
5
6
+ from azure .cosmos import exceptions
5
7
from azure .cosmos .aio import ContainerProxy , CosmosClient
8
+ from azure .cosmos .partition_key import PartitionKey
6
9
from azure .identity .aio import AzureDeveloperCliCredential , ManagedIdentityCredential
7
10
from quart import Blueprint , current_app , jsonify , request
8
11
15
18
from decorators import authenticated
16
19
from error import error_response
17
20
21
+ logger = logging .getLogger ("scripts" )
22
+
18
23
chat_history_cosmosdb_bp = Blueprint ("chat_history_cosmos" , __name__ , static_folder = "static" )
19
24
20
25
26
def make_partition_key(entra_id, session_id=None):
    """Build a Cosmos DB ``PartitionKey`` for a user, optionally scoped to a session.

    Args:
        entra_id: Identifier of the user. Used only for its truthiness here;
            the value itself is not embedded in the returned object.
        session_id: Optional chat session identifier. Likewise used only for
            its truthiness.

    Returns:
        A ``PartitionKey`` on the paths ``/entra_id`` and ``/session_id`` with
        kind ``"MultiHash"`` when both arguments are truthy; otherwise a
        ``PartitionKey`` on the single path ``/entra_id``.
    """
    # NOTE(review): this returns a partition key *definition* (paths), not the
    # key values; confirm that is what the call sites' `partition_key=`
    # arguments expect.
    if not (entra_id and session_id):
        return PartitionKey(path="/entra_id")

    # MultiHash is needed for hierarchical partitioning.
    return PartitionKey(path=["/entra_id", "/session_id"], kind="MultiHash")
32
+
33
+
21
34
@chat_history_cosmosdb_bp .post ("/chat_history" )
22
35
@authenticated
23
36
async def post_chat_history (auth_claims : Dict [str , Any ]):
@@ -34,44 +47,69 @@ async def post_chat_history(auth_claims: Dict[str, Any]):
34
47
35
48
try :
36
49
request_json = await request .get_json ()
37
- id = request_json .get ("id" )
50
+ session_id = request_json .get ("id" )
38
51
answers = request_json .get ("answers" )
39
52
title = answers [0 ][0 ][:50 ] + "..." if len (answers [0 ][0 ]) > 50 else answers [0 ][0 ]
40
53
timestamp = int (time .time () * 1000 )
41
54
42
55
# Insert the session item:
43
- await container .upsert_item (
44
- {
45
- "id" : id ,
46
- "session_id" : id ,
47
- "entra_oid" : entra_oid ,
48
- "type" : "session" ,
49
- "title" : title ,
50
- "timestamp" : timestamp ,
51
- }
52
- )
53
-
56
+ session_item = {
57
+ "id" : id ,
58
+ "session_id" : session_id ,
59
+ "entra_oid" : entra_oid ,
60
+ "type" : "session" ,
61
+ "title" : title ,
62
+ "timestamp" : timestamp ,
63
+ }
64
+
65
+ message_items = []
54
66
# Now insert a message item for each question/response pair:
55
67
for ind , message_pair in enumerate (zip (answers [::2 ], answers [1 ::2 ])):
56
- # TODO: Can I do a batch upsert?
57
- await container .upsert_item (
68
+ # The id: what if you delete a message and then add a new one? The id will be the same.
69
+ # If we had a delete mechanism, and you deleted item 5 in a history, then item 6 would still hang around
70
+ # and you'd have two of item 6.
71
+ # abc-0
72
+ # abc-1
73
+ # abc-2 <-- DELETE
74
+ # abc-3
75
+ # One approach would be to delete EVERYTHING, then upsert everything.
76
+ # Another approach would be to delete item plus everything after, then upsert everything after.
77
+ # Or: Change the frontend?
78
+ # We can do this first, and change the frontend after
79
+ message_items .append (
58
80
{
59
- "id" : f"{ id } -{ ind } " ,
81
+ "id" : f"{ session_id } -{ ind } " ,
60
82
"session_id" : id ,
61
83
"entra_oid" : entra_oid ,
62
84
"type" : "message" ,
63
85
"question" : message_pair [0 ],
64
86
"response" : message_pair [1 ],
65
- "timestamp" : timestamp ,
87
+ "timestamp" : timestamp , # <-- This is the timestamp of the session, not the message
66
88
}
67
89
)
68
90
91
+ batch_operations = [("upsert" , tuple ([session_item ] + message_items ), {})]
92
+
93
+ try :
94
+ # Run that list of operations
95
+ batch_results = container .execute_item_batch (
96
+ batch_operations = batch_operations , partition_key = make_partition_key (entra_oid , session_id )
97
+ )
98
+ # Batch results are returned as a list of item operation results - or raise a CosmosBatchOperationError if
99
+ # one of the operations failed within your batch request.
100
+ print (f"\n Results for the batch operations: { batch_results } \n " )
101
+ except exceptions .CosmosBatchOperationError as e :
102
+ error_operation_index = e .error_index
103
+ error_operation_response = e .operation_responses [error_operation_index ]
104
+ error_operation = batch_operations [error_operation_index ]
105
+ logger .error (f"Batch operation failed: { error_operation_response } for operation { error_operation } " )
106
+ return jsonify ({"error" : "Batch operation failed" }), 400
69
107
return jsonify ({}), 201
70
108
except Exception as error :
71
109
return error_response (error , "/chat_history" )
72
110
73
111
74
- @chat_history_cosmosdb_bp .post ("/chat_history/items" )
112
+ @chat_history_cosmosdb_bp .get ("/chat_history/items" )
75
113
@authenticated
76
114
async def get_chat_history (auth_claims : Dict [str , Any ]):
77
115
if not current_app .config [CONFIG_CHAT_HISTORY_COSMOS_ENABLED ]:
@@ -86,14 +124,15 @@ async def get_chat_history(auth_claims: Dict[str, Any]):
86
124
return jsonify ({"error" : "User OID not found" }), 401
87
125
88
126
try :
89
- request_json = await request . get_json ()
90
- count = request_json . get ("count" , 20 )
91
- continuation_token = request_json .get ("continuation_token" )
127
+ # get the count and continuation token from the request URL
128
+ count = request . args . get ("count" , 10 )
129
+ continuation_token = request . args .get ("continuation_token" )
92
130
93
131
res = container .query_items (
132
+ # TODO: do we need distinct? per Mark's code - Mark says no!
94
133
query = "SELECT c.id, c.entra_oid, c.title, c.timestamp FROM c WHERE c.entra_oid = @entra_oid AND c.type = @type ORDER BY c.timestamp DESC" ,
95
134
parameters = [dict (name = "@entra_oid" , value = entra_oid ), dict (name = "@type" , value = "session" )],
96
- partition_key = entra_oid ,
135
+ partition_key = make_partition_key ( entra_oid ) ,
97
136
max_item_count = count ,
98
137
)
99
138
@@ -142,6 +181,14 @@ async def get_chat_history_session(auth_claims: Dict[str, Any], item_id: str):
142
181
return jsonify ({"error" : "User OID not found" }), 401
143
182
144
183
try :
184
+ res = container .query_items (
185
+ # TODO: do we need distinct? per Mark's code
186
+ query = "SELECT c.id, c.entra_oid, c.title, c.timestamp FROM c WHERE c.session_id = @session_id ORDER BY c.timestamp DESC" ,
187
+ parameters = [dict (name = "@entra_oid" , value = entra_oid ), dict (name = "@session_id" , value = item_id )],
188
+ partition_key = make_partition_key (entra_oid , item_id ),
189
+ # max_item_count=?
190
+ )
191
+
145
192
res = await container .read_item (item = item_id , partition_key = entra_oid )
146
193
return (
147
194
jsonify (
@@ -175,6 +222,8 @@ async def delete_chat_history_session(auth_claims: Dict[str, Any], item_id: str)
175
222
176
223
try :
177
224
await container .delete_item (item = item_id , partition_key = entra_oid )
225
+ # Delete session, and all the message items associated with it
226
+ # TODO: Delete all the message items as well
178
227
return jsonify ({}), 204
179
228
except Exception as error :
180
229
return error_response (error , f"/chat_history/items/{ item_id } " )
0 commit comments