Skip to content

Commit f1529cf

Browse files
authored
feat: job notification should emit job status ... (#1165)
... for successful and failed jobs.
1 parent fc6cd54 commit f1529cf

File tree

1 file changed

+54
-9
lines changed

1 file changed

+54
-9
lines changed

ballista/scheduler/src/cluster/memory.rs

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -380,24 +380,27 @@ impl JobState for InMemoryJobState {
380380
Some(Status::Successful(_)) | Some(Status::Failed(_))
381381
) {
382382
self.completed_jobs
383-
.insert(job_id.to_string(), (status, Some(graph.clone())));
383+
.insert(job_id.to_string(), (status.clone(), Some(graph.clone())));
384384
self.running_jobs.remove(job_id);
385-
} else if let Some(old_status) =
386-
self.running_jobs.insert(job_id.to_string(), status)
387-
{
388-
self.job_event_sender.send(&JobStateEvent::JobUpdated {
389-
job_id: job_id.to_string(),
390-
status: old_status,
391-
})
385+
} else {
386+
// otherwise update running job
387+
self.running_jobs.insert(job_id.to_string(), status.clone());
392388
}
393389

390+
// job change event emitted
391+
// it is emitting current job status
392+
self.job_event_sender.send(&JobStateEvent::JobUpdated {
393+
job_id: job_id.to_string(),
394+
status,
395+
});
396+
394397
Ok(())
395398
}
396399

397400
async fn get_session(&self, session_id: &str) -> Result<Arc<SessionContext>> {
398401
self.sessions
399402
.get(session_id)
400-
.map(|sess| sess.clone())
403+
.map(|session_ctx| session_ctx.clone())
401404
.ok_or_else(|| {
402405
BallistaError::General(format!("No session for {session_id} found"))
403406
})
@@ -500,11 +503,15 @@ mod test {
500503

501504
use crate::cluster::memory::InMemoryJobState;
502505
use crate::cluster::test_util::{test_job_lifecycle, test_job_planning_failure};
506+
use crate::cluster::{JobState, JobStateEvent};
503507
use crate::test_utils::{
504508
test_aggregation_plan, test_join_plan, test_two_aggregations_plan,
505509
};
506510
use ballista_core::error::Result;
511+
use ballista_core::serde::protobuf::JobStatus;
507512
use ballista_core::utils::{default_config_producer, default_session_builder};
513+
use futures::StreamExt;
514+
use tokio::sync::Barrier;
508515

509516
#[tokio::test]
510517
async fn test_in_memory_job_lifecycle() -> Result<()> {
@@ -571,4 +578,42 @@ mod test {
571578

572579
Ok(())
573580
}
581+
582+
#[tokio::test]
583+
async fn test_in_memory_job_notification() -> Result<()> {
584+
let state = InMemoryJobState::new(
585+
"",
586+
Arc::new(default_session_builder),
587+
Arc::new(default_config_producer),
588+
);
589+
590+
let event_stream = state.job_state_events().await?;
591+
let barrier = Arc::new(Barrier::new(2));
592+
593+
let events = tokio::spawn({
594+
let barrier = barrier.clone();
595+
async move {
596+
barrier.wait().await;
597+
event_stream.collect::<Vec<JobStateEvent>>().await
598+
}
599+
});
600+
601+
barrier.wait().await;
602+
test_job_lifecycle(state, test_aggregation_plan(4).await).await?;
603+
let result = events.await?;
604+
assert_eq!(2, result.len());
605+
match result.last().unwrap() {
606+
JobStateEvent::JobUpdated {
607+
status:
608+
JobStatus {
609+
status: Some(ballista_core::serde::protobuf::job_status::Status::Successful(_)),
610+
..
611+
},
612+
..
613+
} => (), // assert!(true, "Last status should be successful job notification"),
614+
_ => panic!("JobUpdated status expected"),
615+
}
616+
617+
Ok(())
618+
}
574619
}

0 commit comments

Comments
 (0)