9 changes: 9 additions & 0 deletions docs/user_scenarios.md
@@ -27,3 +27,12 @@ If the stream **is suspended**, you need to create backfill request first and th
If your stream has failed, you can set `spec.suspended` to `true` to stop the stream.
To avoid data loss, you may create a backfill request that fills in any gaps that occurred during the failure.
After that, you can set `spec.suspended` to `false` to restart the stream.

## I deleted the pod and my stream transitioned to a failed state, how do I avoid that in the future?
Arcane streaming is built on top of Kubernetes Jobs. By default, when a pod is deleted manually or due to node eviction,
all containers in the pod receive a SIGTERM signal and have a grace period in which they are expected to shut down gracefully **with exit code 0**.
If the containers do not shut down within the grace period, the pod is forcefully terminated. If the pod terminates
with a non-zero exit code, Kubernetes counts this pod **as failed** and the job may transition to a failed state.
It is the responsibility of the user and/or the plugin developer to ensure that either the streaming job handles termination signals
gracefully and exits with code 0, or the exit code returned by the plugin executable on termination is
[added to the job's podFailurePolicy](https://kubernetes.io/docs/tasks/job/pod-failure-policy/).
2 changes: 1 addition & 1 deletion justfile
@@ -27,7 +27,7 @@ mock-stream-plugin:
--namespace default \
--set jobTemplateSettings.podFailurePolicySettings.retryOnExitCodes="{120,121}" \
--set jobTemplateSettings.backoffLimit=1 \
- --version v1.0.4
+ --version v1.0.5

manifests:
kubectl apply -f integration_tests/manifests/*.yaml
2 changes: 1 addition & 1 deletion services/controllers/stream/stream_reconciler.go
@@ -497,7 +497,7 @@ func (s *streamReconciler) getLogger(_ context.Context, request types.Namespaced
func (s *streamReconciler) updateStreamPhase(ctx context.Context, definition Definition, backfillRequest *v1.BackfillRequest, next Phase, eventFunc controllers.EventFunc) (reconcile.Result, error) {
logger := s.getLogger(ctx, definition.NamespacedName())
if definition.GetPhase() == next { // coverage-ignore
- logger.V(0).Info("Stream phase is already set to", "phase", definition.GetPhase())
+ logger.V(0).Info("Stream phase is already set", "phase", definition.GetPhase())
return reconcile.Result{}, nil
}
logger.V(0).Info("updating Stream status", "from", definition.GetPhase(), "to", next)
14 changes: 12 additions & 2 deletions services/controllers/stream/streaming_job.go
@@ -18,11 +18,21 @@ func (j StreamingJob) CurrentConfiguration() (string, error) { // coverage-ignor
}

func (j StreamingJob) IsCompleted() bool { // coverage-ignore (trivial)
-	return j.Status.Succeeded > 0
+	for _, condition := range j.Status.Conditions {
+		if condition.Type == v1.JobComplete && condition.Status == "True" {
+			return true
+		}
+	}
+	return false
}

func (j StreamingJob) IsFailed() bool { // coverage-ignore (trivial)
-	return j.Status.Failed > 0 && j.Status.Failed >= *j.Spec.BackoffLimit
+	for _, condition := range j.Status.Conditions {
+		if condition.Type == v1.JobFailed && condition.Status == "True" {
+			return true
+		}
+	}
+	return false
}
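
Both methods now scan `Status.Conditions` for a terminal condition instead of reading the pod counters. The following sketch shows how the two loops could share one helper. It is not part of this change. To stay self-contained it uses local stand-in types that mirror the relevant `k8s.io/api/batch/v1` fields, and the `hasCondition` helper name is hypothetical.

```go
package main

import "fmt"

// Local stand-ins mirroring the Kubernetes batch/v1 condition types,
// declared here only so the sketch compiles without external modules.
type ConditionStatus string
type JobConditionType string

const (
	ConditionTrue ConditionStatus  = "True"
	JobComplete   JobConditionType = "Complete"
	JobFailed     JobConditionType = "Failed"
)

type JobCondition struct {
	Type   JobConditionType
	Status ConditionStatus
}

// hasCondition (hypothetical helper) reports whether a condition of the
// given type is present with status "True".
func hasCondition(conditions []JobCondition, t JobConditionType) bool {
	for _, c := range conditions {
		if c.Type == t && c.Status == ConditionTrue {
			return true
		}
	}
	return false
}

func main() {
	// A job that is still running or retrying carries no terminal condition.
	retrying := []JobCondition{}
	// A job that has been marked as failed carries the Failed condition.
	failed := []JobCondition{{Type: JobFailed, Status: ConditionTrue}}

	fmt.Println(hasCondition(retrying, JobFailed))
	fmt.Println(hasCondition(failed, JobFailed))
	fmt.Println(hasCondition(failed, JobComplete))
}
```

With such a helper, `IsCompleted` and `IsFailed` would each reduce to a single call, at the cost of one more function to maintain.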

func (j StreamingJob) ToV1Job() *v1.Job { // coverage-ignore (trivial)