Skip to content

Commit f11ebca

Browse files
committed
Merge branch 'master' of https://github.com/apache/beam into users/damccorm/test-revert
2 parents 07d7dc7 + 8f888fb commit f11ebca

File tree

33 files changed

+1074
-124
lines changed

33 files changed

+1074
-124
lines changed

CHANGES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@
9797

9898
* (Python) Fixed the Java YAML provider failing on Windows ([#35617](https://github.com/apache/beam/issues/35617)).
9999
* Fixed BigQueryIO creating temporary datasets in the wrong project when temp_dataset is specified with a different project than the pipeline project. For some jobs, temporary datasets will now be created in the correct project (Python) ([#35813](https://github.com/apache/beam/issues/35813)).
100+
* (Go) Fixed duplicates caused by reads after blind writes to Bag State ([#35869](https://github.com/apache/beam/issues/35869)).
101+
* Earlier Go SDK versions can avoid the issue by not reading in the same call after a blind write.
100102

101103
## Known Issues
102104

sdks/go.mod

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ require (
3232
cloud.google.com/go/pubsub v1.50.0
3333
cloud.google.com/go/spanner v1.83.0
3434
cloud.google.com/go/storage v1.56.0
35-
github.com/aws/aws-sdk-go-v2 v1.37.2
35+
github.com/aws/aws-sdk-go-v2 v1.38.0
3636
github.com/aws/aws-sdk-go-v2/config v1.29.18
3737
github.com/aws/aws-sdk-go-v2/credentials v1.18.3
3838
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.84
39-
github.com/aws/aws-sdk-go-v2/service/s3 v1.86.0
39+
github.com/aws/aws-sdk-go-v2/service/s3 v1.87.0
4040
github.com/aws/smithy-go v1.22.5
4141
github.com/docker/go-connections v0.5.0
4242
github.com/dustin/go-humanize v1.0.1
@@ -147,14 +147,14 @@ require (
147147
github.com/aws/aws-sdk-go v1.55.5 // indirect
148148
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 // indirect
149149
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.2 // indirect
150-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.2 // indirect
151-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.2 // indirect
150+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3 // indirect
151+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3 // indirect
152152
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
153-
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.2 // indirect
153+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.3 // indirect
154154
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 // indirect
155-
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.2 // indirect
156-
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.2 // indirect
157-
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.2 // indirect
155+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.3 // indirect
156+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.3 // indirect
157+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.3 // indirect
158158
github.com/aws/aws-sdk-go-v2/service/sso v1.27.0 // indirect
159159
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.32.0 // indirect
160160
github.com/aws/aws-sdk-go-v2/service/sts v1.36.0 // indirect

sdks/go.sum

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -749,8 +749,8 @@ github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU
749749
github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
750750
github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU=
751751
github.com/aws/aws-sdk-go-v2 v1.23.0/go.mod h1:i1XDttT4rnf6vxc9AuskLc6s7XBee8rlLilKlc03uAA=
752-
github.com/aws/aws-sdk-go-v2 v1.37.2 h1:xkW1iMYawzcmYFYEV0UCMxc8gSsjCGEhBXQkdQywVbo=
753-
github.com/aws/aws-sdk-go-v2 v1.37.2/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg=
752+
github.com/aws/aws-sdk-go-v2 v1.38.0 h1:UCRQ5mlqcFk9HJDIqENSLR3wiG1VTWlyUfLDEvY7RxU=
753+
github.com/aws/aws-sdk-go-v2 v1.38.0/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg=
754754
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1/go.mod h1:n8Bs1ElDD2wJ9kCRTczA83gYbBmjSwZp3umc6zF4EeM=
755755
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1/go.mod h1:t8PYl/6LzdAqsU4/9tz28V/kU+asFePvpOMkdul0gEQ=
756756
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 h1:6GMWV6CNpA/6fbFHnoAjrv4+LGfyTqZz2LtCHnspgDg=
@@ -773,40 +773,40 @@ github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.84 h1:cTXRdLkpBanlDwISl+5c
773773
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.84/go.mod h1:kwSy5X7tfIHN39uucmjQVs2LvDdXEjQucgQQEqCggEo=
774774
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.9/go.mod h1:AnVH5pvai0pAF4lXRq0bmhbes1u9R8wTE+g+183bZNM=
775775
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.3/go.mod h1:7sGSz1JCKHWWBHq98m6sMtWQikmYPpxjqOydDemiVoM=
776-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.2 h1:sPiRHLVUIIQcoVZTNwqQcdtjkqkPopyYmIX0M5ElRf4=
777-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.2/go.mod h1:ik86P3sgV+Bk7c1tBFCwI3VxMoSEwl4YkRB9xn1s340=
776+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3 h1:o9RnO+YZ4X+kt5Z7Nvcishlz0nksIt2PIzDglLMP0vA=
777+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3/go.mod h1:+6aLJzOG1fvMOyzIySYjOFjcguGvVRL68R+uoRencN4=
778778
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.3/go.mod h1:ssOhaLpRlh88H3UmEcsBoVKq309quMvm3Ds8e9d4eJM=
779779
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.3/go.mod h1:ify42Rb7nKeDDPkFjKn7q1bPscVPu/+gmHH8d2c+anU=
780-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.2 h1:ZdzDAg075H6stMZtbD2o+PyB933M/f20e9WmCBC17wA=
781-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.2/go.mod h1:eE1IIzXG9sdZCB0pNNpMpsYTLl4YdOQD3njiVN1e/E4=
780+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3 h1:joyyUFhiTQQmVK6ImzNU9TQSNRNeD9kOklqTzyk5v6s=
781+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3/go.mod h1:+vNIyZQP3b3B1tSLI0lxvrU9cfM7gpdRXMFfm67ZcPc=
782782
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.10/go.mod h1:8DcYQcz0+ZJaSxANlHIsbbi6S+zMwjwdDqwW3r9AzaE=
783783
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1/go.mod h1:6fQQgfuGmw8Al/3M2IgIllycxV7ZW7WCdVSqfBeUiCY=
784784
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo=
785785
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo=
786786
github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.3/go.mod h1:5yzAuE9i2RkVAttBl8yxZgQr5OCq4D5yDnG7j9x2L0U=
787-
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.2 h1:sBpc8Ph6CpfZsEdkz/8bfg8WhKlWMCms5iWj6W/AW2U=
788-
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.2/go.mod h1:Z2lDojZB+92Wo6EKiZZmJid9pPrDJW2NNIXSlaEfVlU=
787+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.3 h1:ZV2XK2L3HBq9sCKQiQ/MdhZJppH/rH0vddEAamsHUIs=
788+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.3/go.mod h1:b9F9tk2HdHpbf3xbN7rUZcfmJI26N6NcJu/8OsBFI/0=
789789
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.1/go.mod h1:GeUru+8VzrTXV/83XyMJ80KpH8xO89VPoUileyNQ+tc=
790790
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1/go.mod h1:l9ymW25HOqymeU2m1gbUQ3rUIsTwKs8gYHXkqDQUhiI=
791791
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 h1:6+lZi2JeGKtCraAj1rpoZfKqnQ9SptseRZioejfUOLM=
792792
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0/go.mod h1:eb3gfbVIxIoGgJsi9pGne19dhCBpK6opTYpQqAmdy44=
793793
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.3/go.mod h1:Seb8KNmD6kVTjwRjVEgOT5hPin6sq+v4C2ycJQDwuH8=
794794
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.3/go.mod h1:R+/S1O4TYpcktbVwddeOYg+uwUfLhADP2S/x4QwsCTM=
795-
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.2 h1:blV3dY6WbxIVOFggfYIo2E1Q2lZoy5imS7nKgu5m6Tc=
796-
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.2/go.mod h1:cBWNeLBjHJRSmXAxdS7mwiMUEgx6zup4wQ9J+/PcsRQ=
795+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.3 h1:3ZKmesYBaFX33czDl6mbrcHb6jeheg6LqjJhQdefhsY=
796+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.3/go.mod h1:7ryVb78GLCnjq7cw45N6oUb9REl7/vNUwjvIqC5UgdY=
797797
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.3/go.mod h1:wlY6SVjuwvh3TVRpTqdy4I1JpBFLX4UGeKZdWntaocw=
798798
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.3/go.mod h1:Owv1I59vaghv1Ax8zz8ELY8DN7/Y0rGS+WWAmjgi950=
799-
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.2 h1:oxmDEO14NBZJbK/M8y3brhMFEIGN4j8a6Aq8eY0sqlo=
800-
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.2/go.mod h1:4hH+8QCrk1uRWDPsVfsNDUup3taAjO8Dnx63au7smAU=
799+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.3 h1:ieRzyHXypu5ByllM7Sp4hC5f/1Fy5wqxqY0yB85hC7s=
800+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.3/go.mod h1:O5ROz8jHiOAKAwx179v+7sHMhfobFVi6nZt8DEyiYoM=
801801
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.3/go.mod h1:Bm/v2IaN6rZ+Op7zX+bOUMdL4fsrYZiD0dsjLhNKwZc=
802802
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.3/go.mod h1:KZgs2ny8HsxRIRbDwgvJcHHBZPOzQr/+NtGwnP+w2ec=
803-
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.2 h1:0hBNFAPwecERLzkhhBY+lQKUMpXSKVv4Sxovikrioms=
804-
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.2/go.mod h1:Vcnh4KyR4imrrjGN7A2kP2v9y6EPudqoPKXtnmBliPU=
803+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.3 h1:SE/e52dq9a05RuxzLcjT+S5ZpQobj3ie3UTaSf2NnZc=
804+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.3/go.mod h1:zkpvBTsR020VVr8TOrwK2TrUW9pOir28sH5ECHpnAfo=
805805
github.com/aws/aws-sdk-go-v2/service/kms v1.16.3/go.mod h1:QuiHPBqlOFCi4LqdSskYYAWpQlx3PKmohy+rE2F+o5g=
806806
github.com/aws/aws-sdk-go-v2/service/s3 v1.26.3/go.mod h1:g1qvDuRsJY+XghsV6zg00Z4KJ7DtFFCx8fJD2a491Ak=
807807
github.com/aws/aws-sdk-go-v2/service/s3 v1.43.0/go.mod h1:NXRKkiRF+erX2hnybnVU660cYT5/KChRD4iUgJ97cI8=
808-
github.com/aws/aws-sdk-go-v2/service/s3 v1.86.0 h1:utPhv4ECQzJIUbtx7vMN4A8uZxlQ5tSt1H1toPI41h8=
809-
github.com/aws/aws-sdk-go-v2/service/s3 v1.86.0/go.mod h1:1/eZYtTWazDgVl96LmGdGktHFi7prAcGCrJ9JGvBITU=
808+
github.com/aws/aws-sdk-go-v2/service/s3 v1.87.0 h1:egoDf+Geuuntmw79Mz6mk9gGmELCPzg5PFEABOHB+6Y=
809+
github.com/aws/aws-sdk-go-v2/service/s3 v1.87.0/go.mod h1:t9MDi29H+HDbkolTSQtbI0HP9DemAWQzUjmWC7LGMnE=
810810
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.15.4/go.mod h1:PJc8s+lxyU8rrre0/4a0pn2wgwiDvOEzoOjcJUBr67o=
811811
github.com/aws/aws-sdk-go-v2/service/sns v1.17.4/go.mod h1:kElt+uCcXxcqFyc+bQqZPFD9DME/eC6oHBXvFzQ9Bcw=
812812
github.com/aws/aws-sdk-go-v2/service/sqs v1.18.3/go.mod h1:skmQo0UPvsjsuYYSYMVmrPc1HWCbHUJyrCEp+ZaLzqM=

sdks/go/pkg/beam/core/runtime/exec/userstate.go

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,18 @@ type stateProvider struct {
3535
elementKey []byte
3636
window []byte
3737

38-
transactionsByKey map[string][]state.Transaction
39-
initialValueByKey map[string]any
40-
initialBagByKey map[string][]any
41-
initialMapValuesByKey map[string]map[string]any
42-
initialMapKeysByKey map[string][]any
43-
readersByKey map[string]io.ReadCloser
44-
appendersByKey map[string]io.Writer
45-
clearersByKey map[string]io.Writer
46-
codersByKey map[string]*coder.Coder
47-
keyCodersByID map[string]*coder.Coder
48-
combineFnsByKey map[string]*graph.CombineFn
38+
transactionsByKey map[string][]state.Transaction
39+
initialValueByKey map[string]any
40+
initialBagByKey map[string][]any
41+
blindBagWriteCountsByKey map[string]int // Tracks blind writes to bags before a read.
42+
initialMapValuesByKey map[string]map[string]any
43+
initialMapKeysByKey map[string][]any
44+
readersByKey map[string]io.ReadCloser
45+
appendersByKey map[string]io.Writer
46+
clearersByKey map[string]io.Writer
47+
codersByKey map[string]*coder.Coder
48+
keyCodersByID map[string]*coder.Coder
49+
combineFnsByKey map[string]*graph.CombineFn
4950
}
5051

5152
// ReadValueState reads a value state from the State API
@@ -148,6 +149,12 @@ func (s *stateProvider) ReadBagState(userStateID string) ([]any, []state.Transac
148149
if !ok {
149150
transactions = []state.Transaction{}
150151
}
152+
// If there were blind writes before this read, trim the transactions.
153+
// These don't need to be reset, unless a clear happens.
154+
if s.blindBagWriteCountsByKey[userStateID] > 0 {
155+
// Trim blind writes from the transaction queue, to avoid re-applying them.
156+
transactions = transactions[s.blindBagWriteCountsByKey[userStateID]:]
157+
}
151158

152159
return initialValue, transactions, nil
153160
}
@@ -165,12 +172,17 @@ func (s *stateProvider) ClearBagState(val state.Transaction) error {
165172

166173
// Any transactions before a clear don't matter
167174
s.transactionsByKey[val.Key] = []state.Transaction{val}
175+
s.blindBagWriteCountsByKey[val.Key] = 1 // To account for the clear.
168176

169177
return nil
170178
}
171179

172180
// WriteBagState writes a bag state to the State API
173181
func (s *stateProvider) WriteBagState(val state.Transaction) error {
182+
_, ok := s.initialBagByKey[val.Key]
183+
if !ok {
184+
s.blindBagWriteCountsByKey[val.Key]++
185+
}
174186
ap, err := s.getBagAppender(val.Key)
175187
if err != nil {
176188
return err
@@ -510,22 +522,23 @@ func (s *userStateAdapter) NewStateProvider(ctx context.Context, reader StateRea
510522
return stateProvider{}, err
511523
}
512524
sp := stateProvider{
513-
ctx: ctx,
514-
sr: reader,
515-
SID: s.sid,
516-
elementKey: elementKey,
517-
window: win,
518-
transactionsByKey: make(map[string][]state.Transaction),
519-
initialValueByKey: make(map[string]any),
520-
initialBagByKey: make(map[string][]any),
521-
initialMapValuesByKey: make(map[string]map[string]any),
522-
initialMapKeysByKey: make(map[string][]any),
523-
readersByKey: make(map[string]io.ReadCloser),
524-
appendersByKey: make(map[string]io.Writer),
525-
clearersByKey: make(map[string]io.Writer),
526-
combineFnsByKey: s.stateIDToCombineFn,
527-
codersByKey: s.stateIDToCoder,
528-
keyCodersByID: s.stateIDToKeyCoder,
525+
ctx: ctx,
526+
sr: reader,
527+
SID: s.sid,
528+
elementKey: elementKey,
529+
window: win,
530+
transactionsByKey: make(map[string][]state.Transaction),
531+
initialValueByKey: make(map[string]any),
532+
initialBagByKey: make(map[string][]any),
533+
blindBagWriteCountsByKey: make(map[string]int),
534+
initialMapValuesByKey: make(map[string]map[string]any),
535+
initialMapKeysByKey: make(map[string][]any),
536+
readersByKey: make(map[string]io.ReadCloser),
537+
appendersByKey: make(map[string]io.Writer),
538+
clearersByKey: make(map[string]io.Writer),
539+
combineFnsByKey: s.stateIDToCombineFn,
540+
codersByKey: s.stateIDToCoder,
541+
keyCodersByID: s.stateIDToKeyCoder,
529542
}
530543

531544
return sp, nil

sdks/go/test/integration/primitives/state.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func init() {
3434
register.DoFn3x1[state.Provider, string, int, string](&valueStateClearFn{})
3535
register.DoFn3x1[state.Provider, string, int, string](&bagStateFn{})
3636
register.DoFn3x1[state.Provider, string, int, string](&bagStateClearFn{})
37+
register.DoFn3x1[state.Provider, string, int, string](&bagStateBlindWriteFn{})
3738
register.DoFn3x1[state.Provider, string, int, string](&combiningStateFn{})
3839
register.DoFn3x1[state.Provider, string, int, string](&mapStateFn{})
3940
register.DoFn3x1[state.Provider, string, int, string](&mapStateClearFn{})
@@ -211,6 +212,45 @@ func BagStateParDoClear(s beam.Scope) {
211212
passert.Equals(s, counts, "apple: 0", "pear: 0", "apple: 1", "apple: 2", "pear: 1", "apple: 3", "apple: 0", "pear: 2", "pear: 3", "pear: 0", "apple: 1", "pear: 1")
212213
}
213214

215+
type bagStateBlindWriteFn struct {
216+
State1 state.Bag[int]
217+
}
218+
219+
func (f *bagStateBlindWriteFn) ProcessElement(s state.Provider, w string, c int) string {
220+
err := f.State1.Add(s, 1)
221+
if err != nil {
222+
panic(err)
223+
}
224+
i, ok, err := f.State1.Read(s)
225+
if err != nil {
226+
panic(err)
227+
}
228+
if !ok {
229+
i = []int{}
230+
}
231+
sum := 0
232+
for _, val := range i {
233+
sum += val
234+
}
235+
236+
// Bonus "non-blind" write
237+
err = f.State1.Add(s, 1)
238+
if err != nil {
239+
panic(err)
240+
}
241+
242+
return fmt.Sprintf("%s: %v", w, sum)
243+
}
244+
245+
// BagStateBlindWriteParDo tests a DoFn that uses bag state, but performs a
246+
// blind write to the state before reading.
247+
func BagStateBlindWriteParDo(s beam.Scope) {
248+
in := beam.Create(s, "apple", "pear", "peach", "apple", "apple", "pear")
249+
keyed := beam.ParDo(s, pairWithOne, in)
250+
counts := beam.ParDo(s, &bagStateBlindWriteFn{}, keyed)
251+
passert.Equals(s, counts, "apple: 1", "pear: 1", "peach: 1", "apple: 3", "apple: 5", "pear: 3")
252+
}
253+
214254
type combiningStateFn struct {
215255
State0 state.Combining[int, int, int]
216256
State1 state.Combining[int, int, int]

sdks/go/test/integration/primitives/state_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ func TestBagStateClear(t *testing.T) {
4747
ptest.BuildAndRun(t, BagStateParDoClear)
4848
}
4949

50+
func TestBagStateBlindWrite(t *testing.T) {
51+
integration.CheckFilters(t)
52+
ptest.BuildAndRun(t, BagStateBlindWriteParDo)
53+
}
54+
5055
func TestCombiningState(t *testing.T) {
5156
integration.CheckFilters(t)
5257
ptest.BuildAndRun(t, CombiningStateParDo)

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage>
5959

6060
private final TupleTag<PubsubMessage> outputTag;
6161

62-
static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchSize)
62+
static int validatePubsubMessage(PubsubMessage message, int maxPublishBatchSize)
6363
throws SizeLimitExceededException {
6464
int payloadSize = message.getPayload().length;
6565
if (payloadSize > PUBSUB_MESSAGE_DATA_MAX_BYTES) {
@@ -86,7 +86,12 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS
8686
totalSize += orderingKeySize;
8787
}
8888

89-
@Nullable Map<String, String> attributes = message.getAttributeMap();
89+
final @Nullable Map<String, String> attributes = message.getAttributeMap();
90+
if (payloadSize == 0 && (attributes == null || attributes.isEmpty())) {
91+
throw new IllegalArgumentException(
92+
"Pubsub message must contain a non-empty payload or at least one attribute.");
93+
}
94+
9095
if (attributes != null) {
9196
if (attributes.size() > PUBSUB_MESSAGE_MAX_ATTRIBUTES) {
9297
throw new SizeLimitExceededException(
@@ -212,7 +217,7 @@ public void process(
212217
message = message.withOrderingKey(null);
213218
}
214219
try {
215-
validatePubsubMessageSize(message, maxPublishBatchSize);
220+
validatePubsubMessage(message, maxPublishBatchSize);
216221
} catch (SizeLimitExceededException e) {
217222
badRecordRouter.route(
218223
o,

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1738,7 +1738,7 @@ public void processElement(@Element PubsubMessage message, @Timestamp Instant ti
17381738
// TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
17391739
// - Size validation makes no distinction between JSON and Protobuf encoding
17401740
// - Accounting for HTTP to gRPC transcoding is non-trivial
1741-
PreparePubsubWriteDoFn.validatePubsubMessageSize(message, maxPublishBatchByteSize);
1741+
PreparePubsubWriteDoFn.validatePubsubMessage(message, maxPublishBatchByteSize);
17421742
// NOTE: The record id is always null since it will be assigned by Pub/Sub.
17431743
final OutgoingMessage msg =
17441744
OutgoingMessage.of(message, timestamp.getMillis(), null, message.getTopic());

0 commit comments

Comments
 (0)