@@ -546,13 +546,18 @@ def expand(self, pcoll):
546546 pcoll .element_type ).tuple_types )
547547 kv_type_hint = typehints .KV [key_type , value_type ]
548548 if kv_type_hint and kv_type_hint != typehints .Any :
549- coder = coders .registry .get_coder (kv_type_hint ).as_deterministic_coder (
550- f'GroupByEncryptedKey { self .label } '
551- 'The key coder is not deterministic. This may result in incorrect '
552- 'pipeline output. This can be fixed by adding a type hint to the '
553- 'operation preceding the GroupByKey step, and for custom key '
554- 'classes, by writing a deterministic custom Coder. Please see the '
555- 'documentation for more details.' )
549+ coder = coders .registry .get_coder (kv_type_hint )
550+ try :
551+ coder = coder .as_deterministic_coder (self .label )
552+ except ValueError :
553+ logging .warning (
554+ 'GroupByEncryptedKey %s: '
555+ 'The key coder is not deterministic. This may result in incorrect '
556+ 'pipeline output. This can be fixed by adding a type hint to the '
557+ 'operation preceding the GroupByKey step, and for custom key '
558+ 'classes, by writing a deterministic custom Coder. Please see the '
559+ 'documentation for more details.' ,
560+ self .label )
556561 if not coder .is_kv_coder ():
557562 raise ValueError (
558563 'Input elements to the transform %s with stateful DoFn must be '
0 commit comments