Skip to content

Commit 163a962

Browse files
authored
Support access_credentials_name for BATCH tasks (#308)
TileDB Cloud supports using certain types of credentials, such as AWS IAM roles and Azure SAS tokens, as part of a batch task graph. These credentials can provide direct access to object stores, which is useful in some cases, especially during ingestion. This PR extends the existing ingestion support by adding a new parameter, `acn` (access credentials name), which is passed to batch tasks as `access_credentials_name`.
1 parent 1405eff commit 163a962

File tree

1 file changed

+22
-0
lines changed

1 file changed

+22
-0
lines changed

apis/python/src/tiledb/vector_search/ingestion.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def ingest(
5151
trace_id: Optional[str] = None,
5252
use_sklearn: bool = True,
5353
mode: Mode = Mode.LOCAL,
54+
acn: Optional[str] = None,
5455
**kwargs,
5556
):
5657
"""
@@ -134,6 +135,8 @@ def ingest(
134135
tiledb.vector_search's. Defaults to true.
135136
mode: Mode
136137
execution mode, defaults to LOCAL use BATCH for distributed execution
138+
acn: Optional[str]
139+
access credential name to be used when running in BATCH mode for object store access
137140
"""
138141
import enum
139142
import json
@@ -1891,8 +1894,11 @@ def create_ingestion_dag(
18911894
trace_id: Optional[str] = None,
18921895
use_sklearn: bool = True,
18931896
mode: Mode = Mode.LOCAL,
1897+
acn: Optional[str] = None,
18941898
namespace: Optional[str] = None
18951899
) -> dag.DAG:
1900+
1901+
kwargs = {}
18961902
if mode == Mode.BATCH:
18971903
d = dag.DAG(
18981904
name="vector-ingestion",
@@ -1905,6 +1911,8 @@ def create_ingestion_dag(
19051911
namespace=namespace
19061912
)
19071913
threads = 16
1914+
if acn:
1915+
kwargs["access_credentials_name"] = acn
19081916
else:
19091917
d = dag.DAG(
19101918
name="vector-ingestion",
@@ -1950,6 +1958,7 @@ def create_ingestion_dag(
19501958
name="ingest",
19511959
resources={"cpu": str(threads), "memory": "16Gi"},
19521960
image_name=DEFAULT_IMG_NAME,
1961+
**kwargs
19531962
)
19541963
return d
19551964
elif index_type == "IVF_FLAT":
@@ -1966,6 +1975,7 @@ def create_ingestion_dag(
19661975
name="copy-centroids",
19671976
resources={"cpu": "1", "memory": "2Gi"},
19681977
image_name=DEFAULT_IMG_NAME,
1978+
**kwargs
19691979
)
19701980
else:
19711981
random_sample_nodes = []
@@ -2020,6 +2030,7 @@ def create_ingestion_dag(
20202030
name="read-random-sample-" + str(idx),
20212031
resources={"cpu": "2", "memory": "6Gi"},
20222032
image_name=DEFAULT_IMG_NAME,
2033+
**kwargs
20232034
)
20242035
)
20252036
num_sampled += num_to_sample
@@ -2048,6 +2059,7 @@ def create_ingestion_dag(
20482059
name="kmeans",
20492060
resources={"cpu": "8", "memory": "32Gi"},
20502061
image_name=DEFAULT_IMG_NAME,
2062+
**kwargs
20512063
)
20522064

20532065
for random_sample_node in random_sample_nodes:
@@ -2076,6 +2088,7 @@ def create_ingestion_dag(
20762088
name="init-centroids",
20772089
resources={"cpu": "1", "memory": "1Gi"},
20782090
image_name=DEFAULT_IMG_NAME,
2091+
**kwargs
20792092
)
20802093

20812094
for random_sample_node in random_sample_nodes:
@@ -2110,6 +2123,7 @@ def create_ingestion_dag(
21102123
name="k-means-part-" + str(task_id),
21112124
resources={"cpu": str(threads), "memory": "12Gi"},
21122125
image_name=DEFAULT_IMG_NAME,
2126+
**kwargs
21132127
)
21142128
)
21152129
task_id += 1
@@ -2122,6 +2136,7 @@ def create_ingestion_dag(
21222136
name="update-centroids-" + str(i),
21232137
resources={"cpu": "1", "memory": "8Gi"},
21242138
image_name=DEFAULT_IMG_NAME,
2139+
**kwargs
21252140
)
21262141
)
21272142
internal_centroids_node = submit(
@@ -2130,6 +2145,7 @@ def create_ingestion_dag(
21302145
name="update-centroids",
21312146
resources={"cpu": "1", "memory": "8Gi"},
21322147
image_name=DEFAULT_IMG_NAME,
2148+
**kwargs
21332149
)
21342150
centroids_node = submit(
21352151
write_centroids,
@@ -2143,6 +2159,7 @@ def create_ingestion_dag(
21432159
name="write-centroids",
21442160
resources={"cpu": "1", "memory": "2Gi"},
21452161
image_name=DEFAULT_IMG_NAME,
2162+
**kwargs
21462163
)
21472164

21482165
compute_indexes_node = submit(
@@ -2155,6 +2172,7 @@ def create_ingestion_dag(
21552172
name="compute-indexes",
21562173
resources={"cpu": "1", "memory": "2Gi"},
21572174
image_name=DEFAULT_IMG_NAME,
2175+
**kwargs
21582176
)
21592177

21602178
task_id = 0
@@ -2184,6 +2202,7 @@ def create_ingestion_dag(
21842202
name="ingest-" + str(task_id),
21852203
resources={"cpu": str(threads), "memory": "16Gi"},
21862204
image_name=DEFAULT_IMG_NAME,
2205+
**kwargs
21872206
)
21882207
ingest_node.depends_on(centroids_node)
21892208
compute_indexes_node.depends_on(ingest_node)
@@ -2203,6 +2222,7 @@ def create_ingestion_dag(
22032222
name="ingest-" + str(task_id),
22042223
resources={"cpu": str(threads), "memory": "16Gi"},
22052224
image_name=DEFAULT_IMG_NAME,
2225+
**kwargs
22062226
)
22072227
ingest_additions_node.depends_on(centroids_node)
22082228
compute_indexes_node.depends_on(ingest_additions_node)
@@ -2229,6 +2249,7 @@ def create_ingestion_dag(
22292249
name="consolidate-partition-" + str(task_id),
22302250
resources={"cpu": str(threads), "memory": "16Gi"},
22312251
image_name=DEFAULT_IMG_NAME,
2252+
**kwargs
22322253
)
22332254
consolidate_partition_node.depends_on(compute_indexes_node)
22342255
task_id += 1
@@ -2513,6 +2534,7 @@ def consolidate_and_vacuum(
25132534
trace_id=trace_id,
25142535
use_sklearn=use_sklearn,
25152536
mode=mode,
2537+
acn=acn,
25162538
namespace=namespace,
25172539
)
25182540
logger.debug("Submitting ingestion graph")

0 commit comments

Comments
 (0)