-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathdatasource.py
More file actions
1908 lines (1575 loc) · 74.1 KB
/
datasource.py
File metadata and controls
1908 lines (1575 loc) · 74.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import base64
import datetime
import json
import logging
import os.path
import tempfile
import threading
import time
import uuid
import webbrowser
from contextlib import contextmanager
from dataclasses import dataclass, field
from os import PathLike
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, ContextManager, Dict, List, Literal, Optional, Set, Tuple, Union
from dataclasses_json import DataClassJsonMixin, LetterCase, config
from pathvalidate import sanitize_filepath
import dagshub.common.config
from dagshub.common import rich_console
from dagshub.common.analytics import send_analytics_event
from dagshub.common.environment import is_mlflow_installed
from dagshub.common.helpers import http_request, log_message, prompt_user
from dagshub.common.rich_util import get_rich_progress
from dagshub.common.util import (
deprecated,
exclude_if_none,
lazy_load,
multi_urljoin,
to_timestamp,
)
from dagshub.data_engine.annotation.importer import AnnotationImporter, AnnotationLocation, AnnotationType
from dagshub.data_engine.client.models import (
DatasetResult,
MetadataFieldSchema,
PreprocessingStatus,
ScanOption,
)
from dagshub.data_engine.dtypes import MetadataFieldType
from dagshub.data_engine.model.datapoint import Datapoint
from dagshub.data_engine.model.datasource_state import DatasourceState
from dagshub.data_engine.model.errors import (
DatasetFieldComparisonError,
DatasetNotFoundError,
FieldNotFoundError,
WrongOperatorError,
WrongOrderError,
)
from dagshub.data_engine.model.metadata import (
precalculate_metadata_info,
run_preupload_transforms,
validate_uploading_metadata,
)
from dagshub.common.adaptive_batching import AdaptiveBatcher
from dagshub.data_engine.model.metadata.util import is_retryable_metadata_upload_error
from dagshub.data_engine.model.metadata.dtypes import DatapointMetadataUpdateEntry
from dagshub.data_engine.model.metadata.transforms import DatasourceFieldInfo, _add_metadata
from dagshub.data_engine.model.metadata_field_builder import MetadataFieldBuilder
from dagshub.data_engine.model.query import QueryFilterTree
from dagshub.data_engine.model.schema_util import (
default_metadata_type_value,
)
# Heavy optional dependencies are imported only for type checking; at runtime
# they are lazy-loaded so importing this module stays cheap and doesn't
# require fiftyone/mlflow/ngrok/etc. to actually be installed.
if TYPE_CHECKING:
    import cloudpickle
    import fiftyone as fo
    import mlflow
    import mlflow.entities
    import mlflow.exceptions as mlflow_exceptions
    import ngrok
    import pandas
    from dagshub.data_engine.model.query_result import QueryResult
else:
    plugin_server_module = lazy_load("dagshub.data_engine.voxel_plugin_server.server")
    fo = lazy_load("fiftyone")
    mlflow = lazy_load("mlflow")
    mlflow_exceptions = lazy_load("mlflow.exceptions")
    pandas = lazy_load("pandas")
    ngrok = lazy_load("ngrok")
    cloudpickle = lazy_load("cloudpickle")

logger = logging.getLogger(__name__)

# Base URL used when talking to a locally running Label Studio orchestrator.
LS_ORCHESTRATOR_URL = "http://127.0.0.1"
# MLflow run tag names used to link a run back to its datasource/dataset.
MLFLOW_DATASOURCE_TAG_NAME = "dagshub.datasets.datasource_id"
MLFLOW_DATASET_TAG_NAME = "dagshub.datasets.dataset_id"
@dataclass
class DatapointDeleteMetadataEntry(DataClassJsonMixin):
    # Wire-format entry for deleting one metadata key from one datapoint
    # (serialized to JSON via DataClassJsonMixin before being sent to the API).
    datapointId: str
    key: str
@dataclass
class DatapointDeleteEntry(DataClassJsonMixin):
    # Wire-format entry for deleting a whole datapoint from the datasource.
    datapointId: str
@dataclass
class Field:
    """
    Class used to define custom fields for use in \
    :func:`Datasource.select() <dagshub.data_engine.model.datasource.Datasource.select>` or in filtering.

    Example of filtering on old data from a field::

        t = datetime.now() - timedelta(days=2)
        q = ds[Field("size", as_of=t)] > 500
        q.all()
    """

    field_name: str
    """The database field where the values are stored. In other words, where to get the values from."""

    as_of: Optional[Union[float, datetime.datetime]] = None
    """
    If defined, the data in this field would be shown as of this moment in time.

    Accepts either a datetime object, or a UTC timestamp.
    """

    alias: Optional[str] = None
    """
    How the returned custom data field should be named.

    Useful when you're comparing the same field at multiple points in time::

        yesterday = datetime.now() - timedelta(days=1)
        ds.select(
            Field("value", alias="value_today"),
            Field("value", as_of=yesterday, alias="value_yesterday")
        ).all()
    """

    @property
    def as_of_timestamp(self) -> Optional[int]:
        # Normalize the user-supplied datetime/float into a UTC timestamp;
        # None means "no point-in-time requested".
        if self.as_of is None:
            return None
        return to_timestamp(self.as_of)

    def to_dict(self, ds: "Datasource") -> Dict[str, Any]:
        # Serialize into the GraphQL "select" entry shape, first validating
        # that the field actually exists on the datasource.
        if not ds.has_field(self.field_name):
            raise FieldNotFoundError(self.field_name)
        res_dict: Dict[str, Union[str, int, None]] = {"name": self.field_name}
        if self.as_of is not None:
            res_dict["asOf"] = self.as_of_timestamp
        if self.alias:
            res_dict["alias"] = self.alias
        return res_dict
_metadata_contexts: Dict[Union[int, str], "MetadataContextManager"] = {}
class Datasource:
def __init__(
    self,
    datasource: "DatasourceState",
    query: Optional["DatasourceQuery"] = None,
    from_dataset: Optional["DatasetState"] = None,
):
    """
    Args:
        datasource: Backing datasource state (repo, connection, field schema).
        query: Query to attach to this object; a fresh empty query if ``None``.
        from_dataset: Dataset state this object was loaded from, if any.
    """
    self._source = datasource
    if query is None:
        query = DatasourceQuery()
    self._query = query
    # this ref marks if source is currently used in
    # meta-data update 'with' block
    self._explicit_update_ctx: Optional[MetadataContextManager] = None
    self.assigned_dataset = from_dataset
    self.ngrok_listener = None
@property
def has_explicit_context(self):
return self._explicit_update_ctx is not None
@property
def source(self) -> "DatasourceState":
    """The backing :class:`DatasourceState` of this datasource."""
    return self._source
def clear_query(self, reset_to_dataset=True):
    """
    Clear the attached query.

    Args:
        reset_to_dataset: If ``True`` and this Datasource was saved as a dataset, reset to the query in the dataset,
            instead of clearing the query completely.
    """
    dataset = self.assigned_dataset if reset_to_dataset else None
    if dataset is not None and dataset.query is not None:
        # Roll back to the dataset's saved query rather than an empty one
        self._query = dataset.query.__deepcopy__()
    else:
        self._query = DatasourceQuery()
def __deepcopy__(self, memodict=None) -> "Datasource":
    """
    Copy this datasource object with a deep-copied query.

    The underlying :class:`DatasourceState` and the assigned dataset are
    shared, not copied, since they represent server-side state.

    Args:
        memodict: Standard ``copy.deepcopy`` memo dict; unused here.
            (Default changed from a mutable ``{}`` to ``None`` — a shared
            mutable default is a classic Python pitfall.)
    """
    res = Datasource(self._source, self._query.__deepcopy__())
    res.assigned_dataset = self.assigned_dataset
    return res
def get_query(self) -> "DatasourceQuery":
    """Return the query currently attached to this datasource object."""
    return self._query
@property
def annotation_fields(self) -> List[str]:
    """Return all fields that have the annotation meta tag set"""
    return [f.name for f in self.fields if f.is_annotation()]
@property
def document_fields(self) -> List[str]:
    """Return all fields that have the document meta tag set."""
    return [f.name for f in self.fields if f.is_document()]
def serialize_gql_query_input(self) -> Dict:
"""
Serialize the query of this Datasource for use in GraphQL querying (e.g. getting datapoints)
:meta private:
"""
return self._query.to_dict()
def _deserialize_from_gql_result(self, query_dict: Dict):
    """
    Imports query information from ``query_dict``
    """
    self._query = DatasourceQuery.from_dict(query_dict)
def load_from_dataset(
    self, dataset_id: Optional[Union[str, int]] = None, dataset_name: Optional[str] = None, change_query=True
):
    """
    Imports query information from a dataset with the specified id or name. Either of id or name could be specified

    Args:
        dataset_id: ID of the dataset
        dataset_name: Name of the dataset
        change_query: Whether to change the query of this object to the query in the dataset

    Raises:
        DatasetNotFoundError: If no dataset with this id/name exists in the repo.

    :meta private:
    """
    datasets = self.source.client.get_datasets(id=dataset_id, name=dataset_name)
    if not datasets:
        raise DatasetNotFoundError(self.source.repo, dataset_id, dataset_name)
    # If multiple datasets match, the first returned one is used.
    dataset_state = DatasetState.from_gql_dataset_result(datasets[0])
    self.load_from_dataset_state(dataset_state, change_query)
def load_from_dataset_state(self, dataset_state: "DatasetState", change_query=True):
    """
    Imports query information from a :class:`~dagshub.data_engine.model.dataset_state.DatasetState`

    Args:
        dataset_state: State to load
        change_query: If false, only assigns the dataset.
            If true, also changes the query to be the query of the dataset

    Raises:
        RuntimeError: If the dataset belongs to a different datasource than this one.

    :meta private:
    """
    if self.source.id != dataset_state.datasource_id:
        raise RuntimeError(
            "Dataset belongs to a different datasource "
            f"(This datasource: {self.source.id}, Dataset's datasource: {dataset_state.datasource_id})"
        )
    self.assigned_dataset = dataset_state
    if change_query:
        self._query = dataset_state.query
def sample(self, start: Optional[int] = None, end: Optional[int] = None) -> "QueryResult":
    """
    Fetch a sample of datapoints from the datasource.

    Args:
        start: Not implemented yet; a warning is logged if passed.
        end: Passed through to the client's sample call — presumably the
            maximum number of datapoints to return (TODO confirm against client).
    """
    if start is not None:
        logger.warning("Starting slices is not implemented for now")
    res = self._source.client.sample(self, end, include_metadata=True)
    res._load_autoload_fields()
    return res
def fetch(self, load_documents=True, load_annotations=True) -> "QueryResult":
    """
    Executes the query and returns a :class:`.QueryResult` object containing returned datapoints.

    If there's an active MLflow run, logs an artifact with information about the query to the run.

    This function respects the limit set on the query with :func:`limit()`.

    Args:
        load_documents: Automatically download all document blob fields
        load_annotations: Automatically download all annotation blob fields
    """
    self._check_preprocess()
    res = self._source.client.get_datapoints(self)
    self._autolog_mlflow(res)
    res._load_autoload_fields(documents=load_documents, annotations=load_annotations)
    return res
def head(self, size=100, load_documents=True, load_annotations=True) -> "QueryResult":
    """
    Executes the query and returns a :class:`.QueryResult` object containing first ``size`` datapoints

    .. note::
        This function is intended for quick checks and debugging your queries.
        As a result of that, this function does not log an artifact to MLflow.
        If you want to limit the number of datapoints returned by the query as part of the training workflow,
        use :func:`limit()` instead. That will save the limit as part of the query.

    Args:
        size: how many datapoints to get. Default is 100
        load_documents: Automatically download all document blob fields
        load_annotations: Automatically download all annotation blob fields
    """
    self._check_preprocess()
    send_analytics_event("Client_DataEngine_DisplayTopResults", repo=self.source.repoApi)
    res = self._source.client.head(self, size)
    res._load_autoload_fields(documents=load_documents, annotations=load_annotations)
    return res
def all(self, load_documents=True, load_annotations=True) -> "QueryResult":
    """
    Executes the query and returns a :class:`.QueryResult` object containing **all** datapoints

    If there's an active MLflow run, logs an artifact with information about the query to the run.

    .. warning::
        Unlike :func:`fetch()`, this function will override any limits set on the query.
        If you have set any limits on the query with :func:`limit()`, use :func:`fetch()` instead.

    Args:
        load_documents: Automatically download all document blob fields
        load_annotations: Automatically download all annotation blob fields
    """
    ds = self
    if self._query.limit:
        log_message(
            "Calling all() on a datasource with a limited query.\n"
            "This will override the limiting and get ALL datapoints in the current query.\n"
            "Use fetch() instead if you want to keep the datapoint limit.",
            logger,
        )
        # Work on a copy with the limit removed — self's query stays untouched
        ds = self.limit(None)
    return ds.fetch(load_documents=load_documents, load_annotations=load_annotations)
def select(self, *selected: Union[str, Field]) -> "Datasource":
    """
    Select which fields should appear in the query result.

    If you want to query older versions of metadata,
    use :class:`Field` objects with ``as_of`` set to your desired time.

    By default, only the defined fields are returned.
    If you want to return all existing fields plus whatever additional fields you define,
    add ``"*"`` into the arguments.

    Args:
        selected: Fields you want to select. Can be either of:

            - Name of the field to select: ``"field"``.
            - ``"*"`` to select all the fields in the datasource.
            - :class:`Field` object.

    Raises:
        ValueError: If ``"*"`` is used and an alias collides with an existing field name.

    Example::

        t = datetime.now() - timedelta(hours=24)
        q1 = ds.select("*", Field("size", as_of=t, alias="size_asof_24h_ago"))
        q1.all()
    """
    include_all = False
    selects = []
    for s in selected:
        if isinstance(s, Field):
            selects.append(s.to_dict(self))
        elif s == "*":
            include_all = True
        else:
            selects.append({"name": s})
    if include_all:
        # "*" adds every existing field to the selection, so explicit aliases
        # must not shadow an existing field name.
        aliases = {s["alias"] for s in selects if "alias" in s}
        for f in self.fields:
            if f.name in aliases:
                raise ValueError(
                    f"Alias {f.name} can't be used, because * was specified and "
                    f"a field with that name already exists"
                )
            selects.append({"name": f.name})
    new_ds = self.__deepcopy__()
    new_ds.get_query().select = selects
    return new_ds
def as_of(self, time: Union[float, datetime.datetime]) -> "Datasource":
    """
    Get a snapshot of the datasource's state as of ``time``.

    Args:
        time: At which point in time do you want to get data from.\
            Either a UTC timestamp or a ``datetime`` object.

    In the following example, you will get back datapoints that were created no later than yesterday AND \
    had their size at this point bigger than 5 bytes::

        t = datetime.now() - timedelta(hours=24)
        q1 = (ds["size"] > 5).as_of(t)
        q1.all()

    .. note::
        If used with :func:`select`, the ``as_of`` set on the fields takes precedence
        over the global query ``as_of`` set here.
    """
    # Returns a copy — self's query is left unmodified
    new_ds = self.__deepcopy__()
    new_ds._query.as_of = to_timestamp(time)
    return new_ds
def with_time_zone(self, tz_val: str) -> "Datasource":
    """
    A time zone offset string in the form of "+HH:mm" or "-HH:mm".

    A metadata of type datetime is always stored in DB as a UTC time, when a query is done on this field
    there are 3 options:

    - Metadata was saved with a timezone, in which case it will be used.
    - Metadata was saved without a timezone, in which case UTC will be used.
    - with_time_zone specified a time zone and it will override whatever is in the database.
    """
    # Operate on a copy so this datasource's query stays untouched
    copied = self.__deepcopy__()
    copied._query.time_zone = tz_val
    return copied
def order_by(self, *args: Union[str, Tuple[str, Union[bool, str]]]) -> "Datasource":
    """
    Sort the query result by the specified fields.
    Any previously set order will be overwritten.

    Args:
        Fields to sort by. Can be either of:

            - Name of the field to sort by: ``"field"``.
            - A tuple of ``(field_name, ascending)``: ``("field", True)``.
            - A tuple of ``(field_name, "asc"|"desc")``: ``("field", "asc")``.

    Raises:
        RuntimeError: If a tuple argument is malformed.

    Examples::

        ds.order_by("size").all()  # Order by ascending size
        ds.order_by(("date", "desc"), "size").all()  # Order by descending date, then ascending size
    """
    new_ds = self.__deepcopy__()
    orders = []
    for arg in args:
        if isinstance(arg, str):
            # Bare field name: ascending by default
            orders.append({"field": arg, "order": "ASC"})
        else:
            if len(arg) != 2:
                raise RuntimeError(
                    f"Invalid sort argument {arg}, must be a tuple (<field>, 'asc'|'desc'|True|False)"
                )
            if isinstance(arg[1], bool):
                order = "ASC" if arg[1] else "DESC"
            elif isinstance(arg[1], str) and arg[1].upper() in ["ASC", "DESC"]:
                order = arg[1].upper()
            else:
                raise RuntimeError(f"Invalid sort argument {arg}, second value must be 'asc'|'desc'|True|False")
            orders.append({"field": arg[0], "order": order})
    new_ds.get_query().order_by = orders
    return new_ds
def limit(self, size: Optional[int]) -> "Datasource":
    """
    Limit the number of datapoints returned by the query.
    Use ``None`` to remove the limit.

    This argument is only respected when using :func:`fetch()`.

    Args:
        size: Number of datapoints to return. If ``None``, no limit is applied and all datapoints will be fetched.

    Example::

        ds.limit(10).fetch()
    """
    # Returns a copy; the original query keeps its previous limit
    limited = self.__deepcopy__()
    limited._query.limit = size
    return limited
def _check_preprocess(self):
    """Refresh state from DagsHub and warn if the datasource is still being scanned."""
    self.source.get_from_dagshub()
    statuses_in_progress = (
        PreprocessingStatus.IN_PROGRESS,
        PreprocessingStatus.AUTO_SCAN_IN_PROGRESS,
    )
    if self.source.preprocessing_status in statuses_in_progress:
        logger.warning(
            f"Datasource {self.source.name} is currently in the progress of rescanning. "
            f"Values might change if you requery later"
        )
def metadata_field(self, field_name: str) -> MetadataFieldBuilder:
    """
    Returns a builder for a metadata field.
    The builder can be used to change properties of a field or create a new field altogether.
    Note that fields get automatically created when you upload new metadata to the Data Engine,
    so it's not necessary to create fields with this function.

    Example of creating a new annotation field::

        ds.metadata_field("annotation").set_type(dtypes.LabelStudioAnnotation).apply()

    .. note::
        New fields have to have their type defined using ``.set_type()`` before doing anything else

    Example of marking an existing field as an annotation field::

        ds.metadata_field("existing-field").set_annotation().apply()

    Args:
        field_name: Name of the field that you want to create/change
    """
    return MetadataFieldBuilder(self, field_name)
def apply_field_changes(self, field_builders: List[MetadataFieldBuilder]):
    """
    Applies one or multiple metadata field builders
    that can be constructed using the :func:`metadata_field()` function.
    """
    schemas = []
    for builder in field_builders:
        schemas.append(builder.schema)
    self.source.client.update_metadata_fields(self, schemas)
    # Refetch the datasource so the changed fields are reflected locally
    self.source.get_from_dagshub()
@property
def implicit_update_context(self) -> "MetadataContextManager":
    """
    Context that is used when updating metadata through ``dp[field] = value`` syntax, can be created on demand.

    :meta private:
    """
    key = self.source.id
    ctx = _metadata_contexts.get(key)
    if ctx is None:
        # Lazily create and cache one context per datasource id
        ctx = MetadataContextManager(self)
        _metadata_contexts[key] = ctx
    return ctx
def upload_metadata_of_implicit_context(self):
    """
    commit meta data changes done in dictionary assignment context

    :meta private:
    """
    try:
        self._upload_metadata(self.implicit_update_context.get_metadata_entries())
    finally:
        # Always drop the accumulated entries, even if the upload failed,
        # so a retry doesn't double-send stale entries.
        self.implicit_update_context.clear()
def metadata_context(self) -> ContextManager["MetadataContextManager"]:
    """
    Returns a metadata context, that you can upload metadata through using its
    :func:`~MetadataContextManager.update_metadata()` function.
    Once the context is exited, all metadata is uploaded in one batch::

        with ds.metadata_context() as ctx:
            ctx.update_metadata("file1", {"key1": True, "key2": "value"})

    """
    # Need to have the context manager inside a wrapper to satisfy MyPy + PyCharm type hinter
    @contextmanager
    def func():
        self.source.get_from_dagshub()
        send_analytics_event("Client_DataEngine_addEnrichments", repo=self.source.repoApi)
        ctx = MetadataContextManager(self)
        self._explicit_update_ctx = ctx
        yield ctx
        try:
            # On exit, upload this context's entries plus anything accumulated
            # through the implicit dp[field] = value syntax in the meantime.
            entries = ctx.get_metadata_entries() + self.implicit_update_context.get_metadata_entries()
            self._upload_metadata(entries)
        finally:
            # Clear the implicit context because it can persist
            self.implicit_update_context.clear()
            # The explicit one created with with: can go away
            self._explicit_update_ctx = None

    return func()
def upload_metadata_from_file(
    self, file_path, path_column: Optional[Union[str, int]] = None, ingest_on_server: bool = False
):
    """
    Upload metadata from a file.

    Args:
        file_path: Path to the file with metadata. Allowed formats are CSV, Parquet, ZIP, GZ.
        path_column: Column with the datapoints' paths. Can either be the name of the column, or its index.
            If not specified, the first column is used.
        ingest_on_server: Set to ``True`` to process the metadata asynchronously.
            The file will be sent to our server and ingested into the datasource there.
            Default is ``False``.
    """
    send_analytics_event("Client_DataEngine_addEnrichmentsWithFile", repo=self.source.repoApi)
    if ingest_on_server:
        # Server-side ingestion: ship the file as-is
        datasource_name = self.source.name
        self._source.import_metadata_from_file(datasource_name, file_path, path_column)
    else:
        # Client-side ingestion: parse locally and upload via the dataframe path
        df = self._convert_file_to_df(file_path)
        self.upload_metadata_from_dataframe(df, path_column, ingest_on_server)
def upload_metadata_from_dataframe(
    self, df: "pandas.DataFrame", path_column: Optional[Union[str, int]] = None, ingest_on_server: bool = False
):
    """
    Upload metadata from a pandas dataframe.

    All columns are uploaded as metadata, and the path of every datapoint is taken from `path_column`.

    Args:
        df (`pandas.DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`_):
            DataFrame with metadata
        path_column: Column with the datapoints' paths. Can either be the name of the column, or its index.
            If not specified, the first column is used.
        ingest_on_server: Set to ``True`` to process the metadata asynchronously.
            The file will be sent to our server and ingested into the datasource there.
            Default is ``False``.
    """
    self.source.get_from_dagshub()
    send_analytics_event("Client_DataEngine_addEnrichmentsWithDataFrame", repo=self.source.repoApi)
    if ingest_on_server:
        self._remote_upload_metadata_from_dataframe(df, path_column)
    else:
        metadata = self._df_to_metadata(df, path_column, multivalue_fields=self._get_multivalue_fields())
        self._upload_metadata(metadata)
def _remote_upload_metadata_from_dataframe(
    self, df: "pandas.DataFrame", path_column: Optional[Union[str, int]] = None
):
    """
    Serialize ``df`` to a temporary parquet file and send it to the server
    for asynchronous ingestion.

    Args:
        df: DataFrame with metadata to upload.
        path_column: Column with the datapoints' paths (name or index),
            forwarded to the server-side importer.
    """
    datasource_name = self.source.name
    # NamedTemporaryFile(delete=True) keeps the file open, and on Windows an
    # open temp file can't be reopened by name (to_parquet would fail with a
    # PermissionError). Create a closed temp file instead and clean it up
    # ourselves.
    fd, file_path = tempfile.mkstemp(suffix=".parquet")
    os.close(fd)
    try:
        df.to_parquet(file_path, index=False)
        self._source.import_metadata_from_file(datasource_name, file_path, path_column)
    finally:
        os.unlink(file_path)
def _get_multivalue_fields(self) -> Set[str]:
    """Names of all metadata fields that are marked as multivalue."""
    return {col.name for col in self.source.metadata_fields if col.multiple}
def _generate_metadata_cache_info(self) -> DatasourceFieldInfo:
    """
    Snapshot the field schema info (multivalue fields, value types, document
    fields) needed by the metadata transform pipeline.
    """
    return DatasourceFieldInfo(
        multivalue_fields=self._get_multivalue_fields(),
        field_value_types={f.name: f.valueType for f in self.fields},
        document_fields=self.document_fields,
    )
def _df_to_metadata(
    self, df: "pandas.DataFrame", path_column: Optional[Union[str, int]] = None, multivalue_fields=frozenset()
) -> List[DatapointMetadataUpdateEntry]:
    """
    Convert a dataframe into a list of metadata update entries.

    Args:
        df: DataFrame to convert. Every non-path column becomes a metadata field.
        path_column: Column with the datapoints' paths (name or index).
            Defaults to the first column.
        multivalue_fields: Unused by the body — multivalue info is recomputed via
            ``_generate_metadata_cache_info()``. Kept for signature compatibility;
            default changed from a mutable ``set()`` to an immutable ``frozenset()``
            to avoid the shared-mutable-default pitfall.

    Raises:
        ValueError: If ``path_column`` doesn't exist, or isn't a string column.
    """
    if path_column is None:
        path_column = df.columns[0]
    elif isinstance(path_column, str):
        if path_column not in df.columns:
            raise ValueError(f"Column {path_column} does not exist in the dataframe")
    elif isinstance(path_column, int):
        path_column = df.columns[path_column]

    # objects are actually mixed and not guaranteed to be string, but this should cover most use cases
    if not pandas.api.types.is_string_dtype(df.dtypes[path_column]):
        raise ValueError(f"Path column {path_column} must contain strings")

    field_info = self._generate_metadata_cache_info()

    res: List[DatapointMetadataUpdateEntry] = []
    for _, row in df.iterrows():
        datapoint = row[path_column]
        for key, val in row.items():
            if key == path_column:
                continue
            # Column labels may be non-string (e.g. ints); field names must be strings
            key = str(key)
            _add_metadata(field_info, res, datapoint, key, val, is_pandas=True)
    return res
def delete_source(self, force: bool = False):
    """
    Delete the record of this datasource along with all datapoints.

    .. warning::
        This is a destructive operation! If you delete the datasource,
        all the datapoints and metadata will be removed.

    Args:
        force: Skip the confirmation prompt
    """
    prompt = (
        f'You are about to delete datasource "{self.source.name}" for repo "{self.source.repo}"\n'
        f"This will remove the datasource and ALL datapoints "
        f"and metadata records associated with the source."
    )
    if not force:
        user_response = prompt_user(prompt)
        if not user_response:
            print("Deletion cancelled")
            return
    self.source.client.delete_datasource(self)
def delete_dataset(self, force: bool = False):
    """
    Deletes the dataset, if this object was created from a dataset
    (e.g. from :func:`.datasets.get_dataset()`).

    This doesn't delete the underlying datasource and its metadata, only deleting the dataset and its query.

    If this datasource object wasn't created from a dataset, raises a ``ValueError``.

    Args:
        force: Skip the confirmation prompt
    """
    if self.assigned_dataset is None:
        raise ValueError("This datasource was not created from a dataset")
    prompt = (
        f'You are about to delete dataset "{self.assigned_dataset.dataset_name}" for repo "{self.source.repo}"\n'
        f'The datasource "{self.source.name}" will still exist, but the dataset entry will be removed'
    )
    if not force:
        user_response = prompt_user(prompt)
        if not user_response:
            print("Deletion cancelled")
            return
    self.source.client.delete_dataset(self.assigned_dataset.dataset_id)
def scan_source(self, options: Optional[List[ScanOption]] = None):
    """
    This function fires a call to the backend to rescan the datapoints.
    Call this function whenever you uploaded new files and want them to appear when querying the datasource,
    or if you changed existing file contents and want their metadata to be updated.

    DagsHub periodically rescans all datasources, this function is a way to make a scan happen as soon as possible.

    Notes about automatically scanned metadata:

    1. Only new datapoints (files) will be added.
       If files were removed from the source, their metadata will still remain,
       and they will still be returned from queries on the datasource.
       An API to actively remove metadata will be available soon.
    2. Some metadata fields will be automatically scanned and updated by DagsHub based on this scan -
       the list of automatic metadata fields is growing frequently!

    Args:
        options: List of scanning options. If not sure, leave empty.
    """
    logger.debug("Rescanning datasource")
    self.source.client.scan_datasource(self, options=options)
def _upload_metadata(self, metadata_entries: List[DatapointMetadataUpdateEntry]):
    """
    Validate, transform and upload a batch of metadata update entries.

    Entries are uploaded in adaptively sized batches, retrying batches that
    fail with retryable errors.
    """
    precalculated_info = precalculate_metadata_info(self, metadata_entries)
    validate_uploading_metadata(precalculated_info)
    run_preupload_transforms(self, metadata_entries, precalculated_info)
    batcher = AdaptiveBatcher(
        is_retryable=is_retryable_metadata_upload_error,
        progress_label="Uploading metadata",
    )
    batcher.run(metadata_entries, lambda batch: self.source.client.update_metadata(self, batch))
    # Update the status from dagshub, so we get back the new metadata columns
    self.source.get_from_dagshub()
def delete_metadata_from_datapoints(self, datapoints: List[Datapoint], fields: List[str]):
    """
    Delete metadata from datapoints.
    The deleted values can be accessed using versioned query with time set before the deletion

    Args:
        datapoints: datapoints to delete metadata from
        fields: fields to delete
    """
    # One delete entry per (datapoint, field) pair
    metadata_entries = [
        DatapointDeleteMetadataEntry(datapointId=dp.datapoint_id, key=field_name)
        for dp in datapoints
        for field_name in fields
    ]
    self.source.client.delete_metadata_for_datapoint(self, metadata_entries)
def delete_datapoints(self, datapoints: List[Datapoint], force: bool = False):
    """
    Delete datapoints.

    - These datapoints will no longer show up in queries.
    - Does not delete the datapoint's file, only removing the data from the datasource.
    - You can still query these datapoints and associated metadata with \
        versioned queries whose time is before deletion time.
    - You can re-add these datapoints to the datasource by uploading new metadata to it with, for example, \
        :func:`Datasource.metadata_context <dagshub.data_engine.model.datasource.Datasource.metadata_context>`. \
        This will create a new datapoint with new id and new metadata records.
    - Datasource scanning will *not* add these datapoints back.

    Args:
        datapoints: list of datapoints objects to delete
        force: Skip the confirmation prompt
    """
    dps_str = "\n\t".join([""] + [d.path for d in datapoints])
    prompt = (
        f"You are about to delete the following datapoint(s): {dps_str}\n"
        f"This will remove the datapoint and metadata from unversioned queries, "
        f"but won't delete the underlying file."
    )
    if not force:
        user_response = prompt_user(prompt)
        if not user_response:
            print("Deletion cancelled")
            return
    self.source.client.delete_datapoints(
        self, [DatapointDeleteEntry(datapointId=d.datapoint_id) for d in datapoints]
    )
def save_dataset(self, name: str) -> "Datasource":
    """
    Save the dataset, which is a combination of datasource + query, on the backend.
    That way you can persist and share your queries.
    You can get the dataset back later by calling :func:`.datasets.get_dataset()`

    Args:
        name: Name of the dataset

    Returns:
        A datasource object with the dataset assigned to it
    """
    send_analytics_event("Client_DataEngine_QuerySaved", repo=self.source.repoApi)
    self.source.client.save_dataset(self, name)
    log_message(f"Dataset {name} saved")
    # Return a copy bound to the new dataset, keeping this object's query as-is
    copy_with_ds_assigned = self.__deepcopy__()
    copy_with_ds_assigned.load_from_dataset(dataset_name=name, change_query=False)
    return copy_with_ds_assigned
@deprecated("Either use autologging, or QueryResult.log_to_mlflow() if autologging is turned off")
def log_to_mlflow(
    self,
    artifact_name: Optional[str] = None,
    run: Optional["mlflow.entities.Run"] = None,
    as_of: Optional[datetime.datetime] = None,
) -> "mlflow.entities.Run":
    """
    Logs the current datasource state to MLflow as an artifact.

    .. warning::
        This function is deprecated. Use autologging or
        :func:`QueryResult.log_to_mlflow() <dagshub.data_engine.model.query_result.QueryResult.log_to_mlflow>`
        instead.

    Args:
        artifact_name: Name of the artifact that will be stored in the MLflow run.
        run: MLflow run to save to. If ``None``, uses the active MLflow run or creates a new run.
        as_of: The querying time for which to save the artifact.
            Any time the datasource is recreated from the artifact, it will be queried as of this timestamp.
            If None, the current machine time will be used.
            If the artifact is autologged to MLflow (will happen if you have an active MLflow run),
            then the timestamp of the query will be used.

    Returns:
        Run to which the artifact was logged.
    """
    if artifact_name is None:
        # Prefer the query's own as_of over "now" when generating the name
        as_of = as_of or (self._query.as_of or datetime.datetime.now())
        artifact_name = self._get_mlflow_artifact_name("log", as_of)
    elif not artifact_name.endswith(".dagshub.dataset.json"):
        artifact_name += ".dagshub.dataset.json"
    return self._log_to_mlflow(artifact_name, run, as_of)
def _autolog_mlflow(self, qr: "QueryResult"):
    # Autologging is a silent no-op when mlflow isn't installed.
    if not is_mlflow_installed:
        return
    # Run ONLY if there's an active run going on
    current_run = mlflow.active_run()
    if current_run is None:
        return
    name = self._get_mlflow_artifact_name("autolog", qr.query_data_time)
    # Log in the background so querying isn't blocked on the MLflow round-trip.
    logger_thread = threading.Thread(
        target=self._log_to_mlflow,
        kwargs={"artifact_name": name, "run": current_run, "as_of": qr.query_data_time},
    )
    logger_thread.start()
def _log_to_mlflow(
    self,
    artifact_name,
    run: Optional["mlflow.entities.Run"] = None,
    as_of: Optional[datetime.datetime] = None,
) -> "mlflow.entities.Run":
    # NOTE: return annotation fixed from "mlflow.Entities.Run" (no such attribute)
    # to "mlflow.entities.Run", matching the public log_to_mlflow() signature.
    """
    Log the serialized datasource state as a JSON artifact to an MLflow run.

    Args:
        artifact_name: File name of the artifact inside the run.
        run: Run to log to. Falls back to the active run, or starts a new one if none is active.
        as_of: Query timestamp to bake into the serialized state (see ``_to_dict``).

    Returns:
        The run the artifact was logged to.
    """
    if run is None:
        run = mlflow.active_run()
        if run is None:
            run = mlflow.start_run()
    client = mlflow.MlflowClient()
    run_id = run.info.run_id
    # Refetch the run from the backend, to prevent double-writing tags
    run_info = client.get_run(run_id)
    try:
        if MLFLOW_DATASOURCE_TAG_NAME not in run_info.data.tags:
            client.set_tag(run_id, MLFLOW_DATASOURCE_TAG_NAME, self.source.id)
        if self.assigned_dataset is not None and MLFLOW_DATASET_TAG_NAME not in run_info.data.tags:
            client.set_tag(run_id, MLFLOW_DATASET_TAG_NAME, self.assigned_dataset.dataset_id)
        # Use the run_id local consistently (it is the same value as run.info.run_id).
        client.log_dict(run_id, self._to_dict(as_of), artifact_name)
        log_message(f'Saved the datasource state to MLflow (run "{run.info.run_name}") as "{artifact_name}"')
    except mlflow_exceptions.RestException as e:
        # Best-effort: a failed upload is reported, not raised, so training code keeps running.
        log_message(f"Failed to save the datasource state to MLflow (run {run.info.run_name}): {e}")
    return run
def _get_mlflow_artifact_name(self, prefix: str, as_of: datetime.datetime) -> str:
now_time = as_of.strftime("%Y-%m-%dT%H-%M-%S") # Not ISO format to make it a valid filename
uuid_chunk = str(uuid.uuid4())[-4:]
return f"{prefix}_{self.source.name}_{now_time}_{uuid_chunk}.dagshub.dataset.json"
def save_to_file(self, path: Union[str, PathLike] = ".") -> Path:
    """
    Saves a JSON file representing the current state of datasource or dataset.
    Useful for connecting code versions to the datasource used for training.

    .. note::
        Does not save the actual contents of the datasource/dataset, only the query.

    Args:
        path: Where to save the file. If path is an existing folder, saves to ``<path>/<ds_name>.json``.

    Returns:
        The path to the saved file
    """
    target = Path(path)
    if target.is_dir():
        # Inside a folder, name the file after the dataset (if assigned) or the source.
        if self.assigned_dataset is None:
            stem = self.source.name
        else:
            stem = self.assigned_dataset.dataset_name
        target = target / f"{stem}.json"
    target.parent.mkdir(parents=True, exist_ok=True)
    state = self._to_dict()
    with open(target, "w") as out:
        json.dump(state, out, indent=4, sort_keys=True)
    log_message(f"Datasource saved to '{target}'")
    return target
def _serialize(self, as_of: datetime.datetime) -> "DatasourceSerializedState":
    """
    Build the serializable snapshot of this datasource: repo/source identifiers,
    the current query, the as-of timestamp and visualization links.
    Dataset fields are only filled in when a dataset is assigned.
    """
    state = DatasourceSerializedState(
        repo=self.source.repo,
        datasource_id=self.source.id,
        datasource_name=self.source.name,
        query=self._query,
        timestamp=as_of.timestamp(),
        modified=self.is_query_different_from_dataset,
        link=self._generate_visualize_url(),
    )
    if self.assigned_dataset is not None:
        state.dataset_id = self.assigned_dataset.dataset_id
        state.dataset_name = self.assigned_dataset.dataset_name
    # A query already pinned to a time links to itself; otherwise generate a
    # second link with the as-of time applied.
    if self._query.as_of is not None:
        state.timed_link = state.link
    elif as_of is not None:
        state.timed_link = self.as_of(as_of)._generate_visualize_url()
    return state
def _to_dict(self, as_of: Optional[datetime.datetime] = None) -> Dict:
if as_of is None:
as_of = datetime.datetime.now()
res = self._serialize(as_of).to_dict()
# Skip Nones in the result
res = {k: v for k, v in res.items() if v is not None}
return res
@property
def is_query_different_from_dataset(self) -> Optional[bool]:
"""
Is the current query of the object different from the one in the assigned dataset.
If no dataset is assigned, returns ``None``.
"""
if self.assigned_dataset is None:
return None
return self._query.to_dict() != self.assigned_dataset.query.to_dict()
@staticmethod
def load_from_serialized_state(state_dict: Dict) -> "Datasource":
"""
Load a Datasource that was saved with :func:`save_to_file`
Args:
state_dict: Serialized JSON object
"""
state = DatasourceSerializedState.from_dict(state_dict)
# The json_dataclasses.from_dict() doesn't respect the default value hints, so we fill it out for it
state.query._fill_out_defaults()
ds_state = DatasourceState(repo=state.repo, name=state.datasource_name, id=state.datasource_id)
ds_state.get_from_dagshub()
ds = Datasource(ds_state)