Skip to content

Commit a82cb94

Browse files
Shelnutt2ihnorton
andauthored
Allow all batch task resources to be overridden (#315)
* Allow all batch task resources to be overridden This adds several new parameters to ingest to allow for each task to have custom resources if required. This is not the most idea, as there are tons of parameters here. This is a short term step to offer flexibility while a better overal design is considered for handling this. * Apply suggestions from code review --------- Co-authored-by: Isaiah Norton <[email protected]>
1 parent e94db05 commit a82cb94

File tree

1 file changed

+95
-14
lines changed

1 file changed

+95
-14
lines changed

apis/python/src/tiledb/vector_search/ingestion.py

Lines changed: 95 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,17 @@ def ingest(
5252
use_sklearn: bool = True,
5353
mode: Mode = Mode.LOCAL,
5454
acn: Optional[str] = None,
55+
ingest_resources: Optional[Mapping[str, Any]] = None,
56+
consolidate_partition_resources: Optional[Mapping[str, Any]] = None,
57+
copy_centroids_resources: Optional[Mapping[str, Any]] = None,
58+
random_sample_resources: Optional[Mapping[str, Any]] = None,
59+
kmeans_resources: Optional[Mapping[str, Any]] = None,
60+
compute_new_centroids_resources: Optional[Mapping[str, Any]] = None,
61+
assign_points_and_partial_new_centroids_resources: Optional[
62+
Mapping[str, Any]
63+
] = None,
64+
write_centroids_resources: Optional[Mapping[str, Any]] = None,
65+
partial_index_resources: Optional[Mapping[str, Any]] = None,
5566
**kwargs,
5667
):
5768
"""
@@ -137,6 +148,24 @@ def ingest(
137148
execution mode, defaults to LOCAL use BATCH for distributed execution
138149
acn: Optional[str]
139150
access credential name to be used when running in BATCH mode for object store access
151+
ingest_resources: Optional[Mapping[str, Any]]
152+
resources to requst when performing vector ingestion, only applies to BATCH mode
153+
consolidate_partition_resources: Optional[Mapping[str, Any]]
154+
resources to requst when performing consolidation of a partition, only applies to BATCH mode
155+
copy_centroids_resources: Optional[Mapping[str, Any]]
156+
resources to requst when performing copy of centroids from input array to output array, only applies to BATCH mode
157+
random_sample_resources: Optional[Mapping[str, Any]]
158+
resources to request when performing random sample selection, only applies to BATCH mode
159+
kmeans_resources: Optional[Mapping[str, Any]]
160+
resources to request when performing kmeans task, only applies to BATCH mode
161+
compute_new_centroids_resources: Optional[Mapping[str, Any]]
162+
resources to request when performing centroid computation, only applies to BATCH mode
163+
assign_points_and_partial_new_centroids_resources: Optional[Mapping[str, Any]]
164+
resources to request when performing the computation of partial centroids, only applies to BATCH mode
165+
write_centroids_resources: Optional[Mapping[str, Any]]
166+
resources to request when performing the write of centroids, only applies to BATCH mode
167+
partial_index_resources: Optional[Mapping[str, Any]]
168+
resources to request when performing the computation of partial indexing, only applies to BATCH mode
140169
"""
141170
import enum
142171
import json
@@ -2044,6 +2073,17 @@ def create_ingestion_dag(
20442073
mode: Mode = Mode.LOCAL,
20452074
acn: Optional[str] = None,
20462075
namespace: Optional[str] = None,
2076+
ingest_resources: Optional[Mapping[str, Any]] = None,
2077+
consolidate_partition_resources: Optional[Mapping[str, Any]] = None,
2078+
copy_centroids_resources: Optional[Mapping[str, Any]] = None,
2079+
random_sample_resources: Optional[Mapping[str, Any]] = None,
2080+
kmeans_resources: Optional[Mapping[str, Any]] = None,
2081+
compute_new_centroids_resources: Optional[Mapping[str, Any]] = None,
2082+
assign_points_and_partial_new_centroids_resources: Optional[
2083+
Mapping[str, Any]
2084+
] = None,
2085+
write_centroids_resources: Optional[Mapping[str, Any]] = None,
2086+
partial_index_resources: Optional[Mapping[str, Any]] = None,
20472087
) -> dag.DAG:
20482088
kwargs = {}
20492089
if mode == Mode.BATCH:
@@ -2086,6 +2126,38 @@ def create_ingestion_dag(
20862126
input_vectors_work_items_per_worker_during_sampling
20872127
)
20882128

2129+
# We can't set as default in the function due to the use of `str(threads)`
2130+
# For consistency we then apply all defaults for resources here.
2131+
if ingest_resources is None:
2132+
ingest_resources = {"cpu": str(threads), "memory": "16Gi"}
2133+
2134+
if consolidate_partition_resources is None:
2135+
consolidate_partition_resources = {"cpu": str(threads), "memory": "16Gi"}
2136+
2137+
if copy_centroids_resources is None:
2138+
copy_centroids_resources = {"cpu": "1", "memory": "2Gi"}
2139+
2140+
if random_sample_resources is None:
2141+
random_sample_resources = {"cpu": "2", "memory": "6Gi"}
2142+
2143+
if kmeans_resources is None:
2144+
kmeans_resources = {"cpu": "8", "memory": "32Gi"}
2145+
2146+
if compute_new_centroids_resources is None:
2147+
compute_new_centroids_resources = {"cpu": "1", "memory": "8Gi"}
2148+
2149+
if assign_points_and_partial_new_centroids_resources is None:
2150+
assign_points_and_partial_new_centroids_resources = {
2151+
"cpu": str(threads),
2152+
"memory": "12Gi",
2153+
}
2154+
2155+
if write_centroids_resources is None:
2156+
write_centroids_resources = {"cpu": "1", "memory": "2Gi"}
2157+
2158+
if partial_index_resources is None:
2159+
partial_index_resources = {"cpu": "1", "memory": "2Gi"}
2160+
20892161
if index_type == "FLAT":
20902162
ingest_node = submit(
20912163
ingest_flat,
@@ -2103,7 +2175,7 @@ def create_ingestion_dag(
21032175
verbose=verbose,
21042176
trace_id=trace_id,
21052177
name="ingest",
2106-
resources={"cpu": str(threads), "memory": "16Gi"},
2178+
resources=ingest_resources,
21072179
image_name=DEFAULT_IMG_NAME,
21082180
**kwargs,
21092181
)
@@ -2129,7 +2201,7 @@ def create_ingestion_dag(
21292201
verbose=verbose,
21302202
trace_id=trace_id,
21312203
name="ingest",
2132-
resources={"cpu": str(threads), "memory": "16Gi"},
2204+
resources=ingest_resources,
21332205
image_name=DEFAULT_IMG_NAME,
21342206
**kwargs,
21352207
)
@@ -2146,7 +2218,7 @@ def create_ingestion_dag(
21462218
verbose=verbose,
21472219
trace_id=trace_id,
21482220
name="copy-centroids",
2149-
resources={"cpu": "1", "memory": "2Gi"},
2221+
resources=copy_centroids_resources,
21502222
image_name=DEFAULT_IMG_NAME,
21512223
**kwargs,
21522224
)
@@ -2201,7 +2273,7 @@ def create_ingestion_dag(
22012273
config=config,
22022274
verbose=verbose,
22032275
name="read-random-sample-" + str(idx),
2204-
resources={"cpu": "2", "memory": "6Gi"},
2276+
resources=random_sample_resources,
22052277
image_name=DEFAULT_IMG_NAME,
22062278
**kwargs,
22072279
)
@@ -2230,7 +2302,7 @@ def create_ingestion_dag(
22302302
trace_id=trace_id,
22312303
use_sklearn=use_sklearn,
22322304
name="kmeans",
2233-
resources={"cpu": "8", "memory": "32Gi"},
2305+
resources=kmeans_resources,
22342306
image_name=DEFAULT_IMG_NAME,
22352307
**kwargs,
22362308
)
@@ -2259,7 +2331,7 @@ def create_ingestion_dag(
22592331
verbose=verbose,
22602332
trace_id=trace_id,
22612333
name="init-centroids",
2262-
resources={"cpu": "1", "memory": "1Gi"},
2334+
resources=copy_centroids_resources,
22632335
image_name=DEFAULT_IMG_NAME,
22642336
**kwargs,
22652337
)
@@ -2294,7 +2366,7 @@ def create_ingestion_dag(
22942366
trace_id=trace_id,
22952367
use_sklearn=use_sklearn,
22962368
name="k-means-part-" + str(task_id),
2297-
resources={"cpu": str(threads), "memory": "12Gi"},
2369+
resources=assign_points_and_partial_new_centroids_resources,
22982370
image_name=DEFAULT_IMG_NAME,
22992371
**kwargs,
23002372
)
@@ -2307,7 +2379,7 @@ def create_ingestion_dag(
23072379
compute_new_centroids,
23082380
*kmeans_workers[i : i + 10],
23092381
name="update-centroids-" + str(i),
2310-
resources={"cpu": "1", "memory": "8Gi"},
2382+
resources=compute_new_centroids_resources,
23112383
image_name=DEFAULT_IMG_NAME,
23122384
**kwargs,
23132385
)
@@ -2316,7 +2388,7 @@ def create_ingestion_dag(
23162388
compute_new_centroids,
23172389
*reducers,
23182390
name="update-centroids",
2319-
resources={"cpu": "1", "memory": "8Gi"},
2391+
resources=compute_new_centroids_resources,
23202392
image_name=DEFAULT_IMG_NAME,
23212393
**kwargs,
23222394
)
@@ -2330,7 +2402,7 @@ def create_ingestion_dag(
23302402
verbose=verbose,
23312403
trace_id=trace_id,
23322404
name="write-centroids",
2333-
resources={"cpu": "1", "memory": "2Gi"},
2405+
resources=write_centroids_resources,
23342406
image_name=DEFAULT_IMG_NAME,
23352407
**kwargs,
23362408
)
@@ -2343,7 +2415,7 @@ def create_ingestion_dag(
23432415
verbose=verbose,
23442416
trace_id=trace_id,
23452417
name="compute-indexes",
2346-
resources={"cpu": "1", "memory": "2Gi"},
2418+
resources=partial_index_resources,
23472419
image_name=DEFAULT_IMG_NAME,
23482420
**kwargs,
23492421
)
@@ -2373,7 +2445,7 @@ def create_ingestion_dag(
23732445
verbose=verbose,
23742446
trace_id=trace_id,
23752447
name="ingest-" + str(task_id),
2376-
resources={"cpu": str(threads), "memory": "16Gi"},
2448+
resources=ingest_resources,
23772449
image_name=DEFAULT_IMG_NAME,
23782450
**kwargs,
23792451
)
@@ -2393,7 +2465,7 @@ def create_ingestion_dag(
23932465
verbose=verbose,
23942466
trace_id=trace_id,
23952467
name="ingest-" + str(task_id),
2396-
resources={"cpu": str(threads), "memory": "16Gi"},
2468+
resources=ingest_resources,
23972469
image_name=DEFAULT_IMG_NAME,
23982470
**kwargs,
23992471
)
@@ -2420,7 +2492,7 @@ def create_ingestion_dag(
24202492
verbose=verbose,
24212493
trace_id=trace_id,
24222494
name="consolidate-partition-" + str(task_id),
2423-
resources={"cpu": str(threads), "memory": "16Gi"},
2495+
resources=consolidate_partition_resources,
24242496
image_name=DEFAULT_IMG_NAME,
24252497
**kwargs,
24262498
)
@@ -2735,6 +2807,15 @@ def consolidate_and_vacuum(
27352807
mode=mode,
27362808
acn=acn,
27372809
namespace=namespace,
2810+
ingest_resources=ingest_resources,
2811+
consolidate_partition_resources=consolidate_partition_resources,
2812+
copy_centroids_resources=copy_centroids_resources,
2813+
random_sample_resources=random_sample_resources,
2814+
kmeans_resources=kmeans_resources,
2815+
compute_new_centroids_resources=compute_new_centroids_resources,
2816+
assign_points_and_partial_new_centroids_resources=assign_points_and_partial_new_centroids_resources,
2817+
write_centroids_resources=write_centroids_resources,
2818+
partial_index_resources=partial_index_resources,
27382819
)
27392820
logger.debug("Submitting ingestion graph")
27402821
d.compute()

0 commit comments

Comments
 (0)