Skip to content

Commit c0a5895

Browse files
authored
Fix zero key length panic in Prism (#36983)
* Fix TestStream so kv coder is preserved. Put keyBytes into TestStream elements. * Fix a bug when panic in watermark eval goroutine not triggering job failure * Minor fix. * Sickbay some failed tests for later investigation.
1 parent e24a2a2 commit c0a5895

File tree

5 files changed

+107
-21
lines changed

5 files changed

+107
-21
lines changed

runners/prism/java/build.gradle

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,13 @@ def sickbayTests = [
115115
// ShardedKey not yet implemented.
116116
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
117117

118+
// Some tests failed when using TestStream with keyed elements.
119+
// https://github.com/apache/beam/issues/36984
120+
'org.apache.beam.sdk.transforms.ParDoTest$BundleFinalizationTests.testBundleFinalizationWithState',
121+
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testMapStateNoReadOnComputeIfAbsentAndPutIfAbsentInsertsElement',
122+
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestamp',
123+
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestampWithProcessingTime',
124+
118125
// Technically these tests "succeed"
119126
// the test is just complaining that an AssertionException isn't a RuntimeException
120127
//

sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ type ElementManager struct {
225225
sideConsumers map[string][]LinkID // Map from pcollectionID to the stage+transform+input that consumes them as side input.
226226

227227
pcolParents map[string]string // Map from pcollectionID to stageIDs that produce the pcollection.
228+
pcolInfo map[string]PColInfo
228229

229230
refreshCond sync.Cond // refreshCond protects the following fields with it's lock, and unblocks bundle scheduling.
230231
inprogressBundles set[string] // Active bundleIDs
@@ -255,6 +256,7 @@ func NewElementManager(config Config) *ElementManager {
255256
consumers: map[string][]string{},
256257
sideConsumers: map[string][]LinkID{},
257258
pcolParents: map[string]string{},
259+
pcolInfo: map[string]PColInfo{},
258260
changedStages: set[string]{},
259261
inprogressBundles: set[string]{},
260262
refreshCond: sync.Cond{L: &sync.Mutex{}},
@@ -324,6 +326,10 @@ func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[stri
324326
em.stages[ID].processingTimeTimersFamilies = ptTimers
325327
}
326328

329+
func (em *ElementManager) RegisterPColInfo(pcolID string, info PColInfo) {
330+
em.pcolInfo[pcolID] = info
331+
}
332+
327333
// AddTestStream provides a builder interface for the execution layer to build the test stream from
328334
// the protos.
329335
func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder {
@@ -386,14 +392,17 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
386392
}()
387393
// Watermark evaluation goroutine.
388394
go func() {
395+
// We should defer closing of the channel first, so that when a panic happens,
396+
// we will handle the panic and trigger a job failure BEFORE the job is
397+
// prematurely marked as done.
398+
defer close(runStageCh)
389399
defer func() {
390400
// In case of panics in bundle generation, fail and cancel the job.
391401
if e := recover(); e != nil {
392402
slog.Error("panic in ElementManager.Bundles watermark evaluation goroutine", "error", e, "traceback", string(debug.Stack()))
393403
upstreamCancelFn(fmt.Errorf("panic in ElementManager.Bundles watermark evaluation goroutine: %v\n%v", e, string(debug.Stack())))
394404
}
395405
}()
396-
defer close(runStageCh)
397406

398407
for {
399408
em.refreshCond.L.Lock()

sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package engine
1717

1818
import (
19+
"bytes"
1920
"log/slog"
2021
"time"
2122

@@ -174,12 +175,20 @@ type tsElementEvent struct {
174175
func (ev tsElementEvent) Execute(em *ElementManager) {
175176
t := em.testStreamHandler.tagState[ev.Tag]
176177

178+
info := em.pcolInfo[t.pcollection]
177179
var pending []element
178180
for _, e := range ev.Elements {
181+
var keyBytes []byte
182+
if info.KeyDec != nil {
183+
kbuf := bytes.NewBuffer(e.Encoded)
184+
keyBytes = info.KeyDec(kbuf)
185+
}
186+
179187
pending = append(pending, element{
180188
window: window.GlobalWindow{},
181189
timestamp: e.EventTime,
182190
elmBytes: e.Encoded,
191+
keyBytes: keyBytes,
183192
pane: typex.NoFiringPane(),
184193
})
185194
}

sdks/go/pkg/beam/runners/prism/internal/execute.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,10 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
277277
// Add a synthetic stage that should largely be unused.
278278
em.AddStage(stage.ID, nil, maps.Values(t.GetOutputs()), nil)
279279

280+
for pcolID, info := range stage.OutputsToCoders {
281+
em.RegisterPColInfo(pcolID, info)
282+
}
283+
280284
// Decode the test stream, and convert it to the various events for the ElementManager.
281285
var pyld pipepb.TestStreamPayload
282286
if err := proto.Unmarshal(t.GetSpec().GetPayload(), &pyld); err != nil {

sdks/go/pkg/beam/runners/prism/internal/handlerunner.go

Lines changed: 77 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -227,44 +227,101 @@ func (h *runner) handleTestStream(tid string, t *pipepb.PTransform, comps *pipep
227227
}
228228
coders := map[string]*pipepb.Coder{}
229229
// Ensure awareness of the coder used for the teststream.
230-
cID, err := lpUnknownCoders(pyld.GetCoderId(), coders, comps.GetCoders())
230+
ocID := pyld.GetCoderId()
231+
cID, err := lpUnknownCoders(ocID, coders, comps.GetCoders())
231232
if err != nil {
232233
panic(err)
233234
}
234235

235236
// If the TestStream coder needs to be LP'ed or if it is a coder that has different
236237
// behaviors between nested context and outer context (in Java SDK), then we must
237238
// LP this coder and the TestStream data elements.
238-
forceLP := (cID != pyld.GetCoderId() && coders[pyld.GetCoderId()].GetSpec().GetUrn() != "beam:go:coder:custom:v1") ||
239-
coders[cID].GetSpec().GetUrn() == urns.CoderStringUTF8 ||
240-
coders[cID].GetSpec().GetUrn() == urns.CoderBytes ||
241-
coders[cID].GetSpec().GetUrn() == urns.CoderKV
239+
forceLP := (cID != ocID && coders[ocID].GetSpec().GetUrn() != "beam:go:coder:custom:v1") ||
240+
coders[ocID].GetSpec().GetUrn() == urns.CoderStringUTF8 ||
241+
coders[ocID].GetSpec().GetUrn() == urns.CoderBytes ||
242+
coders[ocID].GetSpec().GetUrn() == urns.CoderKV
242243

243244
if !forceLP {
244245
return prepareResult{SubbedComps: &pipepb.Components{
245246
Transforms: map[string]*pipepb.PTransform{tid: t},
246247
}}
247248
}
248249

249-
// The coder needed length prefixing. For simplicity, add a length prefix to each
250-
// encoded element, since we will be sending a length prefixed coder to consume
251-
// this anyway. This is simpler than trying to find all the re-written coders after the fact.
252-
// This also adds a LP-coder for the original coder in comps.
253-
cID, err = forceLpCoder(pyld.GetCoderId(), coders, comps.GetCoders())
254-
if err != nil {
255-
panic(err)
256-
}
257-
slog.Debug("teststream: add coder", "coderId", cID)
258-
259-
mustLP := func(v []byte) []byte {
260-
var buf bytes.Buffer
261-
if err := coder.EncodeVarInt((int64)(len(v)), &buf); err != nil {
250+
var mustLP func(v []byte) []byte
251+
if coders[ocID].GetSpec().GetUrn() != urns.CoderKV {
252+
// The coder needed length prefixing. For simplicity, add a length prefix to each
253+
// encoded element, since we will be sending a length prefixed coder to consume
254+
// this anyway. This is simpler than trying to find all the re-written coders after the fact.
255+
// This also adds a LP-coder for the original coder in comps.
256+
cID, err = forceLpCoder(pyld.GetCoderId(), coders, comps.GetCoders())
257+
if err != nil {
262258
panic(err)
263259
}
264-
if _, err := buf.Write(v); err != nil {
260+
slog.Debug("teststream: add coder", "coderId", cID)
261+
262+
mustLP = func(v []byte) []byte {
263+
var buf bytes.Buffer
264+
if err := coder.EncodeVarInt((int64)(len(v)), &buf); err != nil {
265+
panic(err)
266+
}
267+
if _, err := buf.Write(v); err != nil {
268+
panic(err)
269+
}
270+
return buf.Bytes()
271+
}
272+
} else {
273+
// For a KV coder, we only length-prefix the value coder because we need to
274+
// preserve the original structure of the key coder. This allows the key
275+
// coder to be easily extracted later to retrieve the KeyBytes from the
276+
// encoded elements.
277+
278+
c := coders[ocID]
279+
kcid := c.GetComponentCoderIds()[0]
280+
vcid := c.GetComponentCoderIds()[1]
281+
282+
var lpvcid string
283+
lpvcid, err = forceLpCoder(vcid, coders, comps.GetCoders())
284+
if err != nil {
265285
panic(err)
266286
}
267-
return buf.Bytes()
287+
288+
slog.Debug("teststream: add coder", "coderId", lpvcid)
289+
290+
kvc := &pipepb.Coder{
291+
Spec: &pipepb.FunctionSpec{
292+
Urn: urns.CoderKV,
293+
},
294+
ComponentCoderIds: []string{kcid, lpvcid},
295+
}
296+
297+
kvcID := ocID + "_vlp"
298+
coders[kvcID] = kvc
299+
300+
slog.Debug("teststream: add coder", "coderId", kvcID)
301+
302+
cID = kvcID
303+
304+
kd := collectionPullDecoder(kcid, coders, comps)
305+
mustLP = func(v []byte) []byte {
306+
elmBuf := bytes.NewBuffer(v)
307+
keyBytes := kd(elmBuf)
308+
309+
var buf bytes.Buffer
310+
if _, err := buf.Write(keyBytes); err != nil {
311+
panic(err)
312+
}
313+
314+
// put the length of the value
315+
if err := coder.EncodeVarInt((int64)(len(v)-len(keyBytes)), &buf); err != nil {
316+
panic(err)
317+
}
318+
319+
// write the value aka. the remaining bytes from the buffer
320+
if _, err := buf.Write(elmBuf.Bytes()); err != nil {
321+
panic(err)
322+
}
323+
return buf.Bytes()
324+
}
268325
}
269326

270327
// We need to loop over the events.

0 commit comments

Comments
 (0)