Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions packages/data/src/FlowmapAggregateAccessors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ import {
isLocationClusterNode,
} from './types';

export type aggFunctionVars = {
aggvalue: number;
aggweight: number;
};

export default class FlowmapAggregateAccessors<L, F> {
private accessors: FlowmapDataAccessors<L, F>;
constructor(accessors: FlowmapDataAccessors<L, F>) {
Expand Down Expand Up @@ -65,6 +70,18 @@ export default class FlowmapAggregateAccessors<L, F> {
return isAggregateFlow(f) ? f.count : this.accessors.getFlowMagnitude(f);
};

getFlowAggWeight = (f: F | AggregateFlow) => {
return !this.accessors.getFlowAggWeight
? undefined
: this.accessors.getFlowAggWeight(f);
};

getFlowAggFunc = (f: aggFunctionVars[] = []) => {
return !this.accessors.getFlowAggFunc
? undefined
: this.accessors.getFlowAggFunc(f);
};

// Note: Aggregate flows have no time
getFlowTime = (f: F) => {
const {getFlowTime} = this.accessors;
Expand Down
118 changes: 106 additions & 12 deletions packages/data/src/FlowmapSelectors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,17 @@ import {
LayersData,
LocationFilterMode,
LocationTotals,
FlowAggregatorFunc,
} from './types';

const MAX_CLUSTER_ZOOM_LEVEL = 20;
type KDBushTree = any;

export type aggFunctionVars = {
aggvalue: number;
aggweight: number;
};

export type Selector<L, F, T> = ParametricSelector<
FlowmapState,
FlowmapData<L, F>,
Expand Down Expand Up @@ -700,20 +706,108 @@ export default class FlowmapSelectors<L, F> {
if (d.internalCount != null) rv.internalCount += d.internalCount;
return rv;
};
for (const f of flows) {
if (
this.isFlowInSelection(f, selectedLocationsSet, locationFilterMode)
) {
const originId = this.accessors.getFlowOriginId(f);
const destId = this.accessors.getFlowDestId(f);
const count = this.accessors.getFlowMagnitude(f);
if (originId === destId) {
totals.set(originId, add(originId, {internalCount: count}));
} else {
totals.set(originId, add(originId, {outgoingCount: count}));
totals.set(destId, add(destId, {incomingCount: count}));

const aggFuncAdd = (
id: string | number,
d: Partial<LocationTotals>,
): LocationTotals => {
const rv = totals.get(id) ?? {
incomingCount: 0,
outgoingCount: 0,
internalCount: 0,
};
if (d.incomingCount != null) rv.incomingCount = d.incomingCount;
if (d.outgoingCount != null) rv.outgoingCount = d.outgoingCount;
if (d.internalCount != null) rv.internalCount = d.internalCount;
return rv;
};

if (this.accessors.getFlowAggFunc() === undefined) {
for (const f of flows) {
if (
this.isFlowInSelection(f, selectedLocationsSet, locationFilterMode)
) {
const originId = this.accessors.getFlowOriginId(f);
const destId = this.accessors.getFlowDestId(f);
const count = this.accessors.getFlowMagnitude(f);
if (originId === destId) {
totals.set(originId, add(originId, {internalCount: count}));
} else {
totals.set(originId, add(originId, {outgoingCount: count}));
totals.set(destId, add(destId, {incomingCount: count}));
}
}
}
} else {
const flowDestValues = new Map<string, aggFunctionVars[]>();
const flowOriginValues = new Map<string, aggFunctionVars[]>();
const flowInternalValues = new Map<string, aggFunctionVars[]>();
for (const f of flows) {
if (
this.isFlowInSelection(f, selectedLocationsSet, locationFilterMode)
) {
const originId = this.accessors.getFlowOriginId(f).toString();
const destId = this.accessors.getFlowDestId(f).toString();
const count = this.accessors.getFlowMagnitude(f);
const aggweightmap =
this.accessors.getFlowAggWeight === undefined
? this.accessors.getFlowMagnitude(f)
: this.accessors.getFlowAggWeight(f);
if (originId === destId) {
const interval = flowInternalValues.get(originId);
if (!interval) {
flowInternalValues.set(originId, [
{aggvalue: count, aggweight: aggweightmap},
]);
} else {
interval.push({aggvalue: count, aggweight: aggweightmap});
}
} else {
const destval = flowDestValues.get(destId);
if (!destval) {
flowDestValues.set(destId, [
{aggvalue: count, aggweight: aggweightmap},
]);
} else {
destval.push({aggvalue: count, aggweight: aggweightmap});
}

const originval = flowOriginValues.get(originId);
if (!originval) {
flowOriginValues.set(originId, [
{aggvalue: count, aggweight: aggweightmap},
]);
} else {
originval.push({aggvalue: count, aggweight: aggweightmap});
}
}
}
}

for (const [originId, values] of flowOriginValues.entries()) {
totals.set(
originId,
aggFuncAdd(originId, {
outgoingCount: this.accessors.getFlowAggFunc(values),
}),
);
}
for (const [destId, values] of flowDestValues.entries()) {
totals.set(
destId,
aggFuncAdd(destId, {
incomingCount: this.accessors.getFlowAggFunc(values),
}),
);
}
for (const [internalIds, values] of flowInternalValues.entries()) {
totals.set(
internalIds,
aggFuncAdd(internalIds, {
internalCount: this.accessors.getFlowAggFunc(values),
}),
);
}
}
return totals;
},
Expand Down
138 changes: 113 additions & 25 deletions packages/data/src/cluster/ClusterIndex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import {
} from './../types';
import {ascending, bisectLeft, extent} from 'd3-array';

export type aggFunctionVars = {
aggvalue: number;
aggweight: number;
};

export type LocationWeightGetter = (id: string | number) => number;

/**
Expand Down Expand Up @@ -53,9 +58,10 @@ export interface ClusterIndex<F> {
getFlowDestId,
getFlowMagnitude,
getFlowAggFunc,
getFlowAggWeight,
}: FlowAccessors<F>,
options?: {
flowCountsMapReduce?: FlowCountsMapReduce<F>;
flowCountsMapReduce?: FlowCountsMapReduce<F, F>;
},
) => (F | AggregateFlow)[];
}
Expand Down Expand Up @@ -170,24 +176,30 @@ export function buildIndex<F>(clusterLevels: ClusterLevels): ClusterIndex<F> {
aggregateFlows: (
flows,
zoom,
{getFlowOriginId, getFlowDestId, getFlowMagnitude, getFlowAggFunc},
{
getFlowOriginId,
getFlowDestId,
getFlowMagnitude,
getFlowAggFunc,
getFlowAggWeight,
},
options = {},
) => {
if (zoom > maxZoom) {
return flows;
}
if (!getFlowAggFunc) {
getFlowAggFunc = (flowValues: number[]) =>
flowValues.reduce((a, b) => a + b, 0);
}
const result: (F | AggregateFlow)[] = [];
const aggFlowsByKey = new Map<string, AggregateFlow>();
const aggFlowCountsByKey = new Map<string, aggFunctionVars[]>();
const makeKey = (origin: string | number, dest: string | number) =>
`${origin}:${dest}`;
const {
flowCountsMapReduce = {
map: getFlowMagnitude,
reduce: getFlowAggFunc,
aggweightmap: !getFlowAggWeight ? getFlowMagnitude : getFlowAggWeight,
reduce: !getFlowAggFunc
? (acc: any, count: number) => (acc || 0) + count
: getFlowAggFunc,
},
} = options;
for (const flow of flows) {
Expand All @@ -206,43 +218,119 @@ export function buildIndex<F>(clusterLevels: ClusterLevels): ClusterIndex<F> {
dest: destCluster,
count: flowCountsMapReduce.map(flow),
aggregate: true,
values: [flowCountsMapReduce.map(flow)],
};
result.push(aggregateFlow);
aggFlowsByKey.set(key, aggregateFlow);
aggFlowCountsByKey.set(key, [
{
aggvalue: flowCountsMapReduce.map(flow),
aggweight: flowCountsMapReduce.aggweightmap(flow),
},
]);
} else {
aggregateFlow.values.push(flowCountsMapReduce.map(flow));
aggregateFlow.count = flowCountsMapReduce.reduce(
aggregateFlow.values,
);
if (!getFlowAggFunc) {
aggregateFlow.count = flowCountsMapReduce.reduce(
aggregateFlow.count,
flowCountsMapReduce.map(flow),
);
} else {
const aggFlowsCounts = aggFlowCountsByKey.get(key);
if (!aggFlowsCounts) {
aggFlowCountsByKey.set(key, [
{
aggvalue: flowCountsMapReduce.map(flow),
aggweight: flowCountsMapReduce.aggweightmap(flow),
},
]);
} else {
aggFlowsCounts.push({
aggvalue: flowCountsMapReduce.map(flow),
aggweight: flowCountsMapReduce.aggweightmap(flow),
});
}
}
}
}
}
if (getFlowAggFunc !== undefined) {
for (const [key, aggregateFlow] of aggFlowsByKey.entries()) {
aggregateFlow.count = flowCountsMapReduce.reduce(
aggFlowCountsByKey.get(key),
null,
);
}
}

return result;
},
};
}

export function makeLocationWeightGetter<F>(
flows: F[],
{getFlowOriginId, getFlowDestId, getFlowMagnitude}: FlowAccessors<F>,
{
getFlowOriginId,
getFlowDestId,
getFlowMagnitude,
getFlowAggFunc,
getFlowAggWeight,
}: FlowAccessors<F>,
): LocationWeightGetter {
const locationTotals = {
incoming: new Map<string | number, number>(),
outgoing: new Map<string | number, number>(),
};
for (const flow of flows) {
const origin = getFlowOriginId(flow);
const dest = getFlowDestId(flow);
const count = getFlowMagnitude(flow);
locationTotals.incoming.set(
dest,
(locationTotals.incoming.get(dest) || 0) + count,
);
locationTotals.outgoing.set(
origin,
(locationTotals.outgoing.get(origin) || 0) + count,
);
const flowAggFunc = !getFlowAggFunc
? (acc: any, count: number) => (acc || 0) + count
: getFlowAggFunc;
if (!getFlowAggFunc) {
for (const flow of flows) {
const origin = getFlowOriginId(flow);
const dest = getFlowDestId(flow);
const count = getFlowMagnitude(flow);
locationTotals.incoming.set(
dest,
flowAggFunc(locationTotals.incoming.get(dest) || 0, count),
);
locationTotals.outgoing.set(
origin,
flowAggFunc(locationTotals.outgoing.get(origin) || 0, count),
);
}
} else {
const flowDestValues = new Map<string, aggFunctionVars[]>();
const flowOriginValues = new Map<string, aggFunctionVars[]>();
for (const flow of flows) {
const origin = getFlowOriginId(flow).toString();
const dest = getFlowDestId(flow).toString();
const count = getFlowMagnitude(flow);
const aggweightmap =
getFlowAggWeight === undefined
? getFlowMagnitude(flow)
: getFlowAggWeight(flow);

const destFlowVal = flowDestValues.get(dest);
if (!destFlowVal) {
flowDestValues.set(dest, [{aggvalue: count, aggweight: aggweightmap}]);
} else {
destFlowVal.push({aggvalue: count, aggweight: aggweightmap});
}

const originFlowVal = flowOriginValues.get(origin);
if (!originFlowVal) {
flowOriginValues.set(origin, [
{aggvalue: count, aggweight: aggweightmap},
]);
} else {
originFlowVal.push({aggvalue: count, aggweight: aggweightmap});
}
}
for (const [dest, values] of flowDestValues.entries()) {
locationTotals.incoming.set(dest, flowAggFunc(values, null));
}
for (const [origin, values] of flowOriginValues.entries()) {
locationTotals.outgoing.set(origin, flowAggFunc(values, null));
}
}
return (id: string | number) =>
Math.max(
Expand Down
5 changes: 3 additions & 2 deletions packages/data/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export interface FlowAccessors<F> {
getFlowTime?: FlowAccessor<F, Date>; // TODO: use number instead of Date
// getFlowColor?: FlowAccessor<string | undefined>;
getFlowAggFunc: FlowAggregatorFunc<number[], number>;
getFlowAggWeight: FlowAccessor<F, number>;
Copy link
Author

@arashkav arashkav Jan 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the new weight function that similar to the magnitude function is fed into flowmap. getFlowAggWeight provides the weight for each edge volume/count provided via getFlowMagnitude.

}

export interface LocationAccessors<L> {
Expand Down Expand Up @@ -118,7 +119,6 @@ export interface AggregateFlow {
dest: string | number;
count: number;
aggregate: true;
values: number[];
}

export function isAggregateFlow(
Expand All @@ -133,8 +133,9 @@ export function isAggregateFlow(
);
}

export interface FlowCountsMapReduce<F, T = any> {
export interface FlowCountsMapReduce<F, H, T = any> {
map: (flow: F) => T;
aggweightmap: (flow: H) => T;
reduce: (values: T) => T;
}

Expand Down