Skip to content

Commit cf2a3a4

Browse files
authored
Add pod condition documentation (#219)
* Fix failed job condition * Add docs and IsCompleted() fixes * Bump the mock plugin version
1 parent 0df1855 commit cf2a3a4

File tree

4 files changed

+23
-4
lines changed

4 files changed

+23
-4
lines changed

docs/user_scenarios.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,12 @@ If the stream **is suspended**, you need to create backfill request first and th
2727
If your stream has failed, you can set `spec.suspended` to `true` to stop the stream.
2828
To avoid data loss, you may create a backfill request that fills in any gaps that occurred during the failure.
2929
After that, you can set `spec.suspended` to `false` to restart the stream.
30+
31+
## I deleted the pod and my stream transitioned to a failed state, how do I avoid that in the future?
32+
Arcane streaming is built on top of Kubernetes Jobs. By default, when a pod is deleted manually or due to node eviction,
33+
all containers in the pod receive a SIGTERM signal and have a grace period to shut down gracefully **with exit code 0**.
34+
If the containers do not shut down within the grace period, the pod is forcefully terminated. If the pod is terminated
35+
with a non-zero exit code, Kubernetes counts this pod **as failed** and the job may transition to a failed state.
36+
It is the responsibility of the user and/or the plugin developer to ensure that the streaming job handles termination signals
37+
gracefully and exits with code 0, or that the exit code returned by the plugin executable on termination is
38+
[added to the job's podFailurePolicy](https://kubernetes.io/docs/tasks/job/pod-failure-policy/).

justfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ mock-stream-plugin:
2727
--namespace default \
2828
--set jobTemplateSettings.podFailurePolicySettings.retryOnExitCodes="{120,121}" \
2929
--set jobTemplateSettings.backoffLimit=1 \
30-
--version v1.0.4
30+
--version v1.0.5
3131

3232
manifests:
3333
kubectl apply -f integration_tests/manifests/*.yaml

services/controllers/stream/stream_reconciler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ func (s *streamReconciler) getLogger(_ context.Context, request types.Namespaced
497497
func (s *streamReconciler) updateStreamPhase(ctx context.Context, definition Definition, backfillRequest *v1.BackfillRequest, next Phase, eventFunc controllers.EventFunc) (reconcile.Result, error) {
498498
logger := s.getLogger(ctx, definition.NamespacedName())
499499
if definition.GetPhase() == next { // coverage-ignore
500-
logger.V(0).Info("Stream phase is already set to", "phase", definition.GetPhase())
500+
logger.V(0).Info("Stream phase is already set", "phase", definition.GetPhase())
501501
return reconcile.Result{}, nil
502502
}
503503
logger.V(0).Info("updating Stream status", "from", definition.GetPhase(), "to", next)

services/controllers/stream/streaming_job.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,21 @@ func (j StreamingJob) CurrentConfiguration() (string, error) { // coverage-ignor
1818
}
1919

2020
func (j StreamingJob) IsCompleted() bool { // coverage-ignore (trivial)
21-
return j.Status.Succeeded > 0
21+
for _, condition := range j.Status.Conditions {
22+
if condition.Type == v1.JobComplete && condition.Status == "True" {
23+
return true
24+
}
25+
}
26+
return false
2227
}
2328

2429
func (j StreamingJob) IsFailed() bool { // coverage-ignore (trivial)
25-
return j.Status.Failed > 0 && j.Status.Failed >= *j.Spec.BackoffLimit
30+
for _, condition := range j.Status.Conditions {
31+
if condition.Type == v1.JobFailed && condition.Status == "True" {
32+
return true
33+
}
34+
}
35+
return false
2636
}
2737

2838
func (j StreamingJob) ToV1Job() *v1.Job { // coverage-ignore (trivial)

0 commit comments

Comments
 (0)