-
Notifications
You must be signed in to change notification settings - Fork 31
Expand file tree
/
Copy path_file_client.py
More file actions
431 lines (350 loc) · 15.3 KB
/
_file_client.py
File metadata and controls
431 lines (350 loc) · 15.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
"""Implementation of FileClient."""
import json
from typing import BinaryIO, Dict, List
from nisystemlink.clients import core
from nisystemlink.clients.core._uplink._base_client import BaseClient
from nisystemlink.clients.core._uplink._file_like_response import (
file_like_response_handler,
)
from nisystemlink.clients.core._uplink._methods import (
delete,
get,
post,
response_handler,
)
from nisystemlink.clients.core.helpers import IteratorFileLike
from requests.models import Response
from uplink import Body, Field, params, Part, Path, Query, retry
from . import models
def _file_uri_response_handler(response: Response) -> str:
"""Response handler for File URI response. Extracts ID from URI."""
resp = response.json()
uri: str = resp["uri"]
# Split the uri by '/' and get the last part
parts = uri.split("/")
return parts[-1]
@retry(when=retry.when.status(429), stop=retry.stop.after_attempt(5))
class FileClient(BaseClient):
def __init__(self, configuration: core.HttpConfiguration | None = None):
"""Initialize an instance.
Args:
configuration: Defines the web server to connect to and information about
how to connect. If not provided, the
:class:`HttpConfigurationManager <nisystemlink.clients.core.HttpConfigurationManager>`
is used to obtain the configuration.
Raises:
ApiException: if unable to communicate with the File Service.
"""
if configuration is None:
configuration = core.HttpConfigurationManager.get_configuration()
super().__init__(configuration, "/nifile/v1/")
@get("")
def api_info(self) -> models.V1Operations:
"""Get information about available API operations.
Returns:
Information about available API operations.
Raises:
ApiException: if unable to communicate with the File Service.
"""
@get(
"service-groups/Default/files",
args=[
Query,
Query,
Query(name="orderBy"),
Query(name="orderByDescending"),
Query(name="id"),
],
)
def __get_files(
self,
skip: int = 0,
take: int = 0,
order_by: str | None = None,
order_by_descending: str | None = "false",
ids: str | None = None,
) -> models.FileQueryResponse:
"""Lists available files on the SystemLink File service.
Use the skip and take parameters to return paged responses.
The orderBy and orderByDescending fields can be used to manage sorting the list by metadata objects.
Args:
skip: How many files to skip in the result when paging. Defaults to 0.
take: How many files to return in the result, or 0 to use a default defined by the service.
Defaults to 0.
order_by: The name of the metadata key to sort by. Defaults to None.
order_by_descending: The elements in the list are sorted ascending if "false"
and descending if "true". Defaults to "false".
ids: Comma-separated list of file IDs to search by. Defaults to None.
Returns:
File Query Response
Raises:
ApiException: if unable to communicate with the File Service.
"""
def get_files(
self,
skip: int = 0,
take: int = 0,
order_by: models.FileQueryOrderBy | None = None,
order_by_descending: bool | None = False,
ids: List[str] | None = None,
) -> models.FileQueryResponse:
"""Lists available files on the SystemLink File service.
Use the skip and take parameters to return paged responses.
The orderBy and orderByDescending fields can be used to manage sorting the list by metadata objects.
Args:
skip: How many files to skip in the result when paging. Defaults to 0.
take: How many files to return in the result, or 0 to use a default defined by the service.
Defaults to 0.
order_by: The name of the metadata key to sort by. Defaults to None.
order_by_descending: The elements in the list are sorted ascending if False
and descending if True. Defaults to False.
ids: List of file IDs to search by. Defaults to None.
Returns:
File Query Response
Raises:
ApiException: if unable to communicate with the File Service.
"""
# Uplink does not support enum serializing into str
# workaround as the service expects lower case `true` and `false`
# uplink serializes bools to `True` and `False`
order_by_str = order_by.value if order_by is not None else None
order_by_desc_str = "true" if order_by_descending else "false"
if ids:
ids_str = ",".join(ids)
else:
ids_str = ""
resp = self.__get_files(
skip=skip,
take=take,
order_by=order_by_str,
order_by_descending=order_by_desc_str,
ids=ids_str,
)
return resp
@post("service-groups/Default/query-files-linq")
def query_files_linq(
self, query: models.FileLinqQueryRequest
) -> models.FileLinqQueryResponse:
"""Queries file using LINQ filters.
Args:
query: The LINQ query request containing the following parameters:
- **filter** (:class:`str`, optional): The filter criteria for files. Consists of a
string of queries composed using AND/OR operators. String values and date strings
need to be enclosed in double quotes. Parentheses can be used around filters to
better define the order of operations.
Filter syntax: ``[property name][operator][operand] and [property name][operator][operand]``
Operators:
- Equals: ``x = y``
- Not equal: ``x != y``
- Greater than: ``x > y``
- Greater than or equal: ``x >= y``
- Less than: ``x < y``
- Less than or equal: ``x <= y``
- Logical AND: ``x and y`` or ``x && y``
- Logical OR: ``x or y`` or ``x || y``
- Contains: ``x.Contains(y)`` — checks if a list contains an element
- Not Contains: ``!x.Contains(y)`` — checks if a list does not contain an element
Valid file properties for filtering:
- ``created``: ISO timestamp string (e.g. ``"2023-01-01T08:00:00.000Z"``)
- ``updated``: ISO timestamp string (e.g. ``"2023-01-01T08:00:00.000Z"``)
- ``extension``: File extension (e.g. ``png``, ``txt``, ``pdf``)
- ``size``: File size in bytes (e.g. ``1024``)
- ``userId``: User ID that created the file (e.g. ``"8abc4b87-07d4-4f84-b54f-48eec89b07d4"``)
- ``properties``: File properties (e.g. ``properties['x'] = 'y'``)
- ``workspace``: The workspace of the files (e.g. ``"88974b87-07d4-4f84-b54f-48eec89b11ed"``)
Example::
'(name = "myfile.txt" OR extension = "png")'
' and (DateTime(created) >'
' DateTime.parse("2023-01-01T08:00:00.000Z"))'
' and (size < 1024)'
- **order_by** (:class:`~nisystemlink.clients.file.models.FileLinqQueryOrderBy`, optional):
The file property to order results by. One of ``created``, ``updated``,
``extension``, ``size``, or ``workspace``. Defaults to ``updated``.
- **order_by_descending** (:class:`bool`, optional): Whether to return the files in
descending order. Defaults to ``True``.
- **skip** (:class:`int`, optional): How many files to skip in the result when paging.
For example, a list of 100 files with a skip value of 50 will return entries
starting from the 51st file. Defaults to ``0``.
- **take** (:class:`int`, optional): Maximum number of files to return.
Defaults to ``1000``.
Returns:
File Query Response
Raises:
ApiException: if unable to communicate with the File Service.
"""
...
@post("service-groups/Default/search-files")
def search_files(
self, request: models.SearchFilesRequest
) -> models.SearchFilesResponse:
"""Search for files based on filter criteria.
Note:
This endpoint requires Elasticsearch to be configured in the SystemLink cluster.
If Elasticsearch is not configured, this method will fail with an ApiException.
For deployments without Elasticsearch, use `query_files_linq()` instead.
You can call `api_info()` to check if the `search_files` operation is available
in your deployment.
Args:
request: The search request containing filter, pagination, and sorting parameters.
Returns:
SearchFilesResponse: Response containing matching files and total count.
Raises:
ApiException: if unable to communicate with the File Service or if Elasticsearch
is not configured in the cluster.
"""
...
@params({"force": True}) # type: ignore
@delete("service-groups/Default/files/{id}", args=[Path])
def delete_file(self, id: str) -> None:
"""Deletes the file indicated by the `file_id`.
Args:
id: The ID of the file.
Raises:
ApiException: if unable to communicate with the File Service.
"""
@params({"force": True}) # type: ignore
@post("service-groups/Default/delete-files", args=[Field])
def delete_files(self, ids: List[str]) -> None:
"""Delete multiple files.
Args:
ids: List of unique IDs of Files.
Raises:
ApiException: if unable to communicate with the File Service.
"""
@params({"inline": True}) # type: ignore
@response_handler(file_like_response_handler)
@get("service-groups/Default/files/{id}/data", args=[Path])
def download_file(self, id: str) -> IteratorFileLike:
"""Downloads a file from the SystemLink File service.
Args:
id: The ID of the file.
Yields:
A file-like object for reading the exported data.
Raises:
ApiException: if unable to communicate with the File Service.
"""
@response_handler(_file_uri_response_handler)
@post("service-groups/Default/upload-files")
def __upload_file(
self,
file: Part,
metadata: Part = None,
id: Part = None,
workspace: Query = None,
) -> str:
"""Uploads a file using multipart/form-data headers to send the file payload in the HTTP body.
Args:
file: The file to upload.
metadata: JSON Dictionary with key/value pairs
id: Specify an unique (among all file) 24-digit Hex string ID of the file once it is uploaded.
Defaults to None.
workspace: The id of the workspace the file belongs to. Defaults to None.
Returns:
ID of uploaded file.
Raises:
ApiException: if unable to communicate with the File Service.
"""
def upload_file(
self,
file: BinaryIO,
metadata: Dict[str, str] | None = None,
id: str | None = None,
workspace: str | None = None,
) -> str:
"""Uploads a file to the File Service.
Args:
file: The file to upload.
metadata: File Metadata as dictionary.
id: Specify an unique (among all file) 24-digit Hex string ID of the file once it is uploaded.
Defaults to None.
workspace: The id of the workspace the file belongs to. Defaults to None.
Returns:
ID of uploaded file.
Raises:
ApiException: if unable to communicate with the File Service.
"""
if metadata:
metadata_str = json.dumps(metadata)
else:
metadata_str = None
file_id = self.__upload_file(
file=file,
metadata=metadata_str,
id=id,
workspace=workspace,
)
return file_id
@post("service-groups/Default/files/{id}/update-metadata", args=[Body, Path])
def update_metadata(self, metadata: models.UpdateMetadataRequest, id: str) -> None:
"""Updates an existing file's metadata with the specified metadata properties.
Args:
metadata: File's metadata and options for updating it.
id: ID of the file to update Metadata.
Raises:
ApiException: if unable to communicate with the File Service.
"""
@post(
"service-groups/Default/upload-sessions/start", args=[Query(name="workspace")]
)
def start_upload_session(
self, workspace: str | None = None
) -> models.UploadSessionStartResponse:
"""Start an upload session for uploading a file in chunks.
Args:
workspace: The id of the workspace the file belongs to. Defaults to None.
Returns:
Upload session information including the session ID.
Raises:
ApiException: if unable to communicate with the File Service.
"""
@post(
"service-groups/Default/upload-sessions/append",
args=[
Query(name="sessionId"),
Query(name="chunk"),
Part(name="file"),
Query(name="close"),
],
)
@response_handler(lambda response: None)
def append_to_upload_session(
self,
session_id: str,
chunk_index: int,
chunk: BinaryIO,
close: bool = False,
) -> None:
"""Append a chunk to an upload session.
The chunk needs to be 10485760 bytes (10 MB), unless the close parameter is true,
which means that it is the last chunk of the file content. The chunks can be uploaded
concurrently, as long as all chunk uploads are completed before finalizing.
Args:
session_id: The id of the upload session.
chunk_index: The 1-based index of the chunk to be uploaded.
file: The chunk data to upload.
close: Set the current chunk as the last chunk to be uploaded. Defaults to False.
Raises:
ApiException: if unable to communicate with the File Service.
"""
@response_handler(_file_uri_response_handler)
@post(
"service-groups/Default/upload-sessions/finish",
args=[Query(name="sessionId"), Field(name="name"), Field(name="properties")],
)
def finish_upload_session(
self,
session_id: str,
name: str,
properties: Dict[str, str],
) -> str:
"""Finish an upload session and make the file visible in SystemLink.
This will trigger file events, such as routines.
Args:
session_id: The id of the upload session.
name: The name of the file.
properties: The properties of the file.
Returns:
ID of the uploaded file.
Raises:
ApiException: if unable to communicate with the File Service.
"""