Skip to content

Commit 12fe912

Browse files
committed
4x ordinal limit;
1 parent 16a1e24 commit 12fe912

21 files changed

+441
-199
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
**.iws
1111
.idea
1212
venv
13+
.cursor
1314

1415
# publishing secrets
1516
secrets/signing-key

hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ abstract class AbstractHollowProducer {
102102
private final boolean allowTypeResharding;
103103
private final boolean forceCoverageOfTypeResharding; // exercise re-sharding often (for testing)
104104
private final Supplier<Boolean> ignoreSoftLimits;
105+
private final boolean partitionedOrdinalMap;
105106

106107
@Deprecated
107108
public AbstractHollowProducer(
@@ -110,7 +111,7 @@ public AbstractHollowProducer(
110111
this(new HollowFilesystemBlobStager(), publisher, announcer,
111112
Collections.emptyList(),
112113
new VersionMinterWithCounter(), null, 0,
113-
DEFAULT_TARGET_MAX_TYPE_SHARD_SIZE, false, false, false, null,
114+
DEFAULT_TARGET_MAX_TYPE_SHARD_SIZE, false, false, false, false, null,
114115
new DummyBlobStorageCleaner(), new BasicSingleProducerEnforcer(),
115116
null, true, HollowConsumer.UpdatePlanBlobVerifier.DEFAULT_INSTANCE, null);
116117
}
@@ -123,7 +124,7 @@ public AbstractHollowProducer(
123124
b.eventListeners,
124125
b.versionMinter, b.snapshotPublishExecutor,
125126
b.numStatesBetweenSnapshots, b.targetMaxTypeShardSize, b.focusHoleFillInFewestShards,
126-
b.allowTypeResharding, b.forceCoverageOfTypeResharding,
127+
b.allowTypeResharding, b.forceCoverageOfTypeResharding, b.partitionedOrdinalMap,
127128
b.metricsCollector, b.blobStorageCleaner, b.singleProducerEnforcer,
128129
b.hashCodeFinder, b.doIntegrityCheck, b.updatePlanBlobVerifier, b.ignoreSoftLimits);
129130
}
@@ -145,6 +146,7 @@ private AbstractHollowProducer(
145146
boolean focusHoleFillInFewestShards,
146147
boolean allowTypeResharding,
147148
boolean forceCoverageOfTypeResharding,
149+
boolean partitionedOrdinalMap,
148150
HollowMetricsCollector<HollowProducerMetrics> metricsCollector,
149151
HollowProducer.BlobStorageCleaner blobStorageCleaner,
150152
SingleProducerEnforcer singleProducerEnforcer,
@@ -166,6 +168,7 @@ private AbstractHollowProducer(
166168
this.forceCoverageOfTypeResharding = forceCoverageOfTypeResharding;
167169
this.focusHoleFillInFewestShards = focusHoleFillInFewestShards;
168170
this.ignoreSoftLimits = ignoreSoftLimits;
171+
this.partitionedOrdinalMap = partitionedOrdinalMap;
169172

170173
HollowWriteStateEngine writeEngine = hashCodeFinder == null
171174
? new HollowWriteStateEngine()
@@ -174,6 +177,7 @@ private AbstractHollowProducer(
174177
writeEngine.allowTypeResharding(allowTypeResharding);
175178
writeEngine.setFocusHoleFillInFewestShards(focusHoleFillInFewestShards);
176179
writeEngine.setIgnoreOrdinalLimits(ignoreSoftLimits);
180+
writeEngine.setPartitionedOrdinalMap(partitionedOrdinalMap);
177181

178182
this.objectMapper = new HollowObjectMapper(writeEngine);
179183
if (hashCodeFinder != null) {
@@ -332,6 +336,7 @@ private HollowProducer.ReadState restore(
332336
writeEngine.allowTypeResharding(allowTypeResharding);
333337
writeEngine.setFocusHoleFillInFewestShards(focusHoleFillInFewestShards);
334338
writeEngine.setIgnoreOrdinalLimits(ignoreSoftLimits);
339+
writeEngine.setPartitionedOrdinalMap(partitionedOrdinalMap);
335340
HollowWriteStateCreator.populateStateEngineWithTypeWriteStates(writeEngine, schemas);
336341
HollowObjectMapper newObjectMapper = new HollowObjectMapper(writeEngine);
337342
if (hashCodeFinder != null) {

hollow/src/main/java/com/netflix/hollow/api/producer/HollowProducer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,7 @@ public static class Builder<B extends HollowProducer.Builder<B>> {
751751
SingleProducerEnforcer singleProducerEnforcer = new BasicSingleProducerEnforcer();
752752
HollowObjectHashCodeFinder hashCodeFinder = null;
753753
boolean doIntegrityCheck = true;
754+
boolean partitionedOrdinalMap = false;
754755
ProducerOptionalBlobPartConfig optionalPartConfig = null;
755756
HollowConsumer.UpdatePlanBlobVerifier updatePlanBlobVerifier = HollowConsumer.UpdatePlanBlobVerifier.DEFAULT_INSTANCE;
756757
Supplier<Boolean> ignoreSoftLimits = null;
@@ -970,6 +971,11 @@ public B noIntegrityCheck() {
970971
return (B) this;
971972
}
972973

974+
public B withPartitionedOrdinalMap(boolean partitionedOrdinalMap) {
975+
this.partitionedOrdinalMap = partitionedOrdinalMap;
976+
return (B) this;
977+
}
978+
973979
public B withUpdatePlanVerifier(HollowConsumer.UpdatePlanBlobVerifier updatePlanBlobVerifier) {
974980
this.updatePlanBlobVerifier = updatePlanBlobVerifier;
975981
return (B) this;
@@ -996,6 +1002,13 @@ protected void checkArguments() {
9961002
// simple test case for when features are allowed to work together passes, see {@code testReshardingWithFocusHoleFillInFewestShards}
9971003
throw new IllegalArgumentException("Producer does not yet support using both re-sharding and focusHoleFillInFewestShards features in tandem");
9981004
}
1005+
/// TODO: remove this mutually exclusive condition
1006+
if (partitionedOrdinalMap == true && focusHoleFillInFewestShards == true) {
1007+
// partitionedOrdinalMap feature rollout.
1008+
// Changes in {@code FreeOrdinalTracker.sort(int)} and more testing are needed before enabling these
1009+
// features together.
1010+
throw new IllegalArgumentException("Producer does not yet support using both partitioned ByteArrayOrdinalMap and focusHoleFillInFewestShards features in tandem");
1011+
}
9991012
if (stager != null && compressor != null) {
10001013
throw new IllegalArgumentException(
10011014
"Both a custom BlobStager and BlobCompressor were specified -- please specify only one of these.");

hollow/src/main/java/com/netflix/hollow/core/memory/ByteArrayOrdinalMap.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,20 @@ public int getOrAssignOrdinal(ByteDataArray serializedRepresentation, int prefer
168168
return ordinal != -1 ? ordinal : assignOrdinal(serializedRepresentation, hash, preferredOrdinal);
169169
}
170170

171+
/**
172+
* Adds a sequence of bytes to this map using a pre-computed hash.
173+
* This avoids re-computing the hash when it's already been calculated for routing purposes.
174+
*
175+
* @param serializedRepresentation the sequence of bytes
176+
* @param hash the pre-computed hash of the serialized representation
177+
* @param preferredOrdinal the preferred ordinal to assign
178+
* @return the assigned ordinal
179+
*/
180+
public int getOrAssignOrdinal(ByteDataArray serializedRepresentation, int hash, int preferredOrdinal) {
181+
int ordinal = get(serializedRepresentation, hash);
182+
return ordinal != -1 ? ordinal : assignOrdinal(serializedRepresentation, hash, preferredOrdinal);
183+
}
184+
171185
/// acquire the lock before writing.
172186
private synchronized int assignOrdinal(ByteDataArray serializedRepresentation, int hash, int preferredOrdinal) {
173187
if (preferredOrdinal < -1 || preferredOrdinal > ORDINAL_MASK) {
@@ -364,7 +378,15 @@ public int get(ByteDataArray serializedRepresentation) {
364378
return get(serializedRepresentation, HashCodes.hashCode(serializedRepresentation));
365379
}
366380

367-
private int get(ByteDataArray serializedRepresentation, int hash) {
381+
/**
382+
* Public method to get an ordinal using a pre-computed hash.
383+
* Used by HollowTypeWriteState for efficient multi-map routing.
384+
*
385+
* @param serializedRepresentation the serialized representation
386+
* @param hash the pre-computed hash of the serialized representation
387+
* @return the ordinal for this serialized representation, or -1 if not found
388+
*/
389+
public int get(ByteDataArray serializedRepresentation, int hash) {
368390
AtomicLongArray pao = pointersAndOrdinals;
369391

370392
int modBitmask = pao.length() - 1;
@@ -436,9 +458,9 @@ public int prepareForWrite() {
436458
* the key array to reflect the new pointers and exclude the removed entries. This is also where ordinals
437459
* which are unused are returned to the pool.<p>
438460
*
439-
* @param usedOrdinals a bit set representing the ordinals which are currently referenced by any image.
461+
* @param usedGlobalOrdinals a bit set representing the ordinals which are currently referenced.
440462
*/
441-
public void compact(ThreadSafeBitSet usedOrdinals, int numShards, boolean focusHoleFillInFewestShards) {
463+
public void compact(ThreadSafeBitSet usedGlobalOrdinals, int numShards, boolean focusHoleFillInFewestShards, int mapIdx, int mapIndexBits) {
442464
long[] populatedReverseKeys = new long[size];
443465

444466
int counter = 0;
@@ -456,10 +478,12 @@ public void compact(ThreadSafeBitSet usedOrdinals, int numShards, boolean focusH
456478
SegmentedByteArray arr = byteData.getUnderlyingArray();
457479
long currentCopyPointer = 0;
458480

481+
int currMapUsedOrdinalCnt = 0;
459482
for (int i = 0; i < populatedReverseKeys.length; i++) {
460483
int ordinal = (int) (populatedReverseKeys[i] & ORDINAL_MASK);
484+
int globalOrdinal = (ordinal << mapIndexBits) | mapIdx;
461485

462-
if (usedOrdinals.get(ordinal)) {
486+
if (usedGlobalOrdinals.get(globalOrdinal)) {
463487
long pointer = populatedReverseKeys[i] >>> BITS_PER_ORDINAL;
464488
int length = VarInt.readVInt(arr, pointer);
465489
length += VarInt.sizeOfVInt(length);
@@ -471,6 +495,7 @@ public void compact(ThreadSafeBitSet usedOrdinals, int numShards, boolean focusH
471495
populatedReverseKeys[i] = populatedReverseKeys[i] << BITS_PER_POINTER | currentCopyPointer;
472496

473497
currentCopyPointer += length;
498+
currMapUsedOrdinalCnt++;
474499
} else {
475500
freeOrdinalTracker.returnOrdinalToPool(ordinal);
476501
populatedReverseKeys[i] = EMPTY_BUCKET_VALUE;
@@ -480,6 +505,7 @@ public void compact(ThreadSafeBitSet usedOrdinals, int numShards, boolean focusH
480505
byteData.setPosition(currentCopyPointer);
481506

482507
if(focusHoleFillInFewestShards && numShards > 1)
508+
// TODO: fix to be compatible with multi-ByteArrayOrdinalMap
483509
freeOrdinalTracker.sort(numShards);
484510
else
485511
freeOrdinalTracker.sort();
@@ -491,7 +517,7 @@ public void compact(ThreadSafeBitSet usedOrdinals, int numShards, boolean focusH
491517
pao.lazySet(i, EMPTY_BUCKET_VALUE);
492518
}
493519
populateNewHashArray(pao, populatedReverseKeys);
494-
size = usedOrdinals.cardinality();
520+
size = currMapUsedOrdinalCnt;
495521

496522
pointersByOrdinal = null;
497523
unusedPreviousOrdinals = null;

hollow/src/main/java/com/netflix/hollow/core/util/HollowWriteStateCreator.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,20 +96,22 @@ public static void readSchemaFileIntoWriteState(String schemaFilePath, HollowWri
9696
*/
9797
public static void populateStateEngineWithTypeWriteStates(HollowWriteStateEngine stateEngine, Collection<HollowSchema> schemas) {
9898
List<HollowSchema> dependencyOrderedSchemas = getDependencyOrderedSchemas(schemas);
99+
boolean partitioned = stateEngine.isPartitionedOrdinalMap();
100+
99101
for(HollowSchema schema : dependencyOrderedSchemas) {
100102
if(stateEngine.getTypeState(schema.getName()) == null) {
101103
switch(schema.getSchemaType()) {
102104
case OBJECT:
103-
stateEngine.addTypeState(new HollowObjectTypeWriteState((HollowObjectSchema)schema, -1, stateEngine.getIgnoreOrdinalLimitsSupplier()));
105+
stateEngine.addTypeState(new HollowObjectTypeWriteState((HollowObjectSchema)schema, -1, partitioned, stateEngine.getIgnoreOrdinalLimitsSupplier()));
104106
break;
105107
case LIST:
106-
stateEngine.addTypeState(new HollowListTypeWriteState((HollowListSchema)schema, -1, stateEngine.getIgnoreOrdinalLimitsSupplier()));
108+
stateEngine.addTypeState(new HollowListTypeWriteState((HollowListSchema)schema, -1, partitioned, stateEngine.getIgnoreOrdinalLimitsSupplier()));
107109
break;
108110
case SET:
109-
stateEngine.addTypeState(new HollowSetTypeWriteState((HollowSetSchema)schema, -1, stateEngine.getIgnoreOrdinalLimitsSupplier()));
111+
stateEngine.addTypeState(new HollowSetTypeWriteState((HollowSetSchema)schema, -1, partitioned, stateEngine.getIgnoreOrdinalLimitsSupplier()));
110112
break;
111113
case MAP:
112-
stateEngine.addTypeState(new HollowMapTypeWriteState((HollowMapSchema)schema, -1, stateEngine.getIgnoreOrdinalLimitsSupplier()));
114+
stateEngine.addTypeState(new HollowMapTypeWriteState((HollowMapSchema)schema, -1, partitioned, stateEngine.getIgnoreOrdinalLimitsSupplier()));
113115
break;
114116
}
115117
}
@@ -204,7 +206,7 @@ public void run() {
204206

205207
BitSet populatedOrdinals = readState.getListener(PopulatedOrdinalListener.class).getPopulatedOrdinals();
206208

207-
writeState.resizeOrdinalMap(populatedOrdinals.cardinality());
209+
writeState.resizeOrdinalMaps(populatedOrdinals.cardinality() >> writeState.getOrdinalMapIndexBits());
208210
int ordinal = populatedOrdinals.nextSetBit(0);
209211
while(ordinal != -1) {
210212
HollowWriteRecord rec = copier.copy(ordinal);

hollow/src/main/java/com/netflix/hollow/core/write/HollowListTypeWriteState.java

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -33,29 +33,30 @@ public class HollowListTypeWriteState extends HollowTypeWriteState {
3333
private int bitsPerListPointer;
3434
private int revBitsPerListPointer;
3535
private int bitsPerElement;
36-
private long totalOfListSizes[];
37-
private long revTotalOfListSizes[];
36+
private long[] totalOfListSizes;
37+
private long[] revTotalOfListSizes;
3838

3939
/// data required for writing snapshot or delta
40-
private FixedLengthElementArray listPointerArray[];
41-
private FixedLengthElementArray elementArray[];
40+
private FixedLengthElementArray[] listPointerArray;
41+
private FixedLengthElementArray[] elementArray;
4242

4343
/// additional data required for writing delta
44-
private int numListsInDelta[];
45-
private long numElementsInDelta[];
46-
private ByteDataArray deltaAddedOrdinals[];
47-
private ByteDataArray deltaRemovedOrdinals[];
44+
private int[] numListsInDelta;
45+
private long[] numElementsInDelta;
46+
private ByteDataArray[] deltaAddedOrdinals;
47+
private ByteDataArray[] deltaRemovedOrdinals;
4848

4949
public HollowListTypeWriteState(HollowListSchema schema) {
5050
this(schema, -1);
5151
}
52-
52+
53+
@Deprecated
5354
public HollowListTypeWriteState(HollowListSchema schema, int numShards) {
54-
this(schema, numShards, null);
55+
this(schema, numShards, false, null);
5556
}
5657

57-
public HollowListTypeWriteState(HollowListSchema schema, int numShards, Supplier<Boolean> ignoreSoftLimits) {
58-
super(schema, numShards, ignoreSoftLimits);
58+
public HollowListTypeWriteState(HollowListSchema schema, int numShards, boolean usePartitionedOrdinalMap, Supplier<Boolean> ignoreSoftLimits) {
59+
super(schema, numShards, usePartitionedOrdinalMap, ignoreSoftLimits);
5960
}
6061

6162
@Override
@@ -74,7 +75,6 @@ public void prepareForWrite(boolean canReshard) {
7475
private void gatherStatistics(boolean numShardsChanged) {
7576

7677
int maxElementOrdinal = 0;
77-
ByteData data = ordinalMap.getByteData().getUnderlyingArray();
7878

7979
totalOfListSizes = new long[numShards];
8080
if (numShardsChanged) {
@@ -83,7 +83,8 @@ private void gatherStatistics(boolean numShardsChanged) {
8383

8484
for(int i=0;i<=maxOrdinal;i++) {
8585
if(currentCyclePopulated.get(i) || previousCyclePopulated.get(i)) {
86-
long pointer = ordinalMap.getPointerForData(i);
86+
long pointer = getPointerForData(i);
87+
ByteData data = getByteDataForOrdinal(i);
8788
int size = VarInt.readVInt(data, pointer);
8889

8990
pointer += VarInt.sizeOfVInt(size);
@@ -122,14 +123,13 @@ private void gatherStatistics(boolean numShardsChanged) {
122123

123124
@Override
124125
protected int typeStateNumShards(int maxOrdinal) {
125-
ByteData data = ordinalMap.getByteData().getUnderlyingArray();
126-
127126
long maxElementOrdinal = 0;
128127
long totalOfListSizes = 0;
129-
128+
130129
for(int i=0;i<=maxOrdinal;i++) {
131130
if(currentCyclePopulated.get(i) || previousCyclePopulated.get(i)) {
132-
long pointer = ordinalMap.getPointerForData(i);
131+
long pointer = getPointerForData(i);
132+
ByteData data = getByteDataForOrdinal(i);
133133
int size = VarInt.readVInt(data, pointer);
134134

135135
pointer += VarInt.sizeOfVInt(size);
@@ -167,17 +167,16 @@ public void calculateSnapshot() {
167167
elementArray[i] = new FixedLengthElementArray(WastefulRecycler.DEFAULT_INSTANCE, (long)bitsPerElement * totalOfListSizes[i]);
168168
}
169169

170-
ByteData data = ordinalMap.getByteData().getUnderlyingArray();
171-
172170
long elementCounter[] = new long[numShards];
173171
int shardMask = numShards - 1;
174172

175173
for(int ordinal=0;ordinal<=maxOrdinal;ordinal++) {
176174
int shardNumber = ordinal & shardMask;
177175
int shardOrdinal = ordinal / numShards;
178-
176+
179177
if(currentCyclePopulated.get(ordinal)) {
180-
long readPointer = ordinalMap.getPointerForData(ordinal);
178+
long readPointer = getPointerForData(ordinal);
179+
ByteData data = getByteDataForOrdinal(ordinal);
181180

182181
int size = VarInt.readVInt(data, readPointer);
183182
readPointer += VarInt.sizeOfVInt(size);
@@ -263,21 +262,19 @@ public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet
263262
int addedOrdinal = deltaAdditions.nextSetBit(0);
264263
while(addedOrdinal != -1) {
265264
numListsInDelta[addedOrdinal & shardMask]++;
266-
long readPointer = ordinalMap.getPointerForData(addedOrdinal);
267-
numElementsInDelta[addedOrdinal & shardMask] += VarInt.readVInt(ordinalMap.getByteData().getUnderlyingArray(), readPointer);
268-
265+
long readPointer = getPointerForData(addedOrdinal);
266+
numElementsInDelta[addedOrdinal & shardMask] += VarInt.readVInt(getByteDataForOrdinal(addedOrdinal), readPointer);
267+
269268
addedOrdinal = deltaAdditions.nextSetBit(addedOrdinal + 1);
270269
}
271-
270+
272271
for(int i=0;i<numShards;i++) {
273272
listPointerArray[i] = new FixedLengthElementArray(WastefulRecycler.DEFAULT_INSTANCE, (long)numListsInDelta[i] * bitsPerListPointer);
274273
elementArray[i] = new FixedLengthElementArray(WastefulRecycler.DEFAULT_INSTANCE, numElementsInDelta[i] * bitsPerElement);
275274
deltaAddedOrdinals[i] = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE);
276275
deltaRemovedOrdinals[i] = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE);
277276
}
278277

279-
ByteData data = ordinalMap.getByteData().getUnderlyingArray();
280-
281278
int listCounter[] = new int[numShards];
282279
long elementCounter[] = new long[numShards];
283280
int previousRemovedOrdinal[] = new int[numShards];
@@ -286,7 +283,8 @@ public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet
286283
for(int ordinal=0;ordinal<=maxOrdinal;ordinal++) {
287284
int shardNumber = ordinal & shardMask;
288285
if(deltaAdditions.get(ordinal)) {
289-
long readPointer = ordinalMap.getPointerForData(ordinal);
286+
long readPointer = getPointerForData(ordinal);
287+
ByteData data = getByteDataForOrdinal(ordinal);
290288

291289
int size = VarInt.readVInt(data, readPointer);
292290
readPointer += VarInt.sizeOfVInt(size);

0 commit comments

Comments
 (0)