Skip to content

Comments

Implement the stream processing handler#164

Closed
s-vitaliy wants to merge 4 commits intomainfrom
implement-stream-processing-handler
Closed

Implement the stream processing handler#164
s-vitaliy wants to merge 4 commits intomainfrom
implement-stream-processing-handler

Conversation

@s-vitaliy
Copy link
Contributor

@s-vitaliy s-vitaliy commented Dec 19, 2025

Part of #144

Scope

This pull request introduces a new event-driven stream handling architecture for the streaming subsystem, focusing on improving modularity, testability, and type safety. It adds several new interfaces and types for handling streaming events, jobs, and repositories, and refactors the controller factory and handle to use these abstractions. The changes also include improvements to queue management and event processing, as well as new helper utilities for testing.

Core architecture and event handling:

  • Implemented the Handler class that handles stream events.
  • Added new interfaces: JobManager, StreamManager, StreamRepository, and StreamDefinition to abstract job and stream management, StreamEvent, EventType, and StreamIdentity types to standardize event handling and identity representation.

Queue and controller refactoring:

  • Changed the controller factory and handle to use a strongly typed workqueue.TypedRateLimitingInterface[StreamEvent] instead of any
    Testing and utilities:

  • Added a new helper in tests/helpers.go to execute a command and load a Kubernetes config, supporting flexible test environments.

  • Updated generate.go to add a mockgen directive for generating mocks for the new stream interfaces, improving testability.

NOTE: tests for this code are not completed yet

Checklist

  • GitHub issue exists for this change.
  • Unit tests added and they pass.
  • Line Coverage is at least 80%.
  • Review requested on latest commit.

@s-vitaliy s-vitaliy requested a review from a team as a code owner December 19, 2025 15:04
@github-actions
Copy link

go-test-coverage report:

  below threshold:							coverage:	threshold:
  configuration/configuration.go					 0.0% (0/18)	70%
  services/controllers/stream/handler.go				43.6% (24/55)	70%
  services/controllers/stream/stream_controller_factory.go		 0.0% (0/11)	70%
  services/controllers/stream/stream_event.go				 0.0% (0/1)	70%
  services/controllers/stream/streaming_job_controller_handle.go	 0.0% (0/26)	70%
  services/controllers/stream_class/worker.go				55.9% (38/68)	70%
  telemetry/app_metrics.go						 0.0% (0/9)	70%
  tests/mocks/stream_mocks.go						51.2% (62/121)	70%

Package coverage threshold (70%) satisfied:	FAIL
  below threshold:				coverage:	threshold:
  services/controllers/stream			26.6% (25/94)	70%
  services/controllers/stream_class		62.8% (54/86)	70%
  telemetry					 0.0% (0/9)	70%
  tests/mocks					56.9% (91/160)	70%
  configuration					 0.0% (0/18)	70%

Total coverage threshold (75%) satisfied:	FAIL
Total test coverage: 47.2% (176/373)

Files with uncovered lines:
  file:									coverage:	uncovered lines:
  configuration/configuration.go					0%		17-20 22-24 27-36 39-41 43-47 52-55 60
  services/controllers/stream/handler.go				43.6%		39-43 56-57 65-72 75-86 88 97-99 102-105 108-116 123-129 137-139
  services/controllers/stream/stream_controller_factory.go		0%		28-43 45-52 55-57
  services/controllers/stream/stream_event.go				0%		33-35
  services/controllers/stream/streaming_job_controller_handle.go	0%		30-40 42-53 55-61 63-68 71-73 75-78 81-83 85-87 89-94
  services/controllers/stream_class/handler.go				83.3%		66-72
  services/controllers/stream_class/worker.go				55.9%		44-47 60-67 73-77 89 100-103 105-108 115-131 140-142 145-147
  telemetry/app_metrics.go						0%		18-22 24 53-56 60-63 65
  tests/mocks/stream_class_mocks.go					74.4%		85-90 93-96 99-104
  tests/mocks/stream_mocks.go						51.2%		72-77 80-83 98-102 105-107 110-115 118-121 162-167 170-173 176-181 184-187 190-195 198-201 218-223 226-229 232-237 240-243 279-281 284-289 292-295 298-303 306-309

Copy link
Contributor

@george-zubrienko george-zubrienko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

progress :)

"testing"
)

func Test_StreamAdded(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe try to create a test-only plugin CRD and utilize k8s testing framework instead - if possible.

s.logger.V(0).Info("Backfill requested for stream", "streamId", streamEvent.StreamId().String())
err := s.jobManager.EnsureStopped(streamEvent.StreamId())
if err != nil {
s.logger.Error(err, "Failed to stop job for reload request", "streamId", streamEvent.StreamId().String())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
s.logger.Error(err, "Failed to stop job for reload request", "streamId", streamEvent.StreamId().String())
s.logger.Error(err, "Failed to stop job for backfill request", "streamId", streamEvent.StreamId().String())

s.logger.V(0).Info("Stream suspended", "streamId", streamEvent.StreamId().String())
err := s.jobManager.EnsureStopped(streamEvent.StreamId())
if err != nil {
s.logger.Error(err, "Failed to stop job for suspended stream", "streamId", streamEvent.StreamId().String())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd propose (not in this PR) to fire an event through event recorder that will display on the related CR - if possible

@s-vitaliy s-vitaliy closed this Jan 5, 2026
@s-vitaliy s-vitaliy deleted the implement-stream-processing-handler branch January 26, 2026 09:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants