@@ -183,6 +183,12 @@ def get_environmentd_data() -> dict[str, Any]:
183183 )
184184
185185
186+ def get_clusterd_data () -> dict [str , Any ]:
187+ return get_pod_data (
188+ labels = {"environmentd.materialize.cloud/namespace" : "cluster" },
189+ )
190+
191+
186192@contextmanager
187193def port_forward_environmentd (
188194 port : int = 6875 ,
@@ -1990,6 +1996,208 @@ def workflow_balancer(c: Composition, parser: WorkflowArgumentParser) -> None:
19901996 run_balancer (definition , False )
19911997
19921998
1999+ def workflow_orchestratord_upgrade (
2000+ c : Composition ,
2001+ parser : WorkflowArgumentParser ,
2002+ ) -> None :
2003+ # TODO: ideally we'd just be able to compare the images directly, but
2004+ # this test isn't consistently handling ghcr overrides yet - remove this
2005+ # and go back to full image comparisons once we fix that
2006+ def image_tag (image : str ):
2007+ return image .rsplit (":" , 1 )[1 ]
2008+
2009+ def check_orchestratord_version (version : MzVersion ):
2010+ def check ():
2011+ data = get_orchestratord_data ()
2012+ assert len (data ["items" ]) == 1 , f"got { len (data ['items' ])} items"
2013+
2014+ got_image = data ["items" ][0 ]["spec" ]["containers" ][0 ]["image" ]
2015+ expected_image = get_image (
2016+ c .compose ["services" ]["orchestratord" ]["image" ],
2017+ str (version ),
2018+ )
2019+ assert image_tag (got_image ) == image_tag (
2020+ expected_image
2021+ ), f"{ got_image } != { expected_image } "
2022+
2023+ retry (check , 60 )
2024+
2025+ def check_environmentd_version (version : MzVersion ):
2026+ def check ():
2027+ data = get_environmentd_data ()
2028+ assert len (data ["items" ]) == 1 , f"got { len (data ['items' ])} items"
2029+
2030+ got_image = data ["items" ][0 ]["spec" ]["containers" ][0 ]["image" ]
2031+ expected_image = get_image (
2032+ c .compose ["services" ]["environmentd" ]["image" ],
2033+ str (version ),
2034+ )
2035+ assert image_tag (got_image ) == image_tag (
2036+ expected_image
2037+ ), f"{ got_image } != { expected_image } "
2038+
2039+ retry (check , 60 )
2040+
2041+ def check_clusterd_version (version : MzVersion ):
2042+ def check ():
2043+ data = get_clusterd_data ()
2044+ assert len (data ["items" ]) == 2 , f"got { len (data ['items' ])} items"
2045+
2046+ got_image = data ["items" ][0 ]["spec" ]["containers" ][0 ]["image" ]
2047+ expected_image = get_image (
2048+ c .compose ["services" ]["clusterd" ]["image" ],
2049+ str (version ),
2050+ )
2051+ assert image_tag (got_image ) == image_tag (
2052+ expected_image
2053+ ), f"{ got_image } != { expected_image } "
2054+
2055+ retry (check , 60 )
2056+
2057+ def check_balancerd_version (version : MzVersion ):
2058+ def check ():
2059+ data = get_balancerd_data ()
2060+ assert len (data ["items" ]) == 2 , f"got { len (data ['items' ])} items"
2061+
2062+ got_image = data ["items" ][0 ]["spec" ]["containers" ][0 ]["image" ]
2063+ expected_image = get_image (
2064+ c .compose ["services" ]["balancerd" ]["image" ],
2065+ str (version ),
2066+ )
2067+ assert image_tag (got_image ) == image_tag (
2068+ expected_image
2069+ ), f"{ got_image } != { expected_image } "
2070+
2071+ # balancerd pods have a 60s draining period after termination, so we
2072+ # have to wait longer to ensure that the old ones are gone
2073+ retry (check , 180 )
2074+
2075+ parser .add_argument (
2076+ "--recreate-cluster" ,
2077+ action = argparse .BooleanOptionalAction ,
2078+ help = "Recreate cluster if it exists already" ,
2079+ )
2080+ parser .add_argument (
2081+ "--tag" ,
2082+ type = str ,
2083+ help = "Custom version tag to use" ,
2084+ )
2085+ parser .add_argument (
2086+ "--orchestratord-override" ,
2087+ default = True ,
2088+ action = argparse .BooleanOptionalAction ,
2089+ help = "Override orchestratord tag" ,
2090+ )
2091+ args = parser .parse_args ()
2092+
2093+ definition = setup (c , args )
2094+ versions = get_all_self_managed_versions ()
2095+ versions .append (get_version (args .tag ))
2096+
2097+ print (f"running orchestratord { versions [- 3 ]} " )
2098+ definition ["operator" ]["operator" ]["image" ]["tag" ] = str (versions [- 3 ])
2099+ init (definition )
2100+ check_orchestratord_version (versions [- 3 ])
2101+
2102+ print (f"running environmentd { versions [- 3 ]} " )
2103+ definition ["materialize" ]["spec" ]["environmentdImageRef" ] = get_image (
2104+ c .compose ["services" ]["environmentd" ]["image" ],
2105+ str (versions [- 3 ]),
2106+ )
2107+ run (definition , False )
2108+ check_environmentd_version (versions [- 3 ])
2109+ check_clusterd_version (versions [- 3 ])
2110+ check_balancerd_version (versions [- 3 ])
2111+
2112+ for version in versions [- 2 :]:
2113+ print (f"running orchestratord { version } " )
2114+ definition ["operator" ]["operator" ]["image" ]["tag" ] = str (version )
2115+ spawn .runv (
2116+ [
2117+ "helm" ,
2118+ "upgrade" ,
2119+ "operator" ,
2120+ MZ_ROOT / "misc" / "helm-charts" / "operator" ,
2121+ "--namespace=materialize" ,
2122+ "--create-namespace" ,
2123+ "--version" ,
2124+ "v26.0.0" ,
2125+ "--wait" ,
2126+ "-f" ,
2127+ "-" ,
2128+ ],
2129+ stdin = yaml .dump (definition ["operator" ]).encode (),
2130+ stdout = subprocess .DEVNULL ,
2131+ stderr = subprocess .DEVNULL ,
2132+ )
2133+ check_orchestratord_version (version )
2134+
2135+ print (f"running environmentd { version } " )
2136+ definition ["materialize" ]["spec" ]["environmentdImageRef" ] = get_image (
2137+ c .compose ["services" ]["environmentd" ]["image" ],
2138+ str (version ),
2139+ )
2140+ definition ["materialize" ]["spec" ]["requestRollout" ] = str (uuid .uuid4 ())
2141+ run (definition , False )
2142+ check_environmentd_version (version )
2143+ check_clusterd_version (version )
2144+ # balancerd is broken in the upgrade to 26.4.0
2145+ if str (version ) != "v26.4.0" :
2146+ check_balancerd_version (version )
2147+
2148+ definition = setup (c , args )
2149+
2150+ print (f"running orchestratord { versions [- 3 ]} " )
2151+ definition ["operator" ]["operator" ]["image" ]["tag" ] = str (versions [- 3 ])
2152+ init (definition )
2153+ check_orchestratord_version (versions [- 3 ])
2154+
2155+ print (f"running environmentd { versions [- 3 ]} " )
2156+ definition ["materialize" ]["spec" ]["environmentdImageRef" ] = get_image (
2157+ c .compose ["services" ]["environmentd" ]["image" ],
2158+ str (versions [- 3 ]),
2159+ )
2160+ run (definition , False )
2161+ check_environmentd_version (versions [- 3 ])
2162+ check_clusterd_version (versions [- 3 ])
2163+ check_balancerd_version (versions [- 3 ])
2164+
2165+ print (f"running orchestratord { versions [- 1 ]} " )
2166+ definition ["operator" ]["operator" ]["image" ]["tag" ] = str (versions [- 1 ])
2167+ spawn .runv (
2168+ [
2169+ "helm" ,
2170+ "upgrade" ,
2171+ "operator" ,
2172+ MZ_ROOT / "misc" / "helm-charts" / "operator" ,
2173+ "--namespace=materialize" ,
2174+ "--create-namespace" ,
2175+ "--version" ,
2176+ "v26.0.0" ,
2177+ "--wait" ,
2178+ "-f" ,
2179+ "-" ,
2180+ ],
2181+ stdin = yaml .dump (definition ["operator" ]).encode (),
2182+ stdout = subprocess .DEVNULL ,
2183+ stderr = subprocess .DEVNULL ,
2184+ )
2185+ check_orchestratord_version (versions [- 1 ])
2186+
2187+ print (f"running environmentd { versions [- 1 ]} " )
2188+ definition ["materialize" ]["spec" ]["environmentdImageRef" ] = get_image (
2189+ c .compose ["services" ]["environmentd" ]["image" ],
2190+ str (versions [- 1 ]),
2191+ )
2192+ definition ["materialize" ]["spec" ]["requestRollout" ] = str (uuid .uuid4 ())
2193+ run (definition , False )
2194+ check_environmentd_version (versions [- 1 ])
2195+ check_clusterd_version (versions [- 1 ])
2196+ # balancerd is broken in the upgrade to 26.4.0
2197+ if str (versions [- 1 ]) != "v26.4.0" :
2198+ check_balancerd_version (versions [- 1 ])
2199+
2200+
19932201def workflow_default (c : Composition , parser : WorkflowArgumentParser ) -> None :
19942202 parser .add_argument (
19952203 "--recreate-cluster" ,
@@ -2420,6 +2628,7 @@ def init(definition: dict[str, Any]) -> None:
24202628 "--create-namespace" ,
24212629 "--version" ,
24222630 "v26.0.0" ,
2631+ "--wait" ,
24232632 "-f" ,
24242633 "-" ,
24252634 ],
0 commit comments