Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

package v1

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// MockStreamDefinitionSpec is a mock implementation of the StreamDefinitionSpec for testing purposes.
type MockStreamDefinitionSpec struct {
Expand All @@ -11,11 +14,23 @@ type MockStreamDefinitionSpec struct {

// Destination represents the destination of the stream.
Destination string `json:"destination"`

// Suspended indicates whether the stream is suspended.
Suspended bool `json:"suspended"`

// JobTemplateRef represents a reference to the job template.
JobTemplateRef v1.ObjectReference `json:"jobTemplateRef"`

// BackfillJobTemplateRef represents a reference to the job template.
BackfillJobTemplateRef v1.ObjectReference `json:"backfillJobTemplateRef"`
}

type MockStreamDefinitionStatus struct {
// Phase represents the current phase of the stream.
Phase string `json:"phase"`

// ConfigurationHash represents the hash of the current configuration.
ConfigurationHash string `json:"configurationHash"`
}

// MockStreamDefinition is a mock implementation of the StreamDefinition for testing purposes.
Expand Down
2 changes: 2 additions & 0 deletions pkg/test/apis_test/streaming/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions services/controllers/stream/basic_job_configurator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package stream

Check failure on line 1 in services/controllers/stream/basic_job_configurator.go

View workflow job for this annotation

GitHub Actions / Validate commit

File test coverage below threshold

File test coverage below threshold: coverage: 0.0% (0/2); threshold: 70%

import (
batchv1 "k8s.io/api/batch/v1"
)

var _ JobConfigurator = &BasicJobConfigurator{}

type BasicJobConfigurator struct {
definition Definition
}

func (f BasicJobConfigurator) ConfigureJob(job *batchv1.Job) error {
panic("not implemented")
}

func NewFromStreamDefinition(definition Definition) *BasicJobConfigurator {
return &BasicJobConfigurator{
definition: definition,
}
}
7 changes: 7 additions & 0 deletions services/controllers/stream/job_configurator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package stream

import batchv1 "k8s.io/api/batch/v1"

type JobConfigurator interface {
ConfigureJob(job *batchv1.Job) error
}
73 changes: 73 additions & 0 deletions services/controllers/stream/stream_definition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package stream

import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
)

type Phase string

const (
New Phase = ""
Pending Phase = "Pending"
Running Phase = "Running"
Backfilling Phase = "Backfilling"
Suspended Phase = "Suspended"
Failed Phase = "Failed"
)

type Definition interface {
// GetPhase returns the current phase of the stream definition.
GetPhase() Phase

// Suspended returns true if the stream definition is suspended.
Suspended() bool

// CurrentConfiguration returns the hash sum of the current configuration (spec) of the stream definition.
CurrentConfiguration() (string, error)

// LastAppliedConfiguration returns the hash sum of the last observed configuration (spec) of the stream definition.
LastAppliedConfiguration() string

// RecomputeConfiguration recomputes and updates the last observed configuration hash.
// This should be called after any changes to the spec have been applied and the object saved to the API server.
RecomputeConfiguration() error

// NamespacedName returns the namespaced name of the stream definition.
NamespacedName() types.NamespacedName

// ToUnstructured converts the stream definition to an unstructured object.
ToUnstructured() *unstructured.Unstructured

// SetPhase sets the status of the stream definition.
SetPhase(status Phase) error

// StateString returns a string representation of the current state.
// This is primarily used for logging and debugging purposes.
StateString() string

// GetStreamingJobName returns the namespaced name of the streaming job associated with the stream definition.
GetStreamingJobName() types.NamespacedName

// GetBackfillJobName returns the namespaced name of the backfill job associated with the stream definition.
GetBackfillJobName() types.NamespacedName

// ToOwnerReference converts the stream definition to an owner reference.
ToOwnerReference() v1.OwnerReference

// JobConfigurator returns a JobConfigurator for the stream definition.
JobConfigurator() JobConfigurator
}

func fromUnstructured(obj *unstructured.Unstructured) (Definition, error) {
v := unstructuredWrapper{
underlying: obj,
}

err := v.Validate()
if err != nil {
return nil, err
}
return &v, nil
}
Loading
Loading