File tree Expand file tree Collapse file tree 2 files changed +11
-7
lines changed
sdks/python/apache_beam/transforms Expand file tree Collapse file tree 2 files changed +11
-7
lines changed Original file line number Diff line number Diff line change @@ -40,6 +40,7 @@ apply from: "$projectDir/common.gradle"
4040
4141dependencies {
4242 implementation enforcedPlatform(library. java. google_cloud_platform_libraries_bom)
43+ implementation library. java. google_cloud_secret_manager
4344 implementation library. java. vendored_guava_32_1_2_jre
4445 if (project. findProperty(' testJavaVersion' ) == ' 21' || JavaVersion . current(). compareTo(JavaVersion . VERSION_21 ) >= 0 ) {
4546 // this dependency is a provided dependency for kafka-avro-serializer. It is not needed to compile with Java<=17
Original file line number Diff line number Diff line change @@ -546,13 +546,16 @@ def expand(self, pcoll):
546546 pcoll .element_type ).tuple_types )
547547 kv_type_hint = typehints .KV [key_type , value_type ]
548548 if kv_type_hint and kv_type_hint != typehints .Any :
549- coder = coders .registry .get_coder (kv_type_hint ).as_deterministic_coder (
550- f'GroupByEncryptedKey { self .label } '
551- 'The key coder is not deterministic. This may result in incorrect '
552- 'pipeline output. This can be fixed by adding a type hint to the '
553- 'operation preceding the GroupByKey step, and for custom key '
554- 'classes, by writing a deterministic custom Coder. Please see the '
555- 'documentation for more details.' )
549+ coder = coders .registry .get_coder (kv_type_hint )
550+ try :
551+ coder = coder .as_deterministic_coder ()
552+ except ValueError :
553+ logging .warning (f'GroupByEncryptedKey { self .label } : '
554+ 'The key coder is not deterministic. This may result in incorrect '
555+ 'pipeline output. This can be fixed by adding a type hint to the '
556+ 'operation preceding the GroupByKey step, and for custom key '
557+ 'classes, by writing a deterministic custom Coder. Please see the '
558+ 'documentation for more details.' )
556559 if not coder .is_kv_coder ():
557560 raise ValueError (
558561 'Input elements to the transform %s with stateful DoFn must be '
You can’t perform that action at this time.
0 commit comments