Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ def sickbayTests = [

// Java side dying during execution.
// https://github.com/apache/beam/issues/32930
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders',
// 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders',
// Stream corruption error java side: failed:java.io.StreamCorruptedException: invalid stream header: 206E6F74
// Likely due to prism't coder changes.
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2',
// 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2',

// java.lang.IllegalStateException: Output with tag Tag<output> must have a schema in order to call getRowReceiver
// Ultimately because getRoeReceiver code path SDK side isn't friendly to LengthPrefix wrapping of row coders.
Expand Down
9 changes: 5 additions & 4 deletions sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
)

// This file retains the logic for the pardo handler
Expand Down Expand Up @@ -107,12 +108,12 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C
// they're written out to the runner in the same fashion.
// This may stop being necessary once Flatten Unzipping happens in the optimizer.
outPCol := comps.GetPcollections()[outColID]
outCoder := comps.GetCoders()[outPCol.GetCoderId()]
coderSubs := map[string]*pipepb.Coder{}
pcollSubs := map[string]*pipepb.PCollection{}
for _, p := range t.GetInputs() {
inPCol := comps.GetPcollections()[p]
if inPCol.CoderId != outPCol.CoderId {
coderSubs[inPCol.CoderId] = outCoder
pcollSubs[p] = proto.Clone(inPCol).(*pipepb.PCollection)
pcollSubs[p].CoderId = outPCol.CoderId
}
}

Expand All @@ -123,7 +124,7 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C
Transforms: map[string]*pipepb.PTransform{
tid: t,
},
Coders: coderSubs,
Pcollections: pcollSubs,
},
RemovedLeaves: nil,
ForcedRoots: forcedRoots,
Expand Down
Loading