Skip to content

Commit f2d0dc8

Browse files
authored
Improve LogElements to show pane_info and timestamps in seconds. (apache#35387)
* Improve LogElements to show pane_info and timestamps in seconds. * Rename the new argument to use_epoch. Adjust argument order.
1 parent 01411e7 commit f2d0dc8

File tree

2 files changed

+79
-9
lines changed

2 files changed

+79
-9
lines changed

sdks/python/apache_beam/transforms/util.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1445,30 +1445,49 @@ class LogElements(PTransform):
14451445
level: (optional) The logging level for the output (e.g. `logging.DEBUG`,
14461446
`logging.INFO`, `logging.WARNING`, `logging.ERROR`). If not specified,
14471447
the log is printed to stdout.
1448+
with_pane_info (bool): (optional) Whether to include element's pane info.
1449+
use_epoch_time (bool): (optional) Whether to display epoch timestamps.
14481450
"""
14491451
class _LoggingFn(DoFn):
14501452
def __init__(
1451-
self, prefix='', with_timestamp=False, with_window=False, level=None):
1453+
self,
1454+
prefix='',
1455+
with_timestamp=False,
1456+
with_window=False,
1457+
level=None,
1458+
with_pane_info=False,
1459+
use_epoch_time=False):
14521460
super().__init__()
14531461
self.prefix = prefix
14541462
self.with_timestamp = with_timestamp
14551463
self.with_window = with_window
14561464
self.level = level
1465+
self.with_pane_info = with_pane_info
1466+
self.use_epoch_time = use_epoch_time
1467+
1468+
def format_timestamp(self, timestamp):
1469+
if self.use_epoch_time:
1470+
return timestamp.seconds()
1471+
return timestamp.to_rfc3339()
14571472

14581473
def process(
14591474
self,
14601475
element,
14611476
timestamp=DoFn.TimestampParam,
14621477
window=DoFn.WindowParam,
1478+
pane_info=DoFn.PaneInfoParam,
14631479
**kwargs):
14641480
log_line = self.prefix + str(element)
14651481

14661482
if self.with_timestamp:
1467-
log_line += ', timestamp=' + repr(timestamp.to_rfc3339())
1483+
log_line += ', timestamp=' + repr(self.format_timestamp(timestamp))
14681484

14691485
if self.with_window:
1470-
log_line += ', window(start=' + window.start.to_rfc3339()
1471-
log_line += ', end=' + window.end.to_rfc3339() + ')'
1486+
log_line += ', window(start=' + str(self.format_timestamp(window.start))
1487+
log_line += ', end=' + str(self.format_timestamp(window.end)) + ')'
1488+
1489+
if self.with_pane_info:
1490+
log_line += ', pane_info=' + repr(pane_info)
14721491

14731492
if self.level == logging.DEBUG:
14741493
logging.debug(log_line)
@@ -1491,17 +1510,28 @@ def __init__(
14911510
prefix='',
14921511
with_timestamp=False,
14931512
with_window=False,
1494-
level=None):
1513+
level=None,
1514+
with_pane_info=False,
1515+
use_epoch_time=False,
1516+
):
14951517
super().__init__(label)
14961518
self.prefix = prefix
14971519
self.with_timestamp = with_timestamp
14981520
self.with_window = with_window
1521+
self.with_pane_info = with_pane_info
1522+
self.use_epoch_time = use_epoch_time
14991523
self.level = level
15001524

15011525
def expand(self, input):
15021526
return input | ParDo(
15031527
self._LoggingFn(
1504-
self.prefix, self.with_timestamp, self.with_window, self.level))
1528+
self.prefix,
1529+
self.with_timestamp,
1530+
self.with_window,
1531+
self.level,
1532+
self.with_pane_info,
1533+
self.use_epoch_time,
1534+
))
15051535

15061536

15071537
class Reify(object):

sdks/python/apache_beam/transforms/util_test.py

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1613,7 +1613,10 @@ def _capture_stdout_log(request, capsys):
16131613
])
16141614
| beam.WindowInto(FixedWindows(60))
16151615
| util.LogElements(
1616-
prefix='prefix_', with_window=True, with_timestamp=True))
1616+
prefix='prefix_',
1617+
with_window=True,
1618+
with_timestamp=True,
1619+
with_pane_info=True))
16171620

16181621
request.captured_stdout = capsys.readouterr().out
16191622
return result
@@ -1622,9 +1625,46 @@ def _capture_stdout_log(request, capsys):
16221625
def test_stdout_logs(self):
16231626
assert self.captured_stdout == \
16241627
("prefix_event, timestamp='2022-10-01T00:00:00Z', "
1625-
"window(start=2022-10-01T00:00:00Z, end=2022-10-01T00:01:00Z)\n"
1628+
"window(start=2022-10-01T00:00:00Z, end=2022-10-01T00:01:00Z), "
1629+
"pane_info=PaneInfo(first: True, last: True, timing: UNKNOWN, "
1630+
"index: 0, nonspeculative_index: 0)\n"
16261631
"prefix_event, timestamp='2022-10-02T00:00:00Z', "
1627-
"window(start=2022-10-02T00:00:00Z, end=2022-10-02T00:01:00Z)\n"), \
1632+
"window(start=2022-10-02T00:00:00Z, end=2022-10-02T00:01:00Z), "
1633+
"pane_info=PaneInfo(first: True, last: True, timing: UNKNOWN, "
1634+
"index: 0, nonspeculative_index: 0)\n"), \
1635+
f'Received from stdout: {self.captured_stdout}'
1636+
1637+
@pytest.fixture(scope="function")
1638+
def _capture_stdout_log_without_rfc3339(request, capsys):
1639+
with TestPipeline() as p:
1640+
result = (
1641+
p | beam.Create([
1642+
TimestampedValue(
1643+
"event",
1644+
datetime(2022, 10, 1, 0, 0, 0, 0,
1645+
tzinfo=pytz.UTC).timestamp()),
1646+
TimestampedValue(
1647+
"event",
1648+
datetime(2022, 10, 2, 0, 0, 0, 0,
1649+
tzinfo=pytz.UTC).timestamp()),
1650+
])
1651+
| beam.WindowInto(FixedWindows(60))
1652+
| util.LogElements(
1653+
prefix='prefix_',
1654+
with_window=True,
1655+
with_timestamp=True,
1656+
use_epoch_time=True))
1657+
1658+
request.captured_stdout = capsys.readouterr().out
1659+
return result
1660+
1661+
@pytest.mark.usefixtures("_capture_stdout_log_without_rfc3339")
1662+
def test_stdout_logs_without_rfc3339(self):
1663+
assert self.captured_stdout == \
1664+
("prefix_event, timestamp=1664582400, "
1665+
"window(start=1664582400, end=1664582460)\n"
1666+
"prefix_event, timestamp=1664668800, "
1667+
"window(start=1664668800, end=1664668860)\n"), \
16281668
f'Received from stdout: {self.captured_stdout}'
16291669

16301670
def test_ptransform_output(self):

0 commit comments

Comments
 (0)