88// by the Apache License, Version 2.0.
99
1010use std:: {
11- collections:: BTreeSet ,
11+ collections:: { BTreeMap , BTreeSet } ,
1212 fmt:: Display ,
1313 str:: FromStr ,
1414 sync:: { Arc , Mutex } ,
15+ time:: Duration ,
1516} ;
1617
1718use http:: HeaderValue ;
1819use k8s_openapi:: apimachinery:: pkg:: apis:: meta:: v1:: { Condition , Time } ;
1920use kube:: { api:: PostParams , runtime:: controller:: Action , Api , Client , Resource , ResourceExt } ;
2021use serde:: Deserialize ;
21- use tracing:: { debug, trace} ;
22+ use tokio_postgres:: NoTls ;
23+ use tracing:: { debug, trace, warn} ;
2224
2325use crate :: metrics:: Metrics ;
2426use mz_cloud_provider:: CloudProvider ;
@@ -27,7 +29,12 @@ use mz_cloud_resources::crd::materialize::v1alpha1::{
2729} ;
2830use mz_orchestrator_kubernetes:: KubernetesImagePullPolicy ;
2931use mz_orchestrator_tracing:: TracingCliArgs ;
30- use mz_ore:: { cast:: CastFrom , cli:: KeyValueArg , instrument} ;
32+ use mz_ore:: {
33+ cast:: CastFrom ,
34+ cli:: KeyValueArg ,
35+ instrument,
36+ task:: { spawn, AbortOnDropHandle } ,
37+ } ;
3138
3239pub mod balancer;
3340pub mod console;
@@ -54,6 +61,8 @@ pub struct Args {
5461 enable_prometheus_scrape_annotations : bool ,
5562 #[ clap( long) ]
5663 disable_authentication : bool ,
64+ #[ clap( long) ]
65+ use_external_orchestrator : bool ,
5766
5867 #[ clap( long) ]
5968 segment_api_key : Option < String > ,
@@ -158,6 +167,14 @@ impl Args {
158167 self . environmentd_internal_http_port
159168 )
160169 }
170+
171+ fn environmentd_internal_sql_address ( & self , namespace : & str , service_name : & str ) -> String {
172+ format ! (
173+ "{}:{}" ,
174+ self . environmentd_internal_hostname( namespace, service_name) ,
175+ self . environmentd_internal_sql_port
176+ )
177+ }
161178}
162179
163180#[ derive( Deserialize , Default ) ]
@@ -223,12 +240,121 @@ impl Display for Error {
223240 }
224241}
225242
243+ struct EnvironmentWorker {
244+ client : Client ,
245+ namespace : String ,
246+ name : String ,
247+ internal_pgwire_url : String ,
248+ }
249+
250+ impl EnvironmentWorker {
251+ fn new ( client : Client , namespace : String , name : String , internal_pgwire_url : String ) -> Self {
252+ Self {
253+ client,
254+ namespace,
255+ name,
256+ internal_pgwire_url,
257+ }
258+ }
259+
260+ async fn run ( & self , initial_generation : u64 ) {
261+ // this is required to break the bootstrapping loop, since we can't
262+ // run subscribe queries until there is a system cluster available to
263+ // run them on
264+ self . ensure_replica (
265+ & format ! ( "cluster-s1-replica-s1-gen-{}" , initial_generation) ,
266+ false ,
267+ )
268+ . await ;
269+
270+ let mut active_client = None ;
271+ loop {
272+ if let Some ( client) = active_client. as_mut ( ) {
273+ if let Err ( e) = self . subscribe ( client) . await {
274+ warn ! ( "lost subscribe connection: {e}" ) ;
275+ active_client = None ;
276+ }
277+ } else {
278+ if let Ok ( client) = self . reconnect ( ) . await {
279+ active_client = Some ( client) ;
280+ }
281+ }
282+ }
283+ }
284+
285+ async fn ensure_replica ( & self , replica_name : & str , write_status : bool ) {
286+ // ensure_service logic from orchestrator-kubernetes goes here
287+ todo ! ( )
288+ }
289+
290+ async fn drop_replica ( & self , replica_name : & str ) {
291+ // drop_service logic from orchestrator-kubernetes goes here
292+ todo ! ( )
293+ }
294+
295+ async fn subscribe ( & self , client : & mut tokio_postgres:: Client ) -> Result < ( ) , anyhow:: Error > {
296+ let transaction = client. transaction ( ) . await ?;
297+ transaction
298+ . execute (
299+ "DECLARE c CURSOR FOR SUBSCRIBE (SELECT id, state FROM mz_internal.mz_external_orchestrator_services) ENVELOPE UPSERT (KEY (id));" ,
300+ & [ ] ,
301+ )
302+ . await ?;
303+ loop {
304+ let results = transaction. query ( "FETCH ALL c;" , & [ ] ) . await ?;
305+ for row in results {
306+ let id = row. get ( "id" ) ;
307+ match row. get ( "mz_state" ) {
308+ "upsert" => {
309+ self . ensure_replica ( id, true ) . await ;
310+ }
311+ "delete" => {
312+ self . drop_replica ( id) . await ;
313+ }
314+ _ => { }
315+ }
316+ }
317+ }
318+ }
319+
320+ async fn reconnect ( & self ) -> Result < tokio_postgres:: Client , anyhow:: Error > {
321+ let ( client, connection) = match tokio:: time:: timeout (
322+ Duration :: from_secs ( 5 ) ,
323+ tokio_postgres:: connect ( & self . internal_pgwire_url , NoTls ) ,
324+ )
325+ . await
326+ {
327+ Ok ( Ok ( ( client, connection) ) ) => ( client, connection) ,
328+ Ok ( Err ( err) ) => {
329+ warn ! ( "failed to connect to environmentd: {err}" ) ;
330+ return Err ( err. into ( ) ) ;
331+ }
332+ Err ( err) => {
333+ warn ! ( "timed out connecting to environmentd" ) ;
334+ return Err ( err. into ( ) ) ;
335+ }
336+ } ;
337+
338+ mz_ore:: task:: spawn (
339+ || format ! ( "postgres connection for {}/{}" , self . namespace, self . name) ,
340+ async move {
341+ if let Err ( e) = connection. await {
342+ panic ! ( "connection error: {}" , e) ;
343+ }
344+ } ,
345+ ) ;
346+
347+ Ok ( client)
348+ }
349+ }
350+
226351pub struct Context {
227352 config : Args ,
228353 tracing : TracingCliArgs ,
229354 orchestratord_namespace : String ,
230355 metrics : Arc < Metrics > ,
231356 needs_update : Arc < Mutex < BTreeSet < String > > > ,
357+ environment_workers : Mutex < BTreeMap < String , AbortOnDropHandle < ( ) > > > ,
232358}
233359
234360impl Context {
@@ -255,6 +381,7 @@ impl Context {
255381 orchestratord_namespace,
256382 metrics,
257383 needs_update : Default :: default ( ) ,
384+ environment_workers : Mutex :: new ( BTreeMap :: new ( ) ) ,
258385 }
259386 }
260387
@@ -297,6 +424,51 @@ impl Context {
297424 )
298425 . await
299426 }
427+
428+ fn start_environment_worker ( & self , mz : & Materialize , client : Client ) {
429+ let namespace = mz. namespace ( ) ;
430+ let name = mz. name_unchecked ( ) ;
431+ let generation = mz
432+ . status
433+ . as_ref ( )
434+ . map ( |status| status. active_generation )
435+ . unwrap_or ( 1 ) ;
436+ let internal_pgwire_url = format ! (
437+ "postgres://mz_system@{}/materialize" ,
438+ self . config. environmentd_internal_sql_address(
439+ & mz. namespace( ) ,
440+ & mz. environmentd_service_name( )
441+ )
442+ ) ;
443+ self . environment_workers
444+ . lock ( )
445+ . unwrap ( )
446+ . entry ( mz. metadata . uid . clone ( ) . unwrap ( ) )
447+ . or_insert_with ( || {
448+ spawn (
449+ || {
450+ format ! (
451+ "environment worker for {}/{}" ,
452+ mz. namespace( ) ,
453+ mz. name_unchecked( )
454+ )
455+ } ,
456+ async move {
457+ EnvironmentWorker :: new ( client, namespace, name, internal_pgwire_url)
458+ . run ( generation)
459+ . await
460+ } ,
461+ )
462+ . abort_on_drop ( )
463+ } ) ;
464+ }
465+
466+ fn stop_environment_worker ( & self , mz : & Materialize ) {
467+ self . environment_workers
468+ . lock ( )
469+ . unwrap ( )
470+ . remove ( mz. metadata . uid . as_ref ( ) . unwrap ( ) ) ;
471+ }
300472}
301473
302474#[ async_trait:: async_trait]
@@ -439,6 +611,9 @@ impl k8s_controller::Context for Context {
439611 false ,
440612 )
441613 . await ?;
614+ if self . config . use_external_orchestrator {
615+ self . start_environment_worker ( mz, client. clone ( ) ) ;
616+ }
442617 Ok ( None )
443618 }
444619 Err ( e) => {
@@ -609,6 +784,7 @@ impl k8s_controller::Context for Context {
609784 mz : & Self :: Resource ,
610785 ) -> Result < Option < Action > , Self :: Error > {
611786 self . set_needs_update ( mz, false ) ;
787+ self . stop_environment_worker ( mz) ;
612788
613789 Ok ( None )
614790 }
0 commit comments