Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/workflows/beam_PreCommit_Java_PVR_Prism_Loopback.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ env:
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
GRPC_ARG_KEEPALIVE_TIME_MS: "30000"
GRPC_ARG_KEEPALIVE_TIMEOUT_MS: "5000"
GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA: "0"
GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS: "1"
GRPC_ARG_MAX_RECONNECT_BACKOFF_MS: "120000"
GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS: "1000"
GRPC_ARG_MAX_CONNECTION_IDLE_MS: "300000"
GRPC_ARG_MAX_CONNECTION_AGE_MS: "1800000"
BEAM_RETRY_MAX_ATTEMPTS: "5"
BEAM_RETRY_INITIAL_DELAY_MS: "1000"
BEAM_RETRY_MAX_DELAY_MS: "60000"
BEAM_RUNNER_BUNDLE_TIMEOUT_MS: "300000"
BEAM_TESTING_FORCE_SINGLE_BUNDLE: "true"
BEAM_TESTING_DETERMINISTIC_ORDER: "true"
BEAM_SDK_WORKER_PARALLELISM: "1"
BEAM_WORKER_POOL_SIZE: "1"

jobs:
beam_PreCommit_Java_PVR_Prism_Loopback:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ type ElementManager struct {
sideConsumers map[string][]LinkID // Map from pcollectionID to the stage+transform+input that consumes them as side input.

pcolParents map[string]string // Map from pcollectionID to stageIDs that produce the pcollection.
pcolInfo map[string]PColInfo // Map from pcollectionID to PColInfo for key extraction.

refreshCond sync.Cond // refreshCond protects the following fields with it's lock, and unblocks bundle scheduling.
inprogressBundles set[string] // Active bundleIDs
Expand Down Expand Up @@ -255,6 +256,7 @@ func NewElementManager(config Config) *ElementManager {
consumers: map[string][]string{},
sideConsumers: map[string][]LinkID{},
pcolParents: map[string]string{},
pcolInfo: map[string]PColInfo{},
changedStages: set[string]{},
inprogressBundles: set[string]{},
refreshCond: sync.Cond{L: &sync.Mutex{}},
Expand Down Expand Up @@ -324,6 +326,10 @@ func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[stri
em.stages[ID].processingTimeTimersFamilies = ptTimers
}

func (em *ElementManager) RegisterPColInfo(pcolID string, info PColInfo) {
em.pcolInfo[pcolID] = info
}

// AddTestStream provides a builder interface for the execution layer to build the test stream from
// the protos.
func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder {
Expand Down
28 changes: 26 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package engine

import (
"bytes"
"fmt"
"log/slog"
"time"

Expand Down Expand Up @@ -173,18 +175,40 @@ type tsElementEvent struct {
// Execute this ElementEvent by routing pending element to their consuming stages.
func (ev tsElementEvent) Execute(em *ElementManager) {
t := em.testStreamHandler.tagState[ev.Tag]
if t.pcollection == "" {
panic(fmt.Sprintf("TestStream tag %q not found in tagState", ev.Tag))
}
info, ok := em.pcolInfo[t.pcollection]
if !ok {
panic(fmt.Sprintf("PColInfo not registered for TestStream output PCollection %q (tag %q)", t.pcollection, ev.Tag))
}

var pending []element
for _, e := range ev.Elements {
if len(e.Encoded) == 0 {
panic(fmt.Sprintf("TestStream: empty encoded element for tag %q", ev.Tag))
}
buf := bytes.NewBuffer(e.Encoded)
elmBytes := info.EDec(buf)
if len(elmBytes) == 0 {
panic(fmt.Sprintf("TestStream: decoded element bytes are empty for tag %q, encoded length: %d", ev.Tag, len(e.Encoded)))
}

var keyBytes []byte
if info.KeyDec != nil {
Copy link
Collaborator

@shunping shunping Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, KeyDec will always be nil in the current code because we override the coder of the pcollection. Specifically, we length-prefix the coder for the output pcollection of TestStream:

Therefore, the coder will be a lp coder and we won't get a key decoder in the following line.

if kcid, ok := extractKVCoderID(col.GetCoderId(), coders); ok {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think one possible solution for this is not to LP the entire kv coder during preprocessing, but only the value coder part.

In this case, we will keep a KV coder for this pcollection, then a fix like the current one can pick up the key decoder.

WDYT @lostluck ?

kbuf := bytes.NewBuffer(elmBytes)
keyBytes = info.KeyDec(kbuf)
}

pending = append(pending, element{
window: window.GlobalWindow{},
timestamp: e.EventTime,
elmBytes: e.Encoded,
elmBytes: elmBytes,
keyBytes: keyBytes,
pane: typex.NoFiringPane(),
})
}

// Update the consuming state.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please keep the comment unless there's a good reason to remove it. It indicates intent of the following code.

for _, sID := range em.consumers[t.pcollection] {
ss := em.stages[sID]
added := ss.AddPending(em, pending)
Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
// Add a synthetic stage that should largely be unused.
em.AddStage(stage.ID, nil, maps.Values(t.GetOutputs()), nil)

for pcolID, info := range stage.OutputsToCoders {
em.RegisterPColInfo(pcolID, info)
}

// Decode the test stream, and convert it to the various events for the ElementManager.
var pyld pipepb.TestStreamPayload
if err := proto.Unmarshal(t.GetSpec().GetPayload(), &pyld); err != nil {
Expand Down
1 change: 0 additions & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.gradle.enterprise.gradleplugin.internal.extension.BuildScanExtensionWithHiddenFeatures

pluginManagement {
plugins {
Expand Down
Loading