Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import dis
from enum import Enum
import functools
import hashlib
import io
import itertools
import logging
Expand Down Expand Up @@ -98,7 +99,7 @@
_DYNAMIC_CLASS_TRACKER_BY_CLASS = weakref.WeakKeyDictionary()
_DYNAMIC_CLASS_TRACKER_BY_ID = weakref.WeakValueDictionary()
_DYNAMIC_CLASS_STATE_TRACKER_BY_CLASS = weakref.WeakKeyDictionary()
_DYNAMIC_CLASS_TRACKER_LOCK = threading.Lock()
_DYNAMIC_CLASS_TRACKER_LOCK = threading.RLock()

PYPY = platform.python_implementation() == "PyPy"

Expand Down Expand Up @@ -168,6 +169,7 @@ class CloudPickleConfig:


DEFAULT_CONFIG = CloudPickleConfig()
_GENERATING_SENTINEL = object()
builtin_code_type = None
if PYPY:
# builtin-code objects only exist in pypy
Expand All @@ -179,10 +181,21 @@ class CloudPickleConfig:
def _get_or_create_tracker_id(class_def, id_generator):
with _DYNAMIC_CLASS_TRACKER_LOCK:
class_tracker_id = _DYNAMIC_CLASS_TRACKER_BY_CLASS.get(class_def)
if class_tracker_id is _GENERATING_SENTINEL and id_generator:
raise RuntimeError(
f"Recursive ID generation detected for {class_def}. "
f"The id_generator cannot recursively request an ID for the same class."
)

if class_tracker_id is None and id_generator is not None:
class_tracker_id = id_generator(class_def)
_DYNAMIC_CLASS_TRACKER_BY_CLASS[class_def] = class_tracker_id
_DYNAMIC_CLASS_TRACKER_BY_ID[class_tracker_id] = class_def
_DYNAMIC_CLASS_TRACKER_BY_CLASS[class_def] = _GENERATING_SENTINEL
try:
class_tracker_id = id_generator(class_def)
_DYNAMIC_CLASS_TRACKER_BY_CLASS[class_def] = class_tracker_id
_DYNAMIC_CLASS_TRACKER_BY_ID[class_tracker_id] = class_def
except:
_DYNAMIC_CLASS_TRACKER_BY_CLASS.pop(class_def, None)
raise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using a bare except: can catch more exceptions than intended, such as SystemExit or KeyboardInterrupt, which can hide bugs or make it harder to stop the program. It's better to be more specific and catch Exception instead to avoid unintentionally swallowing system-level exceptions.

Suggested change
except:
_DYNAMIC_CLASS_TRACKER_BY_CLASS.pop(class_def, None)
raise
except Exception:
_DYNAMIC_CLASS_TRACKER_BY_CLASS.pop(class_def, None)
raise

return class_tracker_id


Expand Down Expand Up @@ -1720,3 +1733,10 @@ def dumps(

# Backward compat alias.
CloudPickler = Pickler


def hash_dynamic_classdef(classdef):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. This is seemingly unused?

  2. Do we have to worry about (unlikely) hash collisions?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be used by the Flume runner. It is tested in the cloudpickle repo.

Conflicts would be bad; should we try to detect them?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although conflicts should result in an unpickling error.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although the original UUID approach can also cause collisions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd worry if the errors are silenced.
Is detecting cheap? If so, that would help with a cleaner error message. Or if we believe this is really rare, we could add some comment in the codepath that will error-out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. The goal is to make this available for use, although it is not called yet, as an alternative to id_generator within CPConfig.

  2. I feel like since we are using SHA-256 which is pretty collision-resistant, and because a single Beam pipeline will likely have a wide array of variability/complexity in class definitions, the odds are extremely low.

"""Generates a deterministic ID by hashing the pickled class definition."""
hexidgest = hashlib.sha256(
dumps(classdef, config=CloudPickleConfig(id_generator=None))).hexdigest()
return hexidgest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There's a typo in the variable name hexidgest; it should be hexdigest.

Additionally, this change removes the trailing newline from the file. According to PEP 8, files should end with a single newline. Please add one.

Suggested change
hexidgest = hashlib.sha256(
dumps(classdef, config=CloudPickleConfig(id_generator=None))).hexdigest()
return hexidgest
hexdigest = hashlib.sha256(
dumps(classdef, config=CloudPickleConfig(id_generator=None))).hexdigest()
return hexdigest

Loading