20 changes: 15 additions & 5 deletions packages/data/src/cluster/ClusterIndex.ts
@@ -48,7 +48,12 @@ export interface ClusterIndex<F> {
aggregateFlows: (
flows: F[],
zoom: number,
{getFlowOriginId, getFlowDestId, getFlowMagnitude}: FlowAccessors<F>,
{
getFlowOriginId,
getFlowDestId,
getFlowMagnitude,
getFlowAggFunc,
}: FlowAccessors<F>,
options?: {
flowCountsMapReduce?: FlowCountsMapReduce<F>;
},
@@ -165,20 +170,24 @@ export function buildIndex<F>(clusterLevels: ClusterLevels): ClusterIndex<F> {
aggregateFlows: (
flows,
zoom,
{getFlowOriginId, getFlowDestId, getFlowMagnitude},
{getFlowOriginId, getFlowDestId, getFlowMagnitude, getFlowAggFunc},
options = {},
) => {
if (zoom > maxZoom) {
return flows;
}
if (!getFlowAggFunc) {
getFlowAggFunc = (flowValues: number[]) =>
flowValues.reduce((a, b) => a + b, 0);
}
const result: (F | AggregateFlow)[] = [];
const aggFlowsByKey = new Map<string, AggregateFlow>();
const makeKey = (origin: string | number, dest: string | number) =>
`${origin}:${dest}`;
const {
flowCountsMapReduce = {
map: getFlowMagnitude,
reduce: (acc: any, count: number) => (acc || 0) + count,
reduce: getFlowAggFunc,
},
} = options;
for (const flow of flows) {
@@ -197,13 +206,14 @@ export function buildIndex<F>(clusterLevels: ClusterLevels): ClusterIndex<F> {
dest: destCluster,
count: flowCountsMapReduce.map(flow),
aggregate: true,
values: [flowCountsMapReduce.map(flow)],
};
result.push(aggregateFlow);
aggFlowsByKey.set(key, aggregateFlow);
} else {
aggregateFlow.values.push(flowCountsMapReduce.map(flow));
Member:
Why do you add values to flow here? It will likely significantly increase the memory use for the resulting flows data structure.

Author (@arashkav), Jan 22, 2023:

Let me explain why I picked this approach. Previously, you applied a map-reduce approach, which means summing up every new count added to a cluster in each iteration. That works fine when the aggregation function is 'sum'.

The same cannot be achieved if we want 'average'. Say we have three edges with values 10, 5, and 8. If we apply the same approach and just average them every time the 'reduce' function is called, we get a different number than the real average:
(((10 + 5) / 2) + 8) / 2 != (10 + 5 + 8) / 3

This approach, despite your concern, gives the developer full flexibility in which aggregation function to use (e.g. weighted sum, logarithmic, exponential, etc.).

To avoid any performance loss with the normal 'sum' aggregation, I changed the code to use the previous 'reduce' function (and stop pushing values into the array) if getFlowAggFunc is not defined.

I am also considering replacing the arrays with a better-performing data structure.
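The averaging pitfall described above can be sketched in TypeScript (illustrative values only, not the actual ClusterIndex code):

```typescript
// Three edge values to aggregate.
const values = [10, 5, 8];

// Pairwise "running average": averaging at every reduce step,
// the way a sum-style incremental reduce would apply it.
const pairwise = values.reduce((acc, v) => (acc + v) / 2); // ((10 + 5) / 2 + 8) / 2 = 7.75

// The true mean needs all values at once.
const mean = values.reduce((a, b) => a + b, 0) / values.length; // 23 / 3 ≈ 7.67
```

The two results differ, which is why an order-insensitive aggregation function must see the full list of values rather than a running accumulator.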

Member:

Yes, I get that. I was more concerned with the memory footprint of the resulting data structure. In line 209 we add the new values property to the flow object, so the resulting data structure will keep more data than necessary. It appears to me that values is only used as a temporary accumulator for the flow counts, so we don't need to keep it forever.

Another issue with your proposed approach is that the reduce function is called every time a new value is added. This might slow the calculations down unnecessarily, especially if the aggregation function is costly.

Maybe we can instead accumulate the values in a separate temporary map similar to aggFlowsByKey, e.g.:

const aggFlowCountsByKey = new Map<string, number[]>();

After iterating over all flows we can call flowCountsMapReduce.reduce once for each of them and save the results to the flow counts. Then, we can leave aggFlowCountsByKey behind to be garbage collected.

Or alternatively, we can use your accumulation approach, but give the resulting array another pass at the end in which we calculate the averages from the values (calling the agg function only once per flow), add them as counts to the flows and delete the values property from the results. Maybe that's simpler.
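The temporary-map proposal could be sketched as follows (hypothetical helper names `addValue` and `finalize`; a sketch of the idea, not the actual ClusterIndex implementation):

```typescript
// Accumulate flow counts per aggregate-flow key in a temporary map.
// The map is dropped after a single reduce pass, so nothing extra
// is retained on the resulting flow objects.
const aggFlowCountsByKey = new Map<string, number[]>();

function addValue(key: string, value: number): void {
  const list = aggFlowCountsByKey.get(key);
  if (list === undefined) {
    aggFlowCountsByKey.set(key, [value]);
  } else {
    list.push(value);
  }
}

// Call the (possibly costly) aggregation function once per key,
// after all flows have been iterated.
function finalize(agg: (values: number[]) => number): Map<string, number> {
  const counts = new Map<string, number>();
  for (const [key, values] of aggFlowCountsByKey) {
    counts.set(key, agg(values));
  }
  aggFlowCountsByKey.clear(); // let the accumulated arrays be garbage-collected
  return counts;
}
```

With 'sum' this behaves like the original incremental reduce, but it also supports order-insensitive aggregations such as the mean, and calls the aggregation function only once per flow.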

Author (@arashkav), Jan 23, 2023:

I made the changes following the proposed approach, and added a node aggregation feature as well. Please let me know if they work better now.

To allow weighted aggregation, I added another accessor that determines the weight attribute. For example, if we have a performance metric and want to weight it by the volume (flow):

getFlowMagnitude: (flow) => flow.metric_1,
getFlowAggWeight: (flow) => flow.count,
getFlowAggFunc: (values) =>
  values.reduce((acc, curr: any) => acc + curr.aggvalue * curr.aggweight, 0) /
  values.reduce((acc, curr: any) => acc + curr.aggweight, 0),

The example app seems quite responsive, and the memory footprint dropped dramatically.
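A worked example of that weighted aggregation function, assuming (as the snippet above does) that each accumulated entry carries aggvalue and aggweight fields:

```typescript
// Hypothetical accumulated entries: a mapped value plus its weight.
const entries = [
  {aggvalue: 10, aggweight: 2},
  {aggvalue: 5, aggweight: 1},
  {aggvalue: 8, aggweight: 3},
];

// Weighted mean, mirroring the getFlowAggFunc sketch above:
const weighted =
  entries.reduce((acc, e) => acc + e.aggvalue * e.aggweight, 0) /
  entries.reduce((acc, e) => acc + e.aggweight, 0);
// (10*2 + 5*1 + 8*3) / (2 + 1 + 3) = 49 / 6 ≈ 8.17
```

Note that, unlike a plain sum, this cannot be computed with an incremental two-argument reduce, which is exactly why the aggregation function receives the whole values array.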

Member:

Probably aggregateFlow.values.push(flowCountsMapReduce.map(flow)) is not doing what was intended:

Author (@arashkav), Jan 24, 2023:

In the latest commit, I made the required changes to apply the plugged-in aggregation function to the node clustering and the volume function.

aggregateFlow.count = flowCountsMapReduce.reduce(
aggregateFlow.count,
flowCountsMapReduce.map(flow),
aggregateFlow.values,
);
}
}
6 changes: 5 additions & 1 deletion packages/data/src/types.ts
@@ -22,12 +22,15 @@ export interface ViewState {
export type FlowAccessor<F, T> = (flow: F) => T; // objectInfo?: AccessorObjectInfo,
export type LocationAccessor<L, T> = (location: L) => T;

export type FlowAggregatorFunc<V, T> = (values: V) => T;

export interface FlowAccessors<F> {
getFlowOriginId: FlowAccessor<F, string | number>;
getFlowDestId: FlowAccessor<F, string | number>;
getFlowMagnitude: FlowAccessor<F, number>;
getFlowTime?: FlowAccessor<F, Date>; // TODO: use number instead of Date
// getFlowColor?: FlowAccessor<string | undefined>;
getFlowAggFunc?: FlowAggregatorFunc<number[], number>;
}

export interface LocationAccessors<L> {
@@ -115,6 +118,7 @@ export interface AggregateFlow {
dest: string | number;
count: number;
aggregate: true;
values: number[];
}

export function isAggregateFlow(
@@ -131,7 +135,7 @@ export function isAggregateFlow(

export interface FlowCountsMapReduce<F, T = any> {
map: (flow: F) => T;
reduce: (accumulated: T, val: T) => T;
reduce: (values: T[]) => T;
}

export enum LocationFilterMode {