@@ -320,7 +320,7 @@ def ini_escape(s: str) -> str:
320320
321321 print (f"--- Running dbbench step '{ name } ' for { self .envd_cpus } ..." )
322322 # Execute the command in the 'dbbench' service container and capture output
323- result = self .target .c .run (
323+ result = self .target .composition .run (
324324 "dbbench" ,
325325 "-lc" , # sh arg to make it run `script`
326326 script ,
@@ -375,8 +375,9 @@ def _slug(x: Any) -> str:
375375 self .add_result (
376376 category ,
377377 name ,
378- # we have `duration` instead of `repetitions` in QPS benchmarks currently (but maybe it's good to keep
379- # `repetition` in the schema in case we ever want also multiple repetitions for QPS)
378+ # For QPS measurements, we have only one repetition (i.e., repetition 0). We have `duration` instead of
379+ # `repetitions` in QPS benchmarks currently (but maybe it's good to keep `repetition` in the schema in case
380+ # we ever want also multiple repetitions for QPS).
380381 0 ,
381382 None ,
382383 None ,
@@ -1911,14 +1912,17 @@ def run(self, runner: ScenarioRunner) -> None:
19111912 # We'll also want to measure latency, including tail latency.
19121913
19131914
1914- def disable_region (c : Composition , hard : bool ) -> None :
1915+ # TODO: We should factor out the below
1916+ # `disable_region`, `cloud_disable_enable_and_wait`, `reconfigure_envd_cpus`, `wait_for_envd`
1917+ # functions into a separate module. (Similar `disable_region` functions also occur in other tests.)
1918+ def disable_region (composition : Composition , hard : bool ) -> None :
19151919 print ("Shutting down region ..." )
19161920
19171921 try :
19181922 if hard :
1919- c .run ("mz" , "region" , "disable" , "--hard" , rm = True )
1923+ composition .run ("mz" , "region" , "disable" , "--hard" , rm = True )
19201924 else :
1921- c .run ("mz" , "region" , "disable" , rm = True )
1925+ composition .run ("mz" , "region" , "disable" , rm = True )
19221926 except UIError :
19231927 # Can return: status 404 Not Found
19241928 pass
@@ -1939,12 +1943,12 @@ def cloud_disable_enable_and_wait(
19391943 When `environmentd_cpu_allocation` is provided, it is passed to `mz region enable` via
19401944 `--environmentd-cpu-allocation` to reconfigure environmentd's CPU allocation.
19411945 """
1942- disable_region (target .c , hard = False )
1946+ disable_region (target .composition , hard = False )
19431947
19441948 if environmentd_cpu_allocation is None :
1945- target .c .run ("mz" , "region" , "enable" , rm = True )
1949+ target .composition .run ("mz" , "region" , "enable" , rm = True )
19461950 else :
1947- target .c .run (
1951+ target .composition .run (
19481952 "mz" ,
19491953 "region" ,
19501954 "enable" ,
@@ -1955,7 +1959,7 @@ def cloud_disable_enable_and_wait(
19551959
19561960 time .sleep (10 )
19571961
1958- assert "materialize.cloud" in target .c .cloud_hostname ()
1962+ assert "materialize.cloud" in target .composition .cloud_hostname ()
19591963 wait_for_envd (target )
19601964
19611965
@@ -1983,15 +1987,17 @@ def reconfigure_envd_cpus(
19831987 cpu = str (envd_cpus ),
19841988 )
19851989 print (f"--- Reconfiguring local environmentd CPUs to { envd_cpus } " )
1986- with target .c .override (overridden ):
1990+ with target .composition .override (overridden ):
19871991 # Recreate the container to apply new limits, but preserve volumes.
19881992 try :
1989- target .c .rm ("materialized" , stop = True , destroy_volumes = False )
1993+ target .composition .rm (
1994+ "materialized" , stop = True , destroy_volumes = False
1995+ )
19901996 except CommandFailureCausedUIError as e :
19911997 # Ignore only the benign case where the container does not yet exist.
19921998 if not (e .stderr and "No such container" in e .stderr ):
19931999 raise
1994- target .c .up ("materialized" )
2000+ target .composition .up ("materialized" )
19952001 wait_for_envd (target , timeout_secs = 60 )
19962002 except Exception as e :
19972003 raise UIError (f"failed to apply Docker CPU override for environmentd: { e } " )
@@ -2021,7 +2027,7 @@ def wait_for_envd(target: "BenchTarget", timeout_secs: int = 300) -> None:
20212027 - Docker: probes SQL readiness via Composition.sql_query
20222028 """
20232029 if isinstance (target , CloudTarget ):
2024- host = target .c .cloud_hostname ()
2030+ host = target .composition .cloud_hostname ()
20252031 user = target .username
20262032 # Prefer the newly created app password when present; fall back to the CLI password.
20272033 password = target .new_app_password or target .app_password or ""
@@ -2049,7 +2055,7 @@ def wait_for_envd(target: "BenchTarget", timeout_secs: int = 300) -> None:
20492055 last_err : Exception | None = None
20502056 while time .time () < deadline :
20512057 try :
2052- target .c .sql_query ("SELECT 1" , service = "materialized" )
2058+ target .composition .sql_query ("SELECT 1" , service = "materialized" )
20532059 return
20542060 except Exception as e :
20552061 last_err = e
@@ -2059,7 +2065,7 @@ def wait_for_envd(target: "BenchTarget", timeout_secs: int = 300) -> None:
20592065 )
20602066
20612067
2062- def workflow_default (c : Composition , parser : WorkflowArgumentParser ) -> None :
2068+ def workflow_default (composition : Composition , parser : WorkflowArgumentParser ) -> None :
20632069 """
20642070 Run the bench workflow by default
20652071 """
@@ -2125,7 +2131,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
21252131
21262132 if args .target == "cloud-production" :
21272133 target : BenchTarget = CloudTarget (
2128- c , PRODUCTION_USERNAME , PRODUCTION_APP_PASSWORD or ""
2134+ composition , PRODUCTION_USERNAME , PRODUCTION_APP_PASSWORD or ""
21292135 )
21302136 mz = Mz (
21312137 region = PRODUCTION_REGION ,
@@ -2134,20 +2140,20 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
21342140 )
21352141 elif args .target == "cloud-staging" :
21362142 target : BenchTarget = CloudTarget (
2137- c , STAGING_USERNAME , STAGING_APP_PASSWORD or ""
2143+ composition , STAGING_USERNAME , STAGING_APP_PASSWORD or ""
21382144 )
21392145 mz = Mz (
21402146 region = STAGING_REGION ,
21412147 environment = STAGING_ENVIRONMENT ,
21422148 app_password = STAGING_APP_PASSWORD or "" ,
21432149 )
21442150 elif args .target == "docker" :
2145- target = DockerTarget (c )
2151+ target = DockerTarget (composition )
21462152 mz = Mz (app_password = "" )
21472153 else :
21482154 raise ValueError (f"Unknown target: { args .target } " )
21492155
2150- with c .override (mz ):
2156+ with composition .override (mz ):
21512157 max_scale = args .max_scale
21522158 if target .max_scale () is not None :
21532159 max_scale = min (max_scale , target .max_scale ())
@@ -2203,7 +2209,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
22032209 envd_writer .writeheader ()
22042210
22052211 def process (scenario : str ) -> None :
2206- with c .test_case (scenario ):
2212+ with composition .test_case (scenario ):
22072213 conn = ConnectionHandler (target .new_connection )
22082214
22092215 # This cluster is just for misc setup queries.
@@ -2295,7 +2301,7 @@ def process(scenario: str) -> None:
22952301 test_failed = True
22962302 try :
22972303 scenarios_list = buildkite .shard_list (sorted (list (scenarios )), lambda s : s )
2298- c .test_parts (scenarios_list , process )
2304+ composition .test_parts (scenarios_list , process )
22992305 test_failed = False
23002306 finally :
23012307 cluster_file .close ()
@@ -2304,11 +2310,12 @@ def process(scenario: str) -> None:
23042310 if args .cleanup :
23052311 target .cleanup ()
23062312
2307- # Upload only cluster scaling results to Test Analytics for now, until the Test Analytics schema is extended.
2308- # TODO: See slack discussion:
2309- # https://materializeinc.slack.com/archives/C01LKF361MZ/p1762351652336819?thread_ts=1762348361.164759&cid=C01LKF361MZ
2310- upload_cluster_results_to_test_analytics (c , cluster_path , not test_failed )
2311- upload_environmentd_results_to_test_analytics (c , envd_path , not test_failed )
2313+ upload_cluster_results_to_test_analytics (
2314+ composition , cluster_path , not test_failed
2315+ )
2316+ upload_environmentd_results_to_test_analytics (
2317+ composition , envd_path , not test_failed
2318+ )
23122319
23132320 assert not test_failed
23142321
@@ -2324,7 +2331,7 @@ def process(scenario: str) -> None:
23242331
23252332
23262333class BenchTarget :
2327- c : Composition
2334+ composition : Composition
23282335
23292336 @abstractmethod
23302337 def initialize (self ) -> None : ...
@@ -2356,8 +2363,10 @@ def dbbench_connection_flags(self) -> list[str]:
23562363
23572364
23582365class CloudTarget (BenchTarget ):
2359- def __init__ (self , c : Composition , username : str , app_password : str ) -> None :
2360- self .c = c
2366+ def __init__ (
2367+ self , composition : Composition , username : str , app_password : str
2368+ ) -> None :
2369+ self .composition = composition
23612370 self .username = username
23622371 self .app_password = app_password
23632372 self .new_app_password : str | None = None
@@ -2368,7 +2377,7 @@ def dbbench_connection_flags(self) -> list[str]:
23682377 "-driver" ,
23692378 "postgres" ,
23702379 "-host" ,
2371- self .c .cloud_hostname (),
2380+ self .composition .cloud_hostname (),
23722381 "-port" ,
23732382 "6875" ,
23742383 "-username" ,
@@ -2387,7 +2396,7 @@ def initialize(self) -> None:
23872396
23882397 # Create new app password.
23892398 new_app_password_name = "Materialize CLI (mz) - Cluster Spec Sheet"
2390- output = self .c .run (
2399+ output = self .composition .run (
23912400 "mz" ,
23922401 "app-password" ,
23932402 "create" ,
@@ -2401,7 +2410,7 @@ def initialize(self) -> None:
24012410 def new_connection (self ) -> psycopg .Connection :
24022411 assert self .new_app_password is not None
24032412 conn = psycopg .connect (
2404- host = self .c .cloud_hostname (),
2413+ host = self .composition .cloud_hostname (),
24052414 port = 6875 ,
24062415 user = self .username ,
24072416 password = self .new_app_password ,
@@ -2412,7 +2421,7 @@ def new_connection(self) -> psycopg.Connection:
24122421 return conn
24132422
24142423 def cleanup (self ) -> None :
2415- disable_region (self .c , hard = True )
2424+ disable_region (self .composition , hard = True )
24162425
24172426 def replica_size_for_scale (self , scale : int ) -> str :
24182427 """
@@ -2422,8 +2431,8 @@ def replica_size_for_scale(self, scale: int) -> str:
24222431
24232432
24242433class DockerTarget (BenchTarget ):
2425- def __init__ (self , c : Composition ) -> None :
2426- self .c = c
2434+ def __init__ (self , composition : Composition ) -> None :
2435+ self .composition = composition
24272436
24282437 def dbbench_connection_flags (self ) -> list [str ]:
24292438 return [
@@ -2443,14 +2452,14 @@ def dbbench_connection_flags(self) -> list[str]:
24432452
24442453 def initialize (self ) -> None :
24452454 print ("Starting local Materialize instance ..." )
2446- self .c .up ("materialized" )
2455+ self .composition .up ("materialized" )
24472456
24482457 def new_connection (self ) -> psycopg .Connection :
2449- return self .c .sql_connection ()
2458+ return self .composition .sql_connection ()
24502459
24512460 def cleanup (self ) -> None :
24522461 print ("Stopping local Materialize instance ..." )
2453- self .c .stop ("materialized" )
2462+ self .composition .stop ("materialized" )
24542463
24552464 def replica_size_for_scale (self , scale : int ) -> str :
24562465 # 100cc == 2 workers
@@ -2590,7 +2599,7 @@ def recreate_cluster() -> None:
25902599 if isinstance (target , CloudTarget ):
25912600 # We reset the cloud envd's core count in any case, to avoid accidentally burning a lot of money.
25922601 print ("--- Resetting Cloud environmentd CPUs to the default" )
2593- target .c .run (
2602+ target .composition .run (
25942603 "mz" ,
25952604 "region" ,
25962605 "enable" ,
@@ -2657,7 +2666,7 @@ def run_scenario_weak(
26572666 scenario .run (runner )
26582667
26592668
2660- def workflow_plot (c : Composition , parser : WorkflowArgumentParser ) -> None :
2669+ def workflow_plot (composition : Composition , parser : WorkflowArgumentParser ) -> None :
26612670 """Analyze the results of the workflow."""
26622671
26632672 parser .add_argument (
@@ -2783,6 +2792,7 @@ def analyze_envd_results_file(file: str) -> None:
27832792 plot_dir = os .path .join ("test" , "cluster-spec-sheet" , "plots" , base_name )
27842793 os .makedirs (plot_dir , exist_ok = True )
27852794
2795+ # TODO: this might be need to be modified if we have more than one repetitions in an envd results file.
27862796 for (benchmark , category , mode ), sub in df .groupby (
27872797 ["scenario" , "category" , "mode" ]
27882798 ):
@@ -2906,14 +2916,14 @@ def labels_to_drop(
29062916
29072917
29082918def upload_cluster_results_to_test_analytics (
2909- c : Composition ,
2919+ composition : Composition ,
29102920 file : str ,
29112921 was_successful : bool ,
29122922) -> None :
29132923 if not buildkite .is_in_buildkite ():
29142924 return
29152925
2916- test_analytics = TestAnalyticsDb (create_test_analytics_config (c ))
2926+ test_analytics = TestAnalyticsDb (create_test_analytics_config (composition ))
29172927 test_analytics .builds .add_build_job (was_successful = was_successful )
29182928
29192929 result_entries = []
@@ -2950,14 +2960,14 @@ def upload_cluster_results_to_test_analytics(
29502960
29512961
29522962def upload_environmentd_results_to_test_analytics (
2953- c : Composition ,
2963+ composition : Composition ,
29542964 file : str ,
29552965 was_successful : bool ,
29562966) -> None :
29572967 if not buildkite .is_in_buildkite ():
29582968 return
29592969
2960- test_analytics = TestAnalyticsDb (create_test_analytics_config (c ))
2970+ test_analytics = TestAnalyticsDb (create_test_analytics_config (composition ))
29612971 test_analytics .builds .add_build_job (was_successful = was_successful )
29622972
29632973 result_entries = []
0 commit comments