Skip to content

Conversation

@shunping
Copy link
Collaborator

@shunping shunping commented Aug 14, 2025

We received an internal report (internal bug id: 430560535) that cogbk does not honor custom coders.

Below is the code to reproduce.

"""Test Beam's use of coders for keys in CoGroupByKey."""
import apache_beam as beam
from apache_beam.options import pipeline_options


class _Unpicklable:

  def __init__(self, value):
    self.value = value

  def __getstate__(self):
    raise NotImplementedError()

  def __setstate__(self, state):
    raise NotImplementedError()

  def __repr__(self):
    return f"Unpicklable({self.value})"


class _UnpicklableCoder(beam.coders.Coder):
  """."""

  def encode(self, value):
    return str(value.value).encode()

  def decode(self, encoded):
    return _Unpicklable(int(encoded.decode()))

  def to_type_hint(self):
    return _Unpicklable

  def is_deterministic(self):
    return True


beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder)


def pipeline_fn(root):
  values = [_Unpicklable(i) for i in range(5)]
  xs = root | beam.Create(values) | beam.WithKeys(lambda x: x)
  return (
      {'x': xs}
      | beam.CoGroupByKey()
      | beam.FlatMapTuple(lambda k, tagged: (k.value, tagged['x'][0].value * 2))
      | beam.LogElements()
  )


def main():
  options = pipeline_options.PipelineOptions(
      runner='DirectRunner', direct_num_workers=1, type_check_additional='all'
  )
  with beam.Pipeline(options=options) as pipeline:
    _ = pipeline_fn(pipeline)

if __name__ == '__main__':
  main()

Running this code will result in a "NotImplementedError", because pickled coder rather than the registered coder is used.

My current PR can fix this problem by propagating type hints correctly.

Related PR: #33932

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @shunping, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a bug in Apache Beam's CoGroupByKey transform where it failed to properly utilize registered custom coders for unpicklable key types. The fix involves explicitly adding type hints to an internal MapTuple operation within the CoGroupByKey implementation, ensuring that type information is correctly propagated and the registered coder is used instead of attempting to pickle unpicklable objects. This resolves an NotImplementedError that occurred when CoGroupByKey was used with custom unpicklable keys.

Highlights

  • Bug Fix for CoGroupByKey: Corrected an issue where CoGroupByKey did not honor registered custom coders for unpicklable key types, leading to NotImplementedError.
  • Type Hint Propagation: Added explicit input and output type hints to the MapTuple transform within CoGroupByKey's internal implementation to ensure proper type information flow.
  • New Test Case: Introduced a new unit test in util_test.py that specifically reproduces the bug with unpicklable keys and verifies the fix, ensuring CoGroupByKey correctly processes such keys using registered coders.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@shunping shunping self-assigned this Aug 14, 2025
@codecov
Copy link

codecov bot commented Aug 14, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 56.69%. Comparing base (0e47f80) to head (e731616).
⚠️ Report is 6 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff            @@
##             master   #35862   +/-   ##
=========================================
  Coverage     56.69%   56.69%           
  Complexity     3380     3380           
=========================================
  Files          1204     1205    +1     
  Lines        184118   184164   +46     
  Branches       3507     3507           
=========================================
+ Hits         104383   104413   +30     
- Misses        76406    76422   +16     
  Partials       3329     3329           
Flag Coverage Δ
python 80.96% <100.00%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@github-actions
Copy link
Contributor

Assigning reviewers:

R: @liferoad for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

| Flatten(pipeline=self.pipeline)
| GroupByKey()
| MapTuple(collect_values))
| MapTuple(collect_values).with_input_types(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this break some internal tests?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to #33932, it is possible to break some tests because we are changing the coder for this particular step, from a universal pickled coder to the actual coder for the element.

However, the impact radius will be much smaller than the previous PR, as cogbk is used less often than Reshuffle.

@shunping
Copy link
Collaborator Author

shunping commented Aug 14, 2025

cc'ed @jrmccluskey, as this could be a bug when trivial inference tries to infer the output type of function collect_values:

| MapTuple(collect_values))

Given the input element type and no typehint, the trivial inference returns an empty union rather than class '__main__._Unpicklable' as the List value type.

image

@shunping
Copy link
Collaborator Author

Internal tests are green. Merging.

@shunping shunping merged commit bb9ab00 into apache:master Aug 14, 2025
97 of 98 checks passed
parveensania pushed a commit to parveensania/beam-dp that referenced this pull request Aug 17, 2025
* Fix a bug in cogbk for not using registered coder.

* Allow custom tag type in typehint.
DKER2 pushed a commit to DKER2/beam that referenced this pull request Aug 20, 2025
* Fix a bug in cogbk for not using registered coder.

* Allow custom tag type in typehint.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants