4
4
5
5
from azure .cosmos .aio import ContainerProxy , CosmosClient
6
6
from azure .identity .aio import AzureDeveloperCliCredential , ManagedIdentityCredential
7
- from quart import Blueprint , current_app , jsonify , request
7
+ from quart import Blueprint , current_app , jsonify , make_response , request
8
8
9
9
from config import (
10
10
CONFIG_CHAT_HISTORY_COSMOS_ENABLED ,
11
11
CONFIG_COSMOS_HISTORY_CLIENT ,
12
12
CONFIG_COSMOS_HISTORY_CONTAINER ,
13
+ CONFIG_COSMOS_HISTORY_VERSION ,
13
14
CONFIG_CREDENTIAL ,
14
15
)
15
16
from decorators import authenticated
@@ -34,23 +35,50 @@ async def post_chat_history(auth_claims: Dict[str, Any]):
34
35
35
36
try :
36
37
request_json = await request .get_json ()
37
- id = request_json .get ("id" )
38
- answers = request_json .get ("answers" )
39
- title = answers [0 ][0 ][:50 ] + "..." if len (answers [0 ][0 ]) > 50 else answers [0 ][0 ]
38
+ session_id = request_json .get ("id" )
39
+ message_pairs = request_json .get ("answers" )
40
+ first_question = message_pairs [0 ][0 ]
41
+ title = first_question + "..." if len (first_question ) > 50 else first_question
40
42
timestamp = int (time .time () * 1000 )
41
43
42
- await container .upsert_item (
43
- {"id" : id , "entra_oid" : entra_oid , "title" : title , "answers" : answers , "timestamp" : timestamp }
44
- )
44
+ # Insert the session item:
45
+ session_item = {
46
+ "id" : session_id ,
47
+ "version" : current_app .config [CONFIG_COSMOS_HISTORY_VERSION ],
48
+ "session_id" : session_id ,
49
+ "entra_oid" : entra_oid ,
50
+ "type" : "session" ,
51
+ "title" : title ,
52
+ "timestamp" : timestamp ,
53
+ }
54
+
55
+ message_pair_items = []
56
+ # Now insert a message item for each question/response pair:
57
+ for ind , message_pair in enumerate (message_pairs ):
58
+ message_pair_items .append (
59
+ {
60
+ "id" : f"{ session_id } -{ ind } " ,
61
+ "version" : current_app .config [CONFIG_COSMOS_HISTORY_VERSION ],
62
+ "session_id" : session_id ,
63
+ "entra_oid" : entra_oid ,
64
+ "type" : "message_pair" ,
65
+ "question" : message_pair [0 ],
66
+ "response" : message_pair [1 ],
67
+ }
68
+ )
45
69
70
+ batch_operations = [("upsert" , (session_item ,))] + [
71
+ ("upsert" , (message_pair_item ,)) for message_pair_item in message_pair_items
72
+ ]
73
+ await container .execute_item_batch (batch_operations = batch_operations , partition_key = [entra_oid , session_id ])
46
74
return jsonify ({}), 201
47
75
except Exception as error :
48
76
return error_response (error , "/chat_history" )
49
77
50
78
51
- @chat_history_cosmosdb_bp .post ("/chat_history/items " )
79
+ @chat_history_cosmosdb_bp .get ("/chat_history/sessions " )
52
80
@authenticated
53
- async def get_chat_history (auth_claims : Dict [str , Any ]):
81
+ async def get_chat_history_sessions (auth_claims : Dict [str , Any ]):
54
82
if not current_app .config [CONFIG_CHAT_HISTORY_COSMOS_ENABLED ]:
55
83
return jsonify ({"error" : "Chat history not enabled" }), 400
56
84
@@ -63,27 +91,26 @@ async def get_chat_history(auth_claims: Dict[str, Any]):
63
91
return jsonify ({"error" : "User OID not found" }), 401
64
92
65
93
try :
66
- request_json = await request .get_json ()
67
- count = request_json .get ("count" , 20 )
68
- continuation_token = request_json .get ("continuation_token" )
94
+ count = int (request .args .get ("count" , 10 ))
95
+ continuation_token = request .args .get ("continuation_token" )
69
96
70
97
res = container .query_items (
71
- query = "SELECT c.id, c.entra_oid, c.title, c.timestamp FROM c WHERE c.entra_oid = @entra_oid ORDER BY c.timestamp DESC" ,
72
- parameters = [dict (name = "@entra_oid" , value = entra_oid )],
98
+ query = "SELECT c.id, c.entra_oid, c.title, c.timestamp FROM c WHERE c.entra_oid = @entra_oid AND c.type = @type ORDER BY c.timestamp DESC" ,
99
+ parameters = [dict (name = "@entra_oid" , value = entra_oid ), dict (name = "@type" , value = "session" )],
100
+ partition_key = [entra_oid ],
73
101
max_item_count = count ,
74
102
)
75
103
76
- # set the continuation token for the next page
77
104
pager = res .by_page (continuation_token )
78
105
79
106
# Get the first page, and the continuation token
107
+ sessions = []
80
108
try :
81
109
page = await pager .__anext__ ()
82
110
continuation_token = pager .continuation_token # type: ignore
83
111
84
- items = []
85
112
async for item in page :
86
- items .append (
113
+ sessions .append (
87
114
{
88
115
"id" : item .get ("id" ),
89
116
"entra_oid" : item .get ("entra_oid" ),
@@ -94,18 +121,17 @@ async def get_chat_history(auth_claims: Dict[str, Any]):
94
121
95
122
# If there are no more pages, StopAsyncIteration is raised
96
123
except StopAsyncIteration :
97
- items = []
98
124
continuation_token = None
99
125
100
- return jsonify ({"items " : items , "continuation_token" : continuation_token }), 200
126
+ return jsonify ({"sessions " : sessions , "continuation_token" : continuation_token }), 200
101
127
102
128
except Exception as error :
103
- return error_response (error , "/chat_history/items " )
129
+ return error_response (error , "/chat_history/sessions " )
104
130
105
131
106
- @chat_history_cosmosdb_bp .get ("/chat_history/items/<item_id >" )
132
+ @chat_history_cosmosdb_bp .get ("/chat_history/sessions/<session_id >" )
107
133
@authenticated
108
- async def get_chat_history_session (auth_claims : Dict [str , Any ], item_id : str ):
134
+ async def get_chat_history_session (auth_claims : Dict [str , Any ], session_id : str ):
109
135
if not current_app .config [CONFIG_CHAT_HISTORY_COSMOS_ENABLED ]:
110
136
return jsonify ({"error" : "Chat history not enabled" }), 400
111
137
@@ -118,26 +144,34 @@ async def get_chat_history_session(auth_claims: Dict[str, Any], item_id: str):
118
144
return jsonify ({"error" : "User OID not found" }), 401
119
145
120
146
try :
121
- res = await container .read_item (item = item_id , partition_key = entra_oid )
147
+ res = container .query_items (
148
+ query = "SELECT * FROM c WHERE c.session_id = @session_id AND c.type = @type" ,
149
+ parameters = [dict (name = "@session_id" , value = session_id ), dict (name = "@type" , value = "message_pair" )],
150
+ partition_key = [entra_oid , session_id ],
151
+ )
152
+
153
+ message_pairs = []
154
+ async for page in res .by_page ():
155
+ async for item in page :
156
+ message_pairs .append ([item ["question" ], item ["response" ]])
157
+
122
158
return (
123
159
jsonify (
124
160
{
125
- "id" : res .get ("id" ),
126
- "entra_oid" : res .get ("entra_oid" ),
127
- "title" : res .get ("title" , "untitled" ),
128
- "timestamp" : res .get ("timestamp" ),
129
- "answers" : res .get ("answers" , []),
161
+ "id" : session_id ,
162
+ "entra_oid" : entra_oid ,
163
+ "answers" : message_pairs ,
130
164
}
131
165
),
132
166
200 ,
133
167
)
134
168
except Exception as error :
135
- return error_response (error , f"/chat_history/items/ { item_id } " )
169
+ return error_response (error , f"/chat_history/sessions/ { session_id } " )
136
170
137
171
138
- @chat_history_cosmosdb_bp .delete ("/chat_history/items/<item_id >" )
172
+ @chat_history_cosmosdb_bp .delete ("/chat_history/sessions/<session_id >" )
139
173
@authenticated
140
- async def delete_chat_history_session (auth_claims : Dict [str , Any ], item_id : str ):
174
+ async def delete_chat_history_session (auth_claims : Dict [str , Any ], session_id : str ):
141
175
if not current_app .config [CONFIG_CHAT_HISTORY_COSMOS_ENABLED ]:
142
176
return jsonify ({"error" : "Chat history not enabled" }), 400
143
177
@@ -150,10 +184,22 @@ async def delete_chat_history_session(auth_claims: Dict[str, Any], item_id: str)
150
184
return jsonify ({"error" : "User OID not found" }), 401
151
185
152
186
try :
153
- await container .delete_item (item = item_id , partition_key = entra_oid )
154
- return jsonify ({}), 204
187
+ res = container .query_items (
188
+ query = "SELECT c.id FROM c WHERE c.session_id = @session_id" ,
189
+ parameters = [dict (name = "@session_id" , value = session_id )],
190
+ partition_key = [entra_oid , session_id ],
191
+ )
192
+
193
+ ids_to_delete = []
194
+ async for page in res .by_page ():
195
+ async for item in page :
196
+ ids_to_delete .append (item ["id" ])
197
+
198
+ batch_operations = [("delete" , (id ,)) for id in ids_to_delete ]
199
+ await container .execute_item_batch (batch_operations = batch_operations , partition_key = [entra_oid , session_id ])
200
+ return await make_response ("" , 204 )
155
201
except Exception as error :
156
- return error_response (error , f"/chat_history/items/ { item_id } " )
202
+ return error_response (error , f"/chat_history/sessions/ { session_id } " )
157
203
158
204
159
205
@chat_history_cosmosdb_bp .before_app_serving
@@ -183,6 +229,7 @@ async def setup_clients():
183
229
184
230
current_app .config [CONFIG_COSMOS_HISTORY_CLIENT ] = cosmos_client
185
231
current_app .config [CONFIG_COSMOS_HISTORY_CONTAINER ] = cosmos_container
232
+ current_app .config [CONFIG_COSMOS_HISTORY_VERSION ] = os .environ ["AZURE_CHAT_HISTORY_VERSION" ]
186
233
187
234
188
235
@chat_history_cosmosdb_bp .after_app_serving
0 commit comments