Skip to content

Commit 911ec6a

Browse files
Add filter() method to PCollection in TypeScript SDK (#37408)
- Add filter() method following same pattern as map() and flatMap() - Add 5 test cases covering various filtering scenarios - Support optional context parameter for side inputs
1 parent f5af98b commit 911ec6a

File tree

2 files changed

+94
-0
lines changed

2 files changed

+94
-0
lines changed

sdks/typescript/src/apache_beam/pvalue.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,55 @@ export class PCollection<T> {
163163
);
164164
}
165165

166+
/**
167+
* Returns a PCollection containing only elements that satisfy the given
168+
* predicate function.
169+
*
170+
* This is analogous to JavaScript's `Array.filter()` method.
171+
*
172+
* Example usage:
173+
* ```
174+
* const evens = pcoll.filter(x => x % 2 === 0);
175+
* const positives = pcoll.filter(x => x > 0);
176+
* ```
177+
*
178+
* @param fn A predicate function that returns true for elements to keep,
179+
* false for elements to filter out. The function receives the
180+
* element and optionally a context object.
181+
* @param context Optional context object to pass to the predicate function.
182+
* @returns A new PCollection containing only the elements for which the
183+
* predicate returned true.
184+
*/
185+
filter<ContextT extends Object | undefined = undefined>(
186+
fn:
187+
| (ContextT extends undefined ? (element: T) => boolean : never)
188+
| ((element: T, context: ContextT) => boolean),
189+
context: ContextT = undefined!,
190+
): PCollection<T> {
191+
if (extractContext(fn)) {
192+
context = { ...extractContext(fn), ...context };
193+
}
194+
return this.apply(
195+
withName(
196+
"filter(" + extractName(fn) + ")",
197+
parDo<T, T, ContextT>(
198+
{
199+
process: function (element: T, context: ContextT) {
200+
// Return the element wrapped in an array if predicate is true,
201+
// otherwise return an empty array to filter it out.
202+
const keep =
203+
context === null || context === undefined
204+
? (fn as (element: T) => boolean)(element)
205+
: fn(element, context);
206+
return keep ? [element] : [];
207+
},
208+
},
209+
context,
210+
),
211+
),
212+
);
213+
}
214+
166215
root(): Root {
167216
return new Root(this.pipeline);
168217
}

sdks/typescript/test/primitives_test.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,51 @@ export function suite(runner: beam.Runner = directRunner()) {
9696
});
9797
});
9898

99+
it("runs a filter", async function () {
100+
await runner.run((root) => {
101+
root
102+
.apply(beam.create([1, 2, 3, 4, 5, 6]))
103+
.filter((x) => x % 2 === 0)
104+
.apply(testing.assertDeepEqual([2, 4, 6]));
105+
});
106+
});
107+
108+
it("runs a filter with predicate returning false for all", async function () {
109+
await runner.run((root) => {
110+
root
111+
.apply(beam.create([1, 3, 5, 7]))
112+
.filter((x) => x % 2 === 0)
113+
.apply(testing.assertDeepEqual([]));
114+
});
115+
});
116+
117+
it("runs a filter with predicate returning true for all", async function () {
118+
await runner.run((root) => {
119+
root
120+
.apply(beam.create([2, 4, 6, 8]))
121+
.filter((x) => x % 2 === 0)
122+
.apply(testing.assertDeepEqual([2, 4, 6, 8]));
123+
});
124+
});
125+
126+
it("runs a filter with context", async function () {
127+
await runner.run((root) => {
128+
root
129+
.apply(beam.create([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
130+
.filter((x: number, threshold: number) => x > threshold, 5)
131+
.apply(testing.assertDeepEqual([6, 7, 8, 9, 10]));
132+
});
133+
});
134+
135+
it("runs a filter on strings", async function () {
136+
await runner.run((root) => {
137+
root
138+
.apply(beam.create(["apple", "banana", "apricot", "cherry"]))
139+
.filter((s) => s.startsWith("a"))
140+
.apply(testing.assertDeepEqual(["apple", "apricot"]));
141+
});
142+
});
143+
99144
it("runs a Splitter", async function () {
100145
await runner.run((root) => {
101146
const pcolls = root

0 commit comments

Comments
 (0)