-
Notifications
You must be signed in to change notification settings - Fork 98
feat: support for loop #3473
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: support for loop #3473
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds support for basic for-loop semantics by introducing LoopStart and LoopEnd operators, new iteration control messages, and the necessary execution and RPC plumbing in both Scala and Python runtimes.
- Introduced LoopStart/LoopEnd executors and descriptors to buffer and replay tuples across iterations
- Defined EndIteration and NextIteration control messages and integrated them into DataProcessor, OutputManager, and RPC handlers
- Updated Python runner, protobuf definitions, and internal models to handle iteration lifecycle
Reviewed Changes
Copilot reviewed 25 out of 28 changed files in this pull request and generated 6 comments.
Show a summary per file
File | Description |
---|---|
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sleep/SleepOpExec.scala | Added Sleep operator executor |
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sleep/SleepOpDesc.scala | Defined Sleep operator descriptor |
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpExec.scala | Implemented LoopStart executor |
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpDesc.scala | Defined LoopStart descriptor |
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpExec.scala | Implemented LoopEnd executor |
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpDesc.scala | Defined LoopEnd descriptor |
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala | Imported new loop and sleep operators |
core/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/TupleLike.scala | Added FinalizeIteration marker |
core/workflow-core/src/main/scala/edu/uci/ics/amber/core/executor/OperatorExecutor.scala | Added reset lifecycle method |
core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/StartChannelHandler.scala | Refactored start-channel handling |
core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala | Added NextIteration RPC handler |
core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala | Added EndIteration RPC handler |
core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/EndChannelHandler.scala | Updated end-channel to finalize loops |
core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala | Registered iteration handlers |
core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/DataProcessor.scala | Integrated iteration control flow and executor reset |
core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/OutputManager.scala | Added finalizeIteration API |
core/amber/src/main/python/proto/edu/uci/ics/amber/engine/architecture/rpc/init.py | Extended Python protobuf with EndIteration |
core/amber/src/main/python/core/runnables/main_loop.py | Handled EndIteration in main loop |
core/amber/src/main/python/core/runnables/data_processor.py | Treated EndIteration as InternalMarker |
core/amber/src/main/python/core/models/operator.py | Cleared table buffer in on_finish |
core/amber/src/main/python/core/models/internal_marker.py | Added EndIteration marker class |
core/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py | Registered Python EndIterationHandler |
core/amber/src/main/python/core/architecture/handlers/control/end_iteration_handler.py | Implemented Python EndIteration handler |
core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/workerservice.proto | Added EndIteration/NextIteration RPCs |
core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/controlcommands.proto | Added EndIterationRequest message |
Comments suppressed due to low confidence (1)
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sleep/SleepOpDesc.scala:34
- [nitpick] The JSON schema title
"n"
is ambiguous. Consider using a more descriptive title like"time"
or"duration"
to clarify the property purpose.
@JsonSchemaTitle("n")
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpExec.scala
Show resolved
Hide resolved
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpDesc.scala
Show resolved
Hide resolved
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpDesc.scala
Show resolved
Hide resolved
...cala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala
Show resolved
Hide resolved
core/amber/src/main/python/core/architecture/handlers/control/end_iteration_handler.py
Outdated
Show resolved
Hide resolved
…or/loop/LoopStartOpDesc.scala Co-authored-by: Copilot <[email protected]> Signed-off-by: Xinyuan Lin <[email protected]>
Signed-off-by: Xinyuan Lin <[email protected]>
Signed-off-by: Xinyuan Lin <[email protected]>
Main changes for the
Basic For Loop
:LoopStart
OperatorLoopEnd
OperatorIterationEnd
NextIteration
Life Cycle of

Basic For Loop
: