Skip to content

Commit c650ff6

Browse files
destinationPush to utils and type updates
1 parent d55ec09 commit c650ff6

File tree

22 files changed

+183
-240
lines changed

22 files changed

+183
-240
lines changed

packages/destinations/node/aws/src/push.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@ export const push: PushFn = async function (event, config) {
66

77
if (firehose) pushFirehose([{ event }], firehose);
88

9-
return { queue: [] }; // @TODO
9+
return;
1010
};

packages/destinations/node/aws/src/types/index.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import type { DestinationNode } from '@elbwalker/source-node';
2-
import type { Handler } from '@elbwalker/types';
2+
import type {
3+
Handler,
4+
Destination as WalkerOSDestination,
5+
} from '@elbwalker/types';
36
import type {
47
FirehoseClient,
58
FirehoseClientConfig,
@@ -10,7 +13,7 @@ export interface Destination
1013
init: InitFn;
1114
}
1215

13-
export type PushFn = DestinationNode.PushFn<Custom, CustomEvent>;
16+
export type PushFn = WalkerOSDestination.PushFn<Custom, CustomEvent>;
1417
export type InitFn = DestinationNode.InitFn<PartialConfig, Config>;
1518

1619
export type Config = {

packages/destinations/node/bigquery/src/push.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export const push: PushFn = async function (event, config) {
88

99
await client.dataset(datasetId).table(tableId).insert(rows);
1010

11-
return { queue: [] };
11+
return;
1212
};
1313

1414
export const mapEvent = (event: WalkerOS.Event): Row => {

packages/destinations/node/bigquery/src/types/index.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
import type { DestinationNode } from '@elbwalker/source-node';
2-
import type { Handler } from '@elbwalker/types';
2+
import type {
3+
Handler,
4+
Destination as WalkerOSDestination,
5+
} from '@elbwalker/types';
36
import type { BigQuery, BigQueryOptions } from '@google-cloud/bigquery';
47

58
export interface Destination
69
extends DestinationNode.Destination<Custom, CustomEvent> {
710
init: InitFn;
811
}
912

10-
export type PushFn = DestinationNode.PushFn<Custom, CustomEvent>;
13+
export type PushFn = WalkerOSDestination.PushFn<Custom, CustomEvent>;
1114
export type InitFn = DestinationNode.InitFn<PartialConfig, Config>;
1215

1316
export type Config = {

packages/destinations/node/etag/src/push.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { getParameters } from '@elbwalker/destination-core-etag';
77
export const push: PushFn = async function (pushEvent, config) {
88
const { custom } = config;
99

10-
if (!custom) return {};
10+
if (!custom) return;
1111

1212
// @TODO
1313
let pageTitle;
@@ -33,8 +33,6 @@ export const push: PushFn = async function (pushEvent, config) {
3333
);
3434
}),
3535
);
36-
37-
return { queue: [] }; // @TODO
3836
};
3937

4038
async function sendRequest(custom: Custom, path: string, body?: string) {

packages/destinations/node/etag/src/types/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { DestinationNode } from '@elbwalker/source-node';
22
import type { DestinationCoreEtag } from '@elbwalker/destination-core-etag';
3+
import type { Destination as WalkerOSDestination } from '@elbwalker/types';
34

45
export interface Destination
56
extends DestinationNode.Destination<Custom, CustomEvent> {}
@@ -11,7 +12,7 @@ export type PartialConfig = DestinationNode.Config<
1112
Partial<CustomEvent>
1213
>;
1314

14-
export type PushFn = DestinationNode.PushFn<Custom, CustomEvent>;
15+
export type PushFn = WalkerOSDestination.PushFn<Custom, CustomEvent>;
1516

1617
export interface Custom extends DestinationCoreEtag.Config {}
1718

packages/destinations/node/meta/src/push.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export const push: PushFn = async function (event, config, mapping) {
3434

3535
return eventRequest.execute().then(
3636
() => {
37-
return {};
37+
return;
3838
},
3939
(err: unknown) => {
4040
throw err;

packages/destinations/node/meta/src/types/index.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
import type { DestinationNode } from '@elbwalker/source-node';
2-
import type { Handler, Mapping } from '@elbwalker/types';
2+
import type {
3+
Handler,
4+
Mapping,
5+
Destination as WalkerOSDestination,
6+
} from '@elbwalker/types';
37

48
export interface Destination
59
extends DestinationNode.Destination<Custom, CustomEvent> {
610
init: InitFn;
711
}
812

9-
export type PushFn = DestinationNode.PushFn<Custom, CustomEvent>;
13+
export type PushFn = WalkerOSDestination.PushFn<Custom, CustomEvent>;
1014
export type InitFn = DestinationNode.InitFn<PartialConfig, Config>;
1115

1216
export type Config = {

packages/sources/node/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ export function sourceNode(
2222
version,
2323
...state,
2424
push: (() => {}) as unknown as Elb.Fn, // Placeholder for the actual push function
25+
on: {}, // Initialize empty on handlers
2526
};
2627

2728
// Overwrite the push function with the instance-reference
Lines changed: 1 addition & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,5 @@
1-
import type { Destination, Mapping, WalkerOS } from '@elbwalker/types';
21
import type { SourceNode, DestinationNode } from '../types';
3-
import {
4-
debounce,
5-
getMappingEvent,
6-
getId,
7-
isSameType,
8-
useHooks,
9-
getMappingValue,
10-
isDefined,
11-
isObject,
12-
assign,
13-
} from '@elbwalker/utils';
2+
import { getId, isSameType } from '@elbwalker/utils';
143
import { pushToDestinations } from './push';
154

165
export async function addDestination(
@@ -66,80 +55,3 @@ export async function destinationInit(
6655

6756
return true; // Destination is ready to push
6857
}
69-
70-
function resolveMappingData(
71-
event: WalkerOS.Event,
72-
data?: Mapping.Data,
73-
): Destination.Data {
74-
if (!data) return;
75-
76-
return getMappingValue(event, data);
77-
}
78-
79-
export async function destinationPush(
80-
instance: SourceNode.Instance,
81-
destination: DestinationNode.Destination,
82-
event: WalkerOS.Event,
83-
): Promise<boolean> {
84-
const { config } = destination;
85-
const { eventMapping, mappingKey } = getMappingEvent(event, config.mapping);
86-
87-
let data = resolveMappingData(event, config.data);
88-
89-
if (eventMapping) {
90-
// Check if event should be processed or ignored
91-
if (eventMapping.ignore) return false;
92-
93-
// Check to use specific event names
94-
if (eventMapping.name) event.event = eventMapping.name;
95-
96-
// Transform event to a custom data
97-
if (eventMapping.data) {
98-
const dataEvent = resolveMappingData(event, eventMapping.data);
99-
data =
100-
isObject(data) && isObject(dataEvent) // Only merge objects
101-
? assign(data, dataEvent)
102-
: dataEvent;
103-
}
104-
}
105-
106-
const options = { data, instance };
107-
108-
if (eventMapping?.batch && destination.pushBatch) {
109-
const batched = eventMapping.batched || {
110-
key: mappingKey || '',
111-
events: [],
112-
data: [],
113-
};
114-
batched.events.push(event);
115-
if (isDefined(data)) batched.data.push(data);
116-
117-
eventMapping.batchFn =
118-
eventMapping.batchFn ||
119-
debounce((destination, instance) => {
120-
useHooks(
121-
destination.pushBatch!,
122-
'DestinationPushBatch',
123-
instance.hooks,
124-
)(batched, destination.config, options);
125-
126-
// Reset the batched queues
127-
// pushBatch isn't async yet, may cause trouble
128-
batched.events = [];
129-
batched.data = [];
130-
}, eventMapping.batch);
131-
132-
eventMapping.batched = batched;
133-
eventMapping.batchFn(destination, instance);
134-
} else {
135-
// It's time to go to the destination's side now
136-
await useHooks(destination.push, 'DestinationPush', instance.hooks)(
137-
event,
138-
destination.config,
139-
eventMapping,
140-
options,
141-
);
142-
}
143-
144-
return true;
145-
}

0 commit comments

Comments
 (0)