@@ -305,10 +305,11 @@ def ingest(
305305 + "" .join (random .choices (string .ascii_letters , k = 10 ))
306306 )
307307 DEFAULT_ATTR_FILTERS = storage_formats [storage_version ]["DEFAULT_ATTR_FILTERS" ]
308- VECTORS_PER_WORK_ITEM = 20000000
308+ DEFAULT_PARTITION_BYTE_SIZE = 2560000000 # ~ 2.56 GB
309309 VECTORS_PER_SAMPLE_WORK_ITEM = 1000000
310310 MAX_TASKS_PER_STAGE = 100
311311 CENTRALISED_KMEANS_MAX_SAMPLE_SIZE = 1000000
312+ DEFAULT_KMEANS_BYTES_PER_SAMPLE = 128000000 # ~ 128MB
312313 DEFAULT_IMG_NAME = "3.9-vectorsearch"
313314 MAX_INT32 = 2 ** 31 - 1
314315
@@ -2105,6 +2106,14 @@ def create_ingestion_dag(
21052106 partial_index_resources : Optional [Mapping [str , Any ]] = None ,
21062107 ) -> dag .DAG :
21072108 kwargs = {}
2109+
2110+ # We compute the real size of the batch in bytes.
2111+ size_in_bytes = size * dimensions * np .dtype (vector_type ).itemsize
2112+ logger .debug ("Input size in bytes: %d" , size_in_bytes )
2113+ training_sample_size_in_bytes = (
2114+ training_sample_size * dimensions * np .dtype (vector_type ).itemsize
2115+ )
2116+ logger .debug ("Training sample size in bytes: %d" , training_sample_size_in_bytes )
21082117 if mode == Mode .BATCH :
21092118 d = dag .DAG (
21102119 name = "vector-ingestion" ,
@@ -2117,6 +2126,7 @@ def create_ingestion_dag(
21172126 namespace = namespace ,
21182127 )
21192128 threads = 16
2129+
21202130 if acn :
21212131 kwargs ["access_credentials_name" ] = acn
21222132 else :
@@ -2149,25 +2159,84 @@ def create_ingestion_dag(
21492159 input_vectors_work_items_per_worker_during_sampling
21502160 )
21512161
def scale_resources(min_resource, max_resource, max_input_size, input_size):
    """
    Scale a resource request linearly with the input size, clamped to a range.

    The resource grows proportionally to ``input_size / max_input_size`` of
    ``max_resource``, but never drops below ``min_resource`` nor exceeds
    ``max_resource``.

    Args:
        min_resource (int): Lower bound for the resource (cpu cores or RAM GB).
        max_resource (int): Upper bound for the resource.
        max_input_size (int): Input size at which ``max_resource`` is reached.
        input_size (int): Actual input size in the same units as
            ``max_input_size``.

    Returns:
        str: The clamped, scaled resource value rendered as a string.
    """
    # Proportional share of the maximum resource for this input size.
    proportional = int(max_resource * input_size / max_input_size)
    # Clamp from above first, then enforce the floor.
    capped = min(max_resource, proportional)
    return str(max(min_resource, capped))
2184+
21522185 # We can't set as default in the function due to the use of `str(threads)`
21532186 # For consistency we then apply all defaults for resources here.
21542187 if ingest_resources is None :
2155- ingest_resources = {"cpu" : str (threads ), "memory" : "16Gi" }
2188+ ingest_resources = {
2189+ "cpu" : scale_resources (
2190+ 2 , threads , DEFAULT_PARTITION_BYTE_SIZE , size_in_bytes
2191+ ),
2192+ "memory" : scale_resources (
2193+ 2 , 16 , DEFAULT_PARTITION_BYTE_SIZE , size_in_bytes
2194+ )
2195+ + "Gi" ,
2196+ }
21562197
21572198 if consolidate_partition_resources is None :
2158- consolidate_partition_resources = {"cpu" : str (threads ), "memory" : "16Gi" }
2199+ consolidate_partition_resources = {
2200+ "cpu" : scale_resources (
2201+ 2 , threads , DEFAULT_PARTITION_BYTE_SIZE , size_in_bytes
2202+ ),
2203+ "memory" : scale_resources (
2204+ 2 , 16 , DEFAULT_PARTITION_BYTE_SIZE , size_in_bytes
2205+ )
2206+ + "Gi" ,
2207+ }
21592208
21602209 if copy_centroids_resources is None :
21612210 copy_centroids_resources = {"cpu" : "1" , "memory" : "2Gi" }
21622211
21632212 if random_sample_resources is None :
2164- random_sample_resources = {"cpu" : "2" , "memory" : "6Gi" }
2213+ random_sample_resources = {
2214+ "cpu" : "2" ,
2215+ "memory" : "6Gi" ,
2216+ }
21652217
21662218 if kmeans_resources is None :
2167- kmeans_resources = {"cpu" : "8" , "memory" : "32Gi" }
2219+ kmeans_resources = {
2220+ "cpu" : scale_resources (
2221+ 4 ,
2222+ threads ,
2223+ DEFAULT_KMEANS_BYTES_PER_SAMPLE ,
2224+ training_sample_size_in_bytes ,
2225+ ),
2226+ "memory" : scale_resources (
2227+ 8 ,
2228+ 32 ,
2229+ DEFAULT_KMEANS_BYTES_PER_SAMPLE ,
2230+ training_sample_size_in_bytes ,
2231+ )
2232+ + "Gi" ,
2233+ }
21682234
21692235 if compute_new_centroids_resources is None :
2170- compute_new_centroids_resources = {"cpu" : "1" , "memory" : "8Gi" }
2236+ compute_new_centroids_resources = {
2237+ "cpu" : "1" ,
2238+ "memory" : "8Gi" ,
2239+ }
21712240
21722241 if assign_points_and_partial_new_centroids_resources is None :
21732242 assign_points_and_partial_new_centroids_resources = {
@@ -2688,7 +2757,12 @@ def consolidate_and_vacuum(
26882757
26892758 # Compute task parameters for main ingestion.
26902759 if input_vectors_per_work_item == - 1 :
2691- input_vectors_per_work_item = VECTORS_PER_WORK_ITEM
2760+ # We scale the input_vectors_per_work_item to maintain the DEFAULT_PARTITION_BYTE_SIZE
2761+ input_vectors_per_work_item = int (
2762+ DEFAULT_PARTITION_BYTE_SIZE
2763+ / dimensions
2764+ / np .dtype (vector_type ).itemsize
2765+ )
26922766 input_vectors_work_items = int (math .ceil (size / input_vectors_per_work_item ))
26932767 input_vectors_work_tasks = input_vectors_work_items
26942768 input_vectors_work_items_per_worker = 1
0 commit comments