@@ -19,64 +19,14 @@ use std::env;
1919use std:: error:: Error ;
2020use std:: path:: PathBuf ;
2121
22- use ballista:: prelude:: SessionConfigExt ;
22+ use ballista:: prelude:: { SessionConfigExt , SessionContextExt } ;
2323use ballista_core:: serde:: {
2424 protobuf:: scheduler_grpc_client:: SchedulerGrpcClient , BallistaCodec ,
2525} ;
2626use ballista_core:: { ConfigProducer , RuntimeProducer } ;
2727use ballista_scheduler:: SessionBuilder ;
2828use datafusion:: execution:: SessionState ;
29- use datafusion:: prelude:: SessionConfig ;
30- use object_store:: aws:: AmazonS3Builder ;
31- use testcontainers_modules:: minio:: MinIO ;
32- use testcontainers_modules:: testcontainers:: core:: { CmdWaitFor , ExecCommand } ;
33- use testcontainers_modules:: testcontainers:: ContainerRequest ;
34- use testcontainers_modules:: { minio, testcontainers:: ImageExt } ;
35-
36- pub const REGION : & str = "eu-west-1" ;
37- pub const BUCKET : & str = "ballista" ;
38- pub const ACCESS_KEY_ID : & str = "MINIO" ;
39- pub const SECRET_KEY : & str = "MINIOMINIO" ;
40-
41- #[ allow( dead_code) ]
42- pub fn create_s3_store (
43- port : u16 ,
44- ) -> std:: result:: Result < object_store:: aws:: AmazonS3 , object_store:: Error > {
45- AmazonS3Builder :: new ( )
46- . with_endpoint ( format ! ( "http://localhost:{port}" ) )
47- . with_region ( REGION )
48- . with_bucket_name ( BUCKET )
49- . with_access_key_id ( ACCESS_KEY_ID )
50- . with_secret_access_key ( SECRET_KEY )
51- . with_allow_http ( true )
52- . build ( )
53- }
54-
55- #[ allow( dead_code) ]
56- pub fn create_minio_container ( ) -> ContainerRequest < minio:: MinIO > {
57- MinIO :: default ( )
58- . with_env_var ( "MINIO_ACCESS_KEY" , ACCESS_KEY_ID )
59- . with_env_var ( "MINIO_SECRET_KEY" , SECRET_KEY )
60- }
61-
62- #[ allow( dead_code) ]
63- pub fn create_bucket_command ( ) -> ExecCommand {
64- // this is hack to create a bucket without creating s3 client.
65- // this works with current testcontainer (and image) version 'RELEASE.2022-02-07T08-17-33Z'.
66- // (testcontainer does not await properly on latest image version)
67- //
68- // if testcontainer image version change to something newer we should use "mc mb /data/ballista"
69- // to crate a bucket.
70- ExecCommand :: new ( vec ! [
71- "mkdir" . to_string( ) ,
72- format!( "/data/{}" , crate :: common:: BUCKET ) ,
73- ] )
74- . with_cmd_ready_condition ( CmdWaitFor :: seconds ( 1 ) )
75- }
76-
77- // /// Remote ballista cluster to be used for local testing.
78- // static BALLISTA_CLUSTER: tokio::sync::OnceCell<(String, u16)> =
79- // tokio::sync::OnceCell::const_new();
29+ use datafusion:: prelude:: { SessionConfig , SessionContext } ;
8030
8131/// Returns the parquet test data directory, which is by default
8232/// stored in a git submodule rooted at
@@ -161,17 +111,8 @@ pub async fn setup_test_cluster() -> (String, u16) {
161111
162112 let host = "localhost" . to_string ( ) ;
163113
164- let scheduler_url = format ! ( "http://{}:{}" , host, addr. port( ) ) ;
165-
166- let scheduler = loop {
167- match SchedulerGrpcClient :: connect ( scheduler_url. clone ( ) ) . await {
168- Err ( _) => {
169- tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 100 ) ) . await ;
170- log:: info!( "Attempting to connect to test scheduler..." ) ;
171- }
172- Ok ( scheduler) => break scheduler,
173- }
174- } ;
114+ let scheduler =
115+ connect_to_scheduler ( format ! ( "http://{}:{}" , host, addr. port( ) ) ) . await ;
175116
176117 ballista_executor:: new_standalone_executor (
177118 scheduler,
@@ -190,7 +131,6 @@ pub async fn setup_test_cluster() -> (String, u16) {
190131#[ allow( dead_code) ]
191132pub async fn setup_test_cluster_with_state ( session_state : SessionState ) -> ( String , u16 ) {
192133 let config = SessionConfig :: new_with_ballista ( ) ;
193- //let default_codec = BallistaCodec::default();
194134
195135 let addr = ballista_scheduler:: standalone:: new_standalone_scheduler_from_state (
196136 & session_state,
@@ -200,22 +140,10 @@ pub async fn setup_test_cluster_with_state(session_state: SessionState) -> (Stri
200140
201141 let host = "localhost" . to_string ( ) ;
202142
203- let scheduler_url = format ! ( "http://{}:{}" , host, addr. port( ) ) ;
143+ let scheduler =
144+ connect_to_scheduler ( format ! ( "http://{}:{}" , host, addr. port( ) ) ) . await ;
204145
205- let scheduler = loop {
206- match SchedulerGrpcClient :: connect ( scheduler_url. clone ( ) ) . await {
207- Err ( _) => {
208- tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 100 ) ) . await ;
209- log:: info!( "Attempting to connect to test scheduler..." ) ;
210- }
211- Ok ( scheduler) => break scheduler,
212- }
213- } ;
214-
215- ballista_executor:: new_standalone_executor_from_state :: <
216- datafusion_proto:: protobuf:: LogicalPlanNode ,
217- datafusion_proto:: protobuf:: PhysicalPlanNode ,
218- > (
146+ ballista_executor:: new_standalone_executor_from_state (
219147 scheduler,
220148 config. ballista_standalone_parallelism ( ) ,
221149 & session_state,
@@ -253,22 +181,13 @@ pub async fn setup_test_cluster_with_builders(
253181
254182 let host = "localhost" . to_string ( ) ;
255183
256- let scheduler_url = format ! ( "http://{}:{}" , host, addr. port( ) ) ;
257-
258- let scheduler = loop {
259- match SchedulerGrpcClient :: connect ( scheduler_url. clone ( ) ) . await {
260- Err ( _) => {
261- tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 100 ) ) . await ;
262- log:: info!( "Attempting to connect to test scheduler..." ) ;
263- }
264- Ok ( scheduler) => break scheduler,
265- }
266- } ;
184+ let scheduler =
185+ connect_to_scheduler ( format ! ( "http://{}:{}" , host, addr. port( ) ) ) . await ;
267186
268187 ballista_executor:: new_standalone_executor_from_builder (
269188 scheduler,
270189 config. ballista_standalone_parallelism ( ) ,
271- config_producer. clone ( ) ,
190+ config_producer,
272191 runtime_producer,
273192 codec,
274193 Default :: default ( ) ,
@@ -281,6 +200,40 @@ pub async fn setup_test_cluster_with_builders(
281200 ( host, addr. port ( ) )
282201}
283202
203+ async fn connect_to_scheduler (
204+ scheduler_url : String ,
205+ ) -> SchedulerGrpcClient < tonic:: transport:: Channel > {
206+ let mut retry = 50 ;
207+ loop {
208+ match SchedulerGrpcClient :: connect ( scheduler_url. clone ( ) ) . await {
209+ Err ( _) if retry > 0 => {
210+ retry -= 1 ;
211+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 100 ) ) . await ;
212+ log:: debug!( "Re-attempting to connect to test scheduler..." ) ;
213+ }
214+
215+ Err ( _) => {
216+ log:: error!( "scheduler connection timed out" ) ;
217+ panic ! ( "scheduler connection timed out" )
218+ }
219+ Ok ( scheduler) => break scheduler,
220+ }
221+ }
222+ }
223+
224+ #[ allow( dead_code) ]
225+ pub async fn standalone_context ( ) -> SessionContext {
226+ SessionContext :: standalone ( ) . await . unwrap ( )
227+ }
228+
229+ #[ allow( dead_code) ]
230+ pub async fn remote_context ( ) -> SessionContext {
231+ let ( host, port) = setup_test_cluster ( ) . await ;
232+ SessionContext :: remote ( & format ! ( "df://{host}:{port}" ) )
233+ . await
234+ . unwrap ( )
235+ }
236+
284237#[ ctor:: ctor]
285238fn init ( ) {
286239 // Enable RUST_LOG logging configuration for test
0 commit comments