@@ -335,15 +335,17 @@ class CogroupStatement(StreamStatement):
335335 def __init__ (
336336 self ,
337337 stream : Stream ,
338- streams : Sequence [Tuple [Stream , Scalar ]],
338+ streams : Sequence [Tuple [Stream , Union [ Scalar , Sequence [ Scalar ], str ] ]],
339339 join_type : JoinType = JoinType .inner ,
340340 ) -> None :
341341 """Initializer
342342
343343 Args:
344344 stream: Stream containing this statement
345345 streams: List of tuples that each define the stream to combine and the
346- common field that will be used to combine results
346+ common field(s) that will be used to combine results. If there are no
347+ specific fields to group by, pass "all" as the second item in the stream
348+ tuple.
347349 join_type: Type of join that determines how records are included in the
348350 combined stream
349351
@@ -361,7 +363,18 @@ def __str__(self) -> str:
361363 streams = []
362364 for i , item in enumerate (self .streams ):
363365 stream , field_ = item
364- s = f"{ stream .ref } by { field_ } "
366+ if isinstance (field_ , Scalar ):
367+ groups = stringify (field_ )
368+ elif field_ == "all" :
369+ groups = "all"
370+ elif isinstance (field_ , Sequence ):
371+ groups = stringify_list (field_ )
372+ else :
373+ raise ValueError (
374+ f"Cogroup field type not supported. Provided: { field_ } "
375+ )
376+
377+ s = f"{ stream .ref } by { groups } "
365378 if i == 0 and self .join_type != JoinType .inner :
366379 s += f" { self .join_type } "
367380
@@ -432,8 +445,9 @@ def cogroup(
432445 """Combine data from two or more data streams into a single data stream
433446
434447 Args:
435- streams: Each item is a tuple of the stream to combine and the common field
436- that will be used to combine results
448+ streams: Each item is a tuple of the stream to combine and the common field(s)
449+ that will be used to combine results. If there are no specific fields to
450+ group by, pass "all" as the second item in the stream tuple.
437451 join_type: Type of join that determines how records are included in the
438452 combined stream. Defaults to JoinType.inner.
439453
0 commit comments