forked from OpenHFT/Chronicle-Map
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathChronicleMapBuilder.java
More file actions
executable file
·1801 lines (1643 loc) · 76.8 KB
/
ChronicleMapBuilder.java
File metadata and controls
executable file
·1801 lines (1643 loc) · 76.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (C) 2015 higherfrequencytrading.com
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package net.openhft.chronicle.map;
import net.openhft.chronicle.algo.MemoryUnit;
import net.openhft.chronicle.algo.hashing.LongHashFunction;
import net.openhft.chronicle.bytes.Byteable;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.hash.ChronicleHashBuilder;
import net.openhft.chronicle.hash.ChronicleHashRecoveryFailedException;
import net.openhft.chronicle.hash.impl.CompactOffHeapLinearHashTable;
import net.openhft.chronicle.hash.impl.SizePrefixedBlob;
import net.openhft.chronicle.hash.impl.VanillaChronicleHash;
import net.openhft.chronicle.hash.impl.stage.entry.ChecksumStrategy;
import net.openhft.chronicle.hash.impl.util.CanonicalRandomAccessFiles;
import net.openhft.chronicle.hash.impl.util.math.PoissonDistribution;
import net.openhft.chronicle.hash.serialization.*;
import net.openhft.chronicle.hash.serialization.impl.SerializationBuilder;
import net.openhft.chronicle.map.replication.MapRemoteOperations;
import net.openhft.chronicle.set.ChronicleSetBuilder;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.values.ValueModel;
import net.openhft.chronicle.values.Values;
import net.openhft.chronicle.wire.TextWire;
import net.openhft.chronicle.wire.Wire;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static java.lang.Double.isNaN;
import static java.lang.Math.round;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static net.openhft.chronicle.bytes.NativeBytesStore.lazyNativeBytesStoreWithFixedCapacity;
import static net.openhft.chronicle.core.Maths.*;
import static net.openhft.chronicle.hash.impl.CompactOffHeapLinearHashTable.*;
import static net.openhft.chronicle.hash.impl.SizePrefixedBlob.*;
import static net.openhft.chronicle.hash.impl.VanillaChronicleHash.throwRecoveryOrReturnIOException;
import static net.openhft.chronicle.hash.impl.util.FileIOUtils.readFully;
import static net.openhft.chronicle.hash.impl.util.FileIOUtils.writeFully;
import static net.openhft.chronicle.hash.impl.util.Objects.builderEquals;
import static net.openhft.chronicle.map.DefaultSpi.mapEntryOperations;
import static net.openhft.chronicle.map.DefaultSpi.mapRemoteOperations;
import static net.openhft.chronicle.map.VanillaChronicleMap.alignAddr;
/**
* {@code ChronicleMapBuilder} manages {@link ChronicleMap} configurations; could be used as a
* classic builder and/or factory. This means that in addition to the standard builder usage
* pattern: <pre>{@code
* ChronicleMap<Key, Value> map = ChronicleMapOnHeapUpdatableBuilder
* .of(Key.class, Value.class)
* // ... other configurations
* .create();}</pre>
* it could be prepared and used to create many similar maps: <pre>{@code
* ChronicleMapBuilder<Key, Value> builder = ChronicleMapBuilder
* .of(Key.class, Value.class)
* .entries(..)
* // ... other configurations
*
* ChronicleMap<Key, Value> map1 = builder.create();
* ChronicleMap<Key, Value> map2 = builder.create();}</pre>
* i. e. created {@code ChronicleMap} instances don't depend on the builder.
*
* <p>{@code ChronicleMapBuilder} is mutable, see a note in {@link ChronicleHashBuilder} interface
* documentation.
*
* <p>Later in this documentation, "ChronicleMap" means "ChronicleMaps, created by {@code
* ChronicleMapBuilder}", unless specified differently, because theoretically someone might provide
* {@code ChronicleMap} implementations with completely different properties.
*
* <p>In addition to the key and value types, you <i>must</i> configure {@linkplain #entries(long)
* number of entries} you are going to insert into the created map <i>at most</i>. See {@link
* #entries(long)} method documentation for more information on this.
*
* <p>If your key or value type is not constantly sized and known to {@code ChronicleHashBuilder}, i.
* e. it is not a boxed primitive, {@linkplain net.openhft.chronicle.values.Values value interface},
* or {@link Byteable}, you <i>must</i> provide the {@code ChronicleHashBuilder} with some
* information about your keys or values: if they are constantly-sized, call {@link
* #constantKeySizeBySample(Object)}, otherwise {@link #averageKey(Object)} or {@link
* #averageKeySize(double)} method, and accordingly for values.
*
* @param <K> key type of the maps, produced by this builder
* @param <V> value type of the maps, produced by this builder
* @see ChronicleHashBuilder
* @see ChronicleMap
* @see ChronicleSetBuilder
*/
public final class ChronicleMapBuilder<K, V> implements
ChronicleHashBuilder<K, ChronicleMap<K, V>, ChronicleMapBuilder<K, V>> {
// Sentinel meaning "alignment not configured by the user".
private static final int UNDEFINED_ALIGNMENT_CONFIG = -1;
// Alignment of 1 byte is effectively "no alignment".
private static final int NO_ALIGNMENT = 1;
/**
* If want to increase this number, note {@link OldDeletedEntriesCleanup} uses array to store
* all segment indexes -- so it could be current JVM max array size, not Integer.MAX_VALUE
* (which is an obvious limitation, as many APIs and internals use int type for representing
* segment index).
*
* Anyway, unlikely anyone ever need more than 1 billion segments.
*/
private static final int MAX_SEGMENTS = (1 << 30);
private static final Logger LOG =
LoggerFactory.getLogger(ChronicleMapBuilder.class.getName());
// NaN is the sentinel for "double config not set"; see isDefined().
private static final double UNDEFINED_DOUBLE_CONFIG = Double.NaN;
// Used for per-file lock striping in fileLockedIO(); entries never actually
// land in the map (compute() always returns null).
private static final ConcurrentHashMap<File, Void> fileLockingControl =
new ConcurrentHashMap<>(128);
// Upper bound (16 KiB) for the serialized self-bootstrapping header.
// NOTE(review): never reassigned in this chunk -- consider making it final;
// TODO confirm it isn't written elsewhere in the file.
private static int MAX_BOOTSTRAPPING_HEADER_SIZE = (int) MemoryUnit.KILOBYTES.toBytes(16);
// Serialization configuration for keys and values; non-private and
// non-final because clone() replaces them with deep copies.
SerializationBuilder<K> keyBuilder;
SerializationBuilder<V> valueBuilder;
// Sample objects used to derive average serialized sizes, mutually
// exclusive with averageKeySize/averageValueSize configs.
K averageKey;
V averageValue;
/**
* Default timeout is 1 minute. Even loopback tests converge often in the course of seconds,
* let alone WAN replication over many nodes might take tens of seconds.
* <p/>
* TODO review
*/
long cleanupTimeout = 1;
TimeUnit cleanupTimeoutUnit = TimeUnit.MINUTES;
boolean cleanupRemovedEntries = true;
//////////////////////////////
// Configuration fields
DefaultValueProvider<K, V> defaultValueProvider = DefaultSpi.defaultValueProvider();
// -1 means "replication not configured".
byte replicationIdentifier = -1;
MapMethods<K, V, ?> methods = DefaultSpi.mapMethods();
MapEntryOperations<K, V, ?> entryOperations = mapEntryOperations();
MapRemoteOperations<K, V, ?> remoteOperations = mapRemoteOperations();
// not final because of cloning
private ChronicleMapBuilderPrivateAPI<K, V> privateAPI =
new ChronicleMapBuilderPrivateAPI<>(this);
// used when configuring the number of segments; -1 means "not configured".
private int minSegments = -1;
private int actualSegments = -1;
// used when configuring the number of entries per segment;
// -1 means "not configured".
private long entriesPerSegment = -1L;
private long actualChunksPerSegmentTier = -1L;
private double averageKeySize = UNDEFINED_DOUBLE_CONFIG;
private K sampleKey;
private double averageValueSize = UNDEFINED_DOUBLE_CONFIG;
private V sampleValue;
// 0 means "chunk size not configured"; see chunkSize().
private int actualChunkSize = 0;
// Cached worst-case alignment waste; -1 means "not yet computed".
private int worstAlignment = -1;
private int maxChunksPerEntry = -1;
private int alignment = UNDEFINED_ALIGNMENT_CONFIG;
private long entries = -1L;
private double maxBloatFactor = 1.0;
private boolean allowSegmentTiering = true;
private double nonTieredSegmentsPercentile = 0.99999;
private boolean aligned64BitMemoryOperationsAtomic = OS.is64Bit();
private ChecksumEntries checksumEntries = ChecksumEntries.IF_PERSISTED;
private boolean putReturnsNull = false;
private boolean removeReturnsNull = false;
private boolean replicated;
private boolean persisted;
// Package-private constructor; use the of(keyClass, valueClass) factory.
ChronicleMapBuilder(Class<K> keyClass, Class<V> valueClass) {
keyBuilder = new SerializationBuilder<>(keyClass);
valueBuilder = new SerializationBuilder<>(valueClass);
}
private static boolean isDefined(double config) {
    // A double config is "defined" unless it holds the NaN sentinel
    // (UNDEFINED_DOUBLE_CONFIG).
    return !Double.isNaN(config);
}
private static long toLong(double v) {
    // Converts to long, requiring v to be an exact integral value; throws
    // IllegalArgumentException otherwise (including NaN, since 0 != NaN).
    long rounded = round(v);
    if (rounded == v)
        return rounded;
    throw new IllegalArgumentException("Integer argument expected, given " + v);
}
private static long roundUp(double v) {
    // Smallest integral long >= v: take the ceiling first, then convert the
    // (already integral) double to long.
    return Math.round(Math.ceil(v));
}
private static long roundDown(double v) {
// Truncates towards zero. Equals mathematical floor for non-negative v;
// callers in this file pass average sizes, which are positive -- NOTE(review):
// confirm no caller passes a negative value.
return (long) v;
}
//////////////////////////////
// Instance fields
/**
* When Chronicle Maps are created using {@link #createPersistedTo(File)} or
* {@link #recoverPersistedTo(File, boolean)} or {@link #createOrRecoverPersistedTo(File)}
* methods, file lock on the Chronicle Map's lock is acquired, that shouldn't be done from
* concurrent threads within the same JVM process. So creation of Chronicle Maps
* persisted to the same File should be synchronized across JVM's threads. Simple way would be
* to synchronize on some static (lock) object, but would serialize all Chronicle Maps creations
* (persisted to any files), ConcurrentHashMap#compute() gives more scalability.
* ConcurrentHashMap is used effectively for lock striping only, because the entries are not
* even landing the map, because compute() always returns null.
*/
private static void fileLockedIO(
File file, FileChannel fileChannel, FileIOAction fileIOAction) throws IOException {
// Runs fileIOAction while holding both an in-JVM per-file mutex and an OS
// file lock. compute() on the shared ConcurrentHashMap serializes threads
// of this JVM operating on the same File (lock striping, see the comment
// above), while FileChannel.lock() excludes other processes. The mapping
// function always returns null, so no entry is ever stored in the map.
fileLockingControl.compute(file, (k, v) -> {
try {
try (FileLock ignored = fileChannel.lock()) {
fileIOAction.fileIOAction();
}
return null;
} catch (IOException e) {
// compute()'s lambda cannot throw checked exceptions; rethrow
// unchecked via Jvm.rethrow so the IOException reaches the caller.
throw Jvm.rethrow(e);
}
});
}
/**
* Returns a new {@code ChronicleMapBuilder} instance which is able to {@linkplain #create()
* create} maps with the specified key and value classes.
*
* @param keyClass class object used to infer key type and discover it's properties via
* reflection
* @param valueClass class object used to infer value type and discover it's properties via
* reflection
* @param <K> key type of the maps, created by the returned builder
* @param <V> value type of the maps, created by the returned builder
* @return a new builder for the given key and value classes
*/
public static <K, V> ChronicleMapBuilder<K, V> of(
@NotNull Class<K> keyClass, @NotNull Class<V> valueClass) {
// Simple static factory; all further configuration happens on the builder.
return new ChronicleMapBuilder<>(keyClass, valueClass);
}
private static void checkSegments(long segments) {
    // Validates a segment count: must be positive and within MAX_SEGMENTS.
    if (segments <= 0)
        throw new IllegalArgumentException("segments should be positive, " +
                segments + " given");
    if (segments > MAX_SEGMENTS)
        throw new IllegalArgumentException("Max segments is " + MAX_SEGMENTS + ", " +
                segments + " given");
}
private static String pretty(int value) {
// For int config fields, non-positive values mean "not configured".
return value > 0 ? value + "" : "not configured";
}
private static String pretty(Object obj) {
    // For reference config fields, null means "not configured".
    if (obj == null)
        return "not configured";
    return obj + "";
}
private static void checkSizeIsStaticallyKnown(SerializationBuilder builder, String role) {
    // Types whose serialized size is a compile-time constant must not be
    // given an average-size hint; fail fast with an explanatory message.
    if (!builder.sizeIsStaticallyKnown)
        return;
    throw new IllegalStateException("Size of " + builder.tClass +
            " instances is constant and statically known, shouldn't be specified via " +
            "average" + role + "Size() or average" + role + "() methods");
}
private static void checkAverageSize(double averageSize, String role) {
    // A valid average size is a strictly positive, finite number
    // (NaN fails the > 0 comparison as well).
    boolean positiveAndFinite = averageSize > 0
            && !isNaN(averageSize)
            && !Double.isInfinite(averageSize);
    if (!positiveAndFinite) {
        throw new IllegalArgumentException("Average " + role + " size must be a positive, " +
                "finite number");
    }
}
private static double averageSizeStoringLength(
        SerializationBuilder builder, double averageSize) {
    // Estimates the average number of bytes the size marshaller needs to
    // store the length of a key/value whose average serialized size is
    // averageSize. For fractional averages, interpolates linearly between
    // the two neighbouring integral sizes.
    SizeMarshaller sizeMarshaller = builder.sizeMarshaller();
    if (averageSize == round(averageSize))
        return sizeMarshaller.storingLength(round(averageSize));
    long lower = roundDown(averageSize);
    long upper = lower + 1;
    int lowerStoringLength = sizeMarshaller.storingLength(lower);
    int upperStoringLength = sizeMarshaller.storingLength(upper);
    if (lowerStoringLength == upperStoringLength)
        return lowerStoringLength;
    // Interpolate between the STORING LENGTHS of the neighbouring integral
    // sizes, weighted by proximity to averageSize. The previous code
    // mistakenly interpolated `lower`/`upper` (the sizes themselves), which
    // returned a value on the order of the entry size rather than of the
    // few-byte length prefix.
    return lowerStoringLength * (upper - averageSize)
            + upperStoringLength * (averageSize - lower);
}
static int greatestCommonDivisor(int a, int b) {
    // Iterative Euclidean algorithm; equivalent to the classic recursive
    // formulation gcd(a, b) = (b == 0) ? a : gcd(b, a % b).
    while (b != 0) {
        int remainder = a % b;
        a = b;
        b = remainder;
    }
    return a;
}
private static int maxDefaultChunksPerAverageEntry(boolean replicated) {
    // Replicated maps keep a bit set per remote node, not only the
    // allocation bit set as in the non-replicated case, so 8 chunks
    // (=> 8 bits per bit set) per entry would be comparatively wasteful;
    // cap at 4 chunks for replicated maps.
    if (replicated)
        return 4;
    return 8;
}
private static int estimateSegmentsForEntries(long size) {
    // Heuristic segment count for a given number of entries, checked in
    // ascending order of size. Boundaries are identical to the original
    // descending chain (note the 200M boundary is inclusive here because
    // the original used a strict `size > 200 << 20` for 256).
    if (size < 1 << 10)
        return 1;
    if (size < 4 << 10)
        return 8;
    if (size < 16 << 10)
        return 16;
    if (size < 128 << 10)
        return 32;
    if (size < 1 << 20)
        return 64;
    if (size <= 200 << 20)
        return 128;
    return 256;
}
private static long headerChecksum(ByteBuffer headerBuffer, int headerSize) {
// xxHash (r39 variant) over the 4-byte size word plus the header payload:
// headerSize + 4 bytes starting at SIZE_WORD_OFFSET.
return LongHashFunction.xx_r39().hashBytes(headerBuffer, SIZE_WORD_OFFSET, headerSize + 4);
}
private static void writeNotComplete(
FileChannel fileChannel, ByteBuffer headerBuffer, int headerSize) throws IOException {
// Stamps the header's size word as NOT_COMPLETE (keeping the DATA flag and
// the header size), then rewrites only that 4-byte word in the file.
//noinspection PointlessBitwiseExpression
headerBuffer.putInt(SIZE_WORD_OFFSET, NOT_COMPLETE | DATA | headerSize);
headerBuffer.clear().position(SIZE_WORD_OFFSET).limit(SIZE_WORD_OFFSET + 4);
writeFully(fileChannel, SIZE_WORD_OFFSET, headerBuffer);
}
/**
* @return ByteBuffer, with self bootstrapping header in [position, limit) range
*/
private static <K, V> ByteBuffer writeHeader(
FileChannel fileChannel, VanillaChronicleMap<K, V, ?> map) throws IOException {
// Serializes the map's self-bootstrapping header (as TextWire) into a
// little-endian buffer, computes its checksum, and writes the
// size-prefixed blob to the start of the file with the readiness bit set
// to NOT_COMPLETE; commitChronicleMapReady() later flips it to READY.
ByteBuffer headerBuffer = ByteBuffer.allocate(
SELF_BOOTSTRAPPING_HEADER_OFFSET + MAX_BOOTSTRAPPING_HEADER_SIZE);
headerBuffer.order(LITTLE_ENDIAN);
Bytes<ByteBuffer> headerBytes = Bytes.wrapForWrite(headerBuffer);
headerBytes.writePosition(SELF_BOOTSTRAPPING_HEADER_OFFSET);
Wire wire = new TextWire(headerBytes);
wire.getValueOut().typedMarshallable(map);
int headerLimit = (int) headerBytes.writePosition();
int headerSize = headerLimit - SELF_BOOTSTRAPPING_HEADER_OFFSET;
// First set readiness bit to READY, to compute checksum correctly
// (the checksum covers the size word, which must hold its final READY
// value when hashed).
//noinspection PointlessBitwiseExpression
headerBuffer.putInt(SIZE_WORD_OFFSET, READY | DATA | headerSize);
long checksum = headerChecksum(headerBuffer, headerSize);
headerBuffer.putLong(HEADER_OFFSET, checksum);
// Set readiness bit to NOT_COMPLETE, because the Chronicle Map instance is not actually
// ready yet
//noinspection PointlessBitwiseExpression
headerBuffer.putInt(SIZE_WORD_OFFSET, NOT_COMPLETE | DATA | headerSize);
// Write the size-prefixed blob to the file
headerBuffer.position(0).limit(headerLimit);
writeFully(fileChannel, 0, headerBuffer);
// Leave [position, limit) framing the self-bootstrapping header for the
// caller, per the javadoc above.
headerBuffer.position(SELF_BOOTSTRAPPING_HEADER_OFFSET);
return headerBuffer;
}
private static void commitChronicleMapReady(
VanillaChronicleHash map, RandomAccessFile raf, ByteBuffer headerBuffer, int headerSize)
throws IOException {
// Final step of persisted-map creation: after the whole map store is
// flushed to disk, flip the header's readiness bit from NOT_COMPLETE to
// READY by rewriting just the 4-byte size word.
FileChannel fileChannel = raf.getChannel();
// see HCOLL-396: msync before publishing READY, so readers that observe
// READY also observe the fully-written map data.
map.msync();
//noinspection PointlessBitwiseExpression
headerBuffer.putInt(SIZE_WORD_OFFSET, READY | DATA | headerSize);
headerBuffer.clear().position(SIZE_WORD_OFFSET).limit(SIZE_WORD_OFFSET + 4);
writeFully(fileChannel, SIZE_WORD_OFFSET, headerBuffer);
}
/**
* Clones this builder, deep-copying the mutable key/value serialization
* builders and re-binding the private API object, so the original and the
* clone can be configured independently afterwards.
*/
@Override
public ChronicleMapBuilder<K, V> clone() {
try {
@SuppressWarnings("unchecked")
ChronicleMapBuilder<K, V> result =
(ChronicleMapBuilder<K, V>) super.clone();
result.keyBuilder = keyBuilder.clone();
result.valueBuilder = valueBuilder.clone();
// The private API must reference the clone, not the original builder.
result.privateAPI = new ChronicleMapBuilderPrivateAPI<>(result);
return result;
} catch (CloneNotSupportedException e) {
// Not expected: Cloneable is presumably implemented up the hierarchy
// (otherwise clone() would always throw) -- NOTE(review): confirm.
throw new AssertionError(e);
}
}
/**
* @deprecated don't use private API in the client code
*/
@Override
@Deprecated
public Object privateAPI() {
// Exposed for internal Chronicle modules only; see the deprecation note.
return privateAPI;
}
/**
* {@inheritDoc}
*
* <p>Example: if keys in your map(s) are English words in {@link String} form, average English
* word length is 5.1, configure average key size of 6: <pre>{@code
* ChronicleMap<String, LongValue> wordFrequencies = ChronicleMapBuilder
* .of(String.class, LongValue.class)
* .entries(50000)
* .averageKeySize(6)
* .create();}</pre>
* (Note that 6 is chosen as average key size in bytes despite strings in Java are UTF-16
* encoded (and each character takes 2 bytes on-heap), because default off-heap {@link String}
* encoding is UTF-8 in {@code ChronicleMap}.)
*
* @param averageKeySize the average size of the key
* @throws IllegalStateException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
* @see #averageKey(Object)
* @see #constantKeySizeBySample(Object)
* @see #averageValueSize(double)
* @see #actualChunkSize(int)
*/
@Override
public ChronicleMapBuilder<K, V> averageKeySize(double averageKeySize) {
    // A numeric average size is mutually exclusive with averageKey() and
    // constantKeySizeBySample(); validate, then clear those configs.
    checkSizeIsStaticallyKnown(keyBuilder, "Key");
    checkAverageSize(averageKeySize, "key");
    averageKey = null;
    sampleKey = null;
    this.averageKeySize = averageKeySize;
    return this;
}
/**
* {@inheritDoc}
*
* @param averageKey the average (by footprint in serialized form) key, is going to be put
* into the hash containers, created by this builder
* @throws NullPointerException {@inheritDoc}
* @see #averageKeySize(double)
* @see #constantKeySizeBySample(Object)
* @see #averageValue(Object)
* @see #actualChunkSize(int)
*/
@Override
public ChronicleMapBuilder<K, V> averageKey(K averageKey) {
    // An average key sample supersedes both the numeric average size and
    // the constant-size sample configurations.
    Objects.requireNonNull(averageKey);
    checkSizeIsStaticallyKnown(keyBuilder, "Key");
    averageKeySize = UNDEFINED_DOUBLE_CONFIG;
    sampleKey = null;
    this.averageKey = averageKey;
    return this;
}
/**
* {@inheritDoc}
*
* <p>For example, if your keys are Git commit hashes:<pre>{@code
* Map<byte[], String> gitCommitMessagesByHash =
* ChronicleMapBuilder.of(byte[].class, String.class)
* .constantKeySizeBySample(new byte[20])
* .create();}</pre>
*
* @see #averageKeySize(double)
* @see #averageKey(Object)
* @see #constantValueSizeBySample(Object)
*/
@Override
public ChronicleMapBuilder<K, V> constantKeySizeBySample(K sampleKey) {
    // A constant-size sample supersedes both average-size configurations.
    averageKey = null;
    averageKeySize = UNDEFINED_DOUBLE_CONFIG;
    this.sampleKey = sampleKey;
    return this;
}
private double averageKeySize() {
    // Internal accessor: must only be called once the average key size has
    // been determined (programming error otherwise).
    if (isDefined(averageKeySize))
        return averageKeySize;
    throw new AssertionError();
}
/**
* Configures the average number of bytes, taken by serialized form of values, put into maps,
* created by this builder. However, in many cases {@link #averageValue(Object)} might be easier
* to use and more reliable. If value size is always the same, call {@link
* #constantValueSizeBySample(Object)} method instead of this one.
*
* <p>{@code ChronicleHashBuilder} implementation heuristically chooses {@linkplain
* #actualChunkSize(int) the actual chunk size} based on this configuration and the key size,
* that, however, might result to quite high internal fragmentation, i. e. losses because only
* integral number of chunks could be allocated for the entry. If you want to avoid this, you
* should manually configure the actual chunk size in addition to this average value size
* configuration, which is anyway needed.
*
* <p>If values are of boxed primitive type or {@link Byteable} subclass, i. e. if value size is
* known statically, it is automatically accounted and shouldn't be specified by user.
*
* <p>Calling this method clears any previous {@link #constantValueSizeBySample(Object)} and
* {@link #averageValue(Object)} configurations.
*
* @param averageValueSize number of bytes, taken by serialized form of values
* @return this builder back
* @throws IllegalStateException if value size is known statically and shouldn't be
* configured by user
* @throws IllegalArgumentException if the given {@code averageValueSize} is non-positive
* @see #averageValue(Object)
* @see #constantValueSizeBySample(Object)
* @see #averageKeySize(double)
* @see #actualChunkSize(int)
*/
public ChronicleMapBuilder<K, V> averageValueSize(double averageValueSize) {
    // A numeric average size is mutually exclusive with averageValue() and
    // constantValueSizeBySample(); validate, then clear those configs.
    checkSizeIsStaticallyKnown(valueBuilder, "Value");
    checkAverageSize(averageValueSize, "value");
    averageValue = null;
    sampleValue = null;
    this.averageValueSize = averageValueSize;
    return this;
}
/**
* Configures the average number of bytes, taken by serialized form of values, put into maps,
* created by this builder, by serializing the given {@code averageValue} using the configured
* {@link #valueMarshallers(SizedReader, SizedWriter) value marshallers}. In some cases, {@link
* #averageValueSize(double)} might be easier to use, than constructing the "average value".
* If value size is always the same, call {@link #constantValueSizeBySample(Object)} method
* instead of this one.
*
* <p>Example: if values are, say, JSON documents of varying length, pass a document of typical
* length and structure to this method, and the average serialized size is derived from it.
*
* <p>{@code ChronicleHashBuilder} implementation heuristically chooses {@linkplain
* #actualChunkSize(int) the actual chunk size} based on this configuration and the key size,
* that, however, might result to quite high internal fragmentation, i. e. losses because only
* integral number of chunks could be allocated for the entry. If you want to avoid this, you
* should manually configure the actual chunk size in addition to this average value size
* configuration, which is anyway needed.
*
* <p>If values are of boxed primitive type or {@link Byteable} subclass, i. e. if value size is
* known statically, it is automatically accounted and shouldn't be specified by user.
*
* <p>Calling this method clears any previous {@link #constantValueSizeBySample(Object)}
* and {@link #averageValueSize(double)} configurations.
*
* @param averageValue the average (by footprint in serialized form) value, is going to be put
* into the maps, created by this builder
* @return this builder back
* @throws NullPointerException if the given {@code averageValue} is {@code null}
* @see #averageValueSize(double)
* @see #constantValueSizeBySample(Object)
* @see #averageKey(Object)
* @see #actualChunkSize(int)
*/
public ChronicleMapBuilder<K, V> averageValue(V averageValue) {
    // An average value sample supersedes both the numeric average size and
    // the constant-size sample configurations.
    Objects.requireNonNull(averageValue);
    checkSizeIsStaticallyKnown(valueBuilder, "Value");
    averageValueSize = UNDEFINED_DOUBLE_CONFIG;
    sampleValue = null;
    this.averageValue = averageValue;
    return this;
}
/**
* Configures the constant number of bytes, taken by serialized form of values, put into maps,
* created by this builder. This is done by providing the {@code sampleValue}, all values should
* take the same number of bytes in serialized form, as this sample object.
*
* <p>If values are of boxed primitive type or {@link Byteable} subclass, i. e. if value size is
* known statically, it is automatically accounted and this method shouldn't be called.
*
* <p>If value size varies, method {@link #averageValue(Object)} or {@link
* #averageValueSize(double)} should be called instead of this one.
*
* <p>Calling this method clears any previous {@link #averageValue(Object)} and
* {@link #averageValueSize(double)} configurations.
*
* @param sampleValue the sample value
* @return this builder back
* @see #averageValueSize(double)
* @see #averageValue(Object)
* @see #constantKeySizeBySample(Object)
*/
public ChronicleMapBuilder<K, V> constantValueSizeBySample(V sampleValue) {
    // A constant-size sample supersedes both average-size configurations.
    averageValue = null;
    averageValueSize = UNDEFINED_DOUBLE_CONFIG;
    this.sampleValue = sampleValue;
    return this;
}
double averageValueSize() {
    // Internal accessor: must only be called once the average value size
    // has been determined (programming error otherwise).
    if (isDefined(averageValueSize))
        return averageValueSize;
    throw new AssertionError();
}
private <E> double averageKeyOrValueSize(
        double configuredSize, SerializationBuilder<E> builder, E average) {
    // Resolution order: explicitly configured size > statically-known
    // constant size > serialized size of the provided average sample >
    // undefined (NaN sentinel).
    if (isDefined(configuredSize))
        return configuredSize;
    if (builder.constantSizeMarshaller())
        return builder.constantSize();
    if (average == null)
        return Double.NaN;
    return builder.serializationSize(average);
}
/**
* {@inheritDoc}
*
* @throws IllegalStateException is sizes of both keys and values of maps created by this
* builder are constant, hence chunk size shouldn't be configured by user
* @see #entryAndValueOffsetAlignment(int)
* @see #entries(long)
* @see #maxChunksPerEntry(int)
*/
@Override
public ChronicleMapBuilder<K, V> actualChunkSize(int actualChunkSize) {
// Constant-sized entries get exactly one chunk sized automatically, so a
// manual chunk size would be meaningless there.
if (constantlySizedEntries()) {
throw new IllegalStateException("Sizes of key type: " + keyBuilder.tClass + " and " +
"value type: " + valueBuilder.tClass + " are both constant, " +
"so chunk size shouldn't be specified manually");
}
if (actualChunkSize <= 0)
throw new IllegalArgumentException("Chunk size must be positive");
this.actualChunkSize = actualChunkSize;
return this;
}
SerializationBuilder<K> keyBuilder() {
// Package-private accessor for the key serialization configuration.
return keyBuilder;
}
private EntrySizeInfo entrySizeInfo() {
// Estimates the average on-heap size of a whole entry: key length prefix +
// key + (replication metadata) + (checksum) + value length prefix +
// worst-case alignment padding + value. Also determines the worst-case
// alignment waste, which depends on whether the data preceding the value
// has a constant size.
double size = 0;
double keySize = averageKeySize();
size += averageSizeStoringLength(keyBuilder, keySize);
size += keySize;
if (replicated)
size += ReplicatedChronicleMap.ADDITIONAL_ENTRY_BYTES;
if (checksumEntries())
size += ChecksumStrategy.CHECKSUM_STORED_BYTES;
double valueSize = averageValueSize();
size += averageSizeStoringLength(valueBuilder, valueSize);
int alignment = valueAlignment();
int worstAlignment;
if (worstAlignmentComputationRequiresValueSize(alignment)) {
// Everything before the value has a constant size, so the alignment
// padding can be computed exactly instead of assumed worst-case.
long constantSizeBeforeAlignment = toLong(size);
if (constantlySizedValues()) {
// see tierEntrySpaceInnerOffset()
long totalDataSize = constantSizeBeforeAlignment + constantValueSize();
worstAlignment = (int) (alignAddr(totalDataSize, alignment) - totalDataSize);
} else {
// Chunk size is not yet fixed here: mirror chunkSize()'s choice.
// Try chunk size 8 first; if the average entry would still span
// fewer than the max default number of chunks, retry with 4
// (the labeled break keeps the chunk-size-8 result otherwise).
determineAlignment:
if (actualChunkSize > 0) {
worstAlignment = worstAlignmentAssumingChunkSize(constantSizeBeforeAlignment,
actualChunkSize);
} else {
int chunkSize = 8;
worstAlignment = worstAlignmentAssumingChunkSize(
constantSizeBeforeAlignment, chunkSize);
if (size + worstAlignment + valueSize >=
maxDefaultChunksPerAverageEntry(replicated) * chunkSize) {
break determineAlignment;
}
chunkSize = 4;
worstAlignment = worstAlignmentAssumingChunkSize(
constantSizeBeforeAlignment, chunkSize);
}
}
} else {
// assume worst case, we always lose most possible bytes for alignment
worstAlignment = worstAlignmentWithoutValueSize(alignment);
}
size += worstAlignment;
size += valueSize;
return new EntrySizeInfo(size, worstAlignment);
}
private boolean worstAlignmentComputationRequiresValueSize(int alignment) {
    // Exact worst-case alignment can only be computed when alignment is
    // actually applied, keys are constant-sized, and the value size
    // marshaller writes a fixed-width length prefix (i.e. everything
    // preceding the value has a constant size).
    if (alignment == NO_ALIGNMENT)
        return false;
    return constantlySizedKeys() && valueBuilder.constantStoringLengthSizeMarshaller();
}
private int worstAlignmentWithoutValueSize(int alignment) {
// Pessimistic bound: at most (alignment - 1) bytes can be lost to padding.
return alignment - 1;
}
int segmentEntrySpaceInnerOffset() {
    // If chunkSize == constant entry size is not aligned, the first chunk is
    // deliberately _misaligned_ by this offset so that every entry's value
    // ends up with the same alignment. Only applies to constant-sized
    // entries; otherwise no inner offset is needed.
    if (constantlySizedEntries())
        return (int) (constantValueSize() % valueAlignment());
    return 0;
}
private long constantValueSize() {
// Only meaningful when constantlySizedValues() is true.
return valueBuilder.constantSize();
}
boolean constantlySizedKeys() {
// Keys are constant-sized if the marshaller says so, or the user supplied
// a sample key via constantKeySizeBySample().
return keyBuilder.constantSizeMarshaller() || sampleKey != null;
}
private int worstAlignmentAssumingChunkSize(
long constantSizeBeforeAlignment, int chunkSize) {
// Worst-case padding before the value, given that entries start at
// multiples of chunkSize. The alignment offsets reachable by different
// entry start addresses differ by multiples of gcd(alignment, chunkSize).
int alignment = valueAlignment();
long firstAlignment = alignAddr(constantSizeBeforeAlignment, alignment) -
constantSizeBeforeAlignment;
int gcdOfAlignmentAndChunkSize = greatestCommonDivisor(alignment, chunkSize);
// If chunkSize is a multiple of alignment, every entry start is aligned
// the same way, so the padding is exactly firstAlignment.
if (gcdOfAlignmentAndChunkSize == alignment)
return (int) firstAlignment;
// assume worst by now because we cannot predict alignment in VanillaCM.entrySize() method
// before allocation: take the largest reachable padding, i.e. step
// firstAlignment up by gcd while staying below alignment.
long worstAlignment = firstAlignment;
while (worstAlignment + gcdOfAlignmentAndChunkSize < alignment)
worstAlignment += gcdOfAlignmentAndChunkSize;
return (int) worstAlignment;
}
/**
 * Lazily computed, cached worst-case alignment waste per entry. A negative cached
 * value means "not computed yet".
 */
int worstAlignment() {
    if (worstAlignment < 0) {
        int alignment = valueAlignment();
        worstAlignment = worstAlignmentComputationRequiresValueSize(alignment)
                ? entrySizeInfo().worstAlignment
                : worstAlignmentWithoutValueSize(alignment);
    }
    return worstAlignment;
}
/** Overrides the cached worst-case alignment; must be non-negative (negative means "uncomputed"). */
void worstAlignment(int worstAlignment) {
    assert worstAlignment >= 0;
    this.worstAlignment = worstAlignment;
}
/**
 * The allocation chunk size, in bytes. An explicitly configured actualChunkSize()
 * wins; constant-size entries use exactly one entry-sized chunk; otherwise the
 * smallest power-of-2 chunk (from 4 bytes up to 1 GiB) is chosen such that an
 * average entry stays within the default chunks-per-entry budget.
 */
long chunkSize() {
    if (actualChunkSize > 0)
        return actualChunkSize;
    double averageEntrySize = entrySizeInfo().averageEntrySize;
    if (constantlySizedEntries())
        return toLong(averageEntrySize);
    long maxChunkSize = 1L << 30;
    long candidate = 4;
    while (candidate <= maxChunkSize) {
        if (maxDefaultChunksPerAverageEntry(replicated) * candidate > averageEntrySize)
            return candidate;
        candidate *= 2L;
    }
    return maxChunkSize;
}
/** Entries have a constant total size when both keys and values do. */
boolean constantlySizedEntries() {
    if (!constantlySizedKeys())
        return false;
    return constantlySizedValues();
}
/**
 * Expected number of chunks occupied by an average entry. Pessimistically assumes
 * maximal internal fragmentation; the over-reserved segment entry space is allocated
 * lazily on Linux (the main target platform), so the cost is acceptable.
 */
double averageChunksPerEntry() {
    if (constantlySizedEntries())
        return 1.0;
    long chunkSize = chunkSize();
    double averageEntrySize = entrySizeInfo().averageEntrySize;
    // Round the per-entry chunk count up.
    return (averageEntrySize + chunkSize - 1) / chunkSize;
}
/**
 * Configures the maximal number of chunks a single entry may span.
 *
 * @param maxChunksPerEntry the cap; must be at least 1
 * @return this builder back
 * @throws IllegalArgumentException if the given cap is below 1
 */
@Override
public ChronicleMapBuilder<K, V> maxChunksPerEntry(int maxChunksPerEntry) {
    if (maxChunksPerEntry < 1)
        throw new IllegalArgumentException("maxChunksPerEntry should be >= 1, " +
                maxChunksPerEntry + " given");
    this.maxChunksPerEntry = maxChunksPerEntry;
    return this;
}
/**
 * Effective maximal chunks per entry: 1 for constant-size entries; otherwise the
 * tier's chunk count (clamped to int range), further capped by an explicit
 * maxChunksPerEntry() configuration if present.
 */
int maxChunksPerEntry() {
    if (constantlySizedEntries())
        return 1;
    // An entry can never span more chunks than a segment tier contains.
    long tierChunks = actualChunksPerSegmentTier();
    int limit = tierChunks > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) tierChunks;
    if (this.maxChunksPerEntry > 0 && this.maxChunksPerEntry < limit)
        limit = this.maxChunksPerEntry;
    return limit;
}
/** Values have a fixed serialized size when the marshaller guarantees it or a sample value pins it. */
boolean constantlySizedValues() {
    return valueBuilder.constantSizeMarshaller() || sampleValue != null;
}
/**
 * Configures alignment of address in memory of entries and independently of address in memory
 * of values within entries (i. e. final addresses in native memory are multiples of the given
 * alignment) for ChronicleMaps, created by this builder.
 *
 * <p>Useful when values of the map are updated intensively, particularly fields with volatile
 * access, because it doesn't work well if the value crosses cache lines. Also, on some
 * (nowadays rare) architectures any misaligned memory access is more expensive than aligned.
 *
 * <p>If values couldn't reference off-heap memory (i. e. it is not {@link Byteable} or a value
 * interface), alignment configuration makes no sense.
 *
 * <p>Default is {@link ValueModel#recommendedOffsetAlignment()} if the value type is a value
 * interface, otherwise 1 (that is effectively no alignment) or chosen heuristically (configure
 * explicitly for being sure and to compare performance in your case).
 *
 * @param alignment the new alignment of the maps constructed by this builder; must be a
 *                  positive power of 2
 * @return this {@code ChronicleMapBuilder} back
 * @throws IllegalArgumentException if the given alignment is not a positive power of 2
 * @throws IllegalStateException if values of maps, created by this builder, couldn't reference
 *         off-heap memory
 */
public ChronicleMapBuilder<K, V> entryAndValueOffsetAlignment(int alignment) {
    if (alignment <= 0) {
        throw new IllegalArgumentException("Alignment should be positive integer, " +
                alignment + " given");
    }
    // Alignment is applied by address masking, so it must be a power of 2.
    if (!isPowerOf2(alignment)) {
        throw new IllegalArgumentException("Alignment should be a power of 2, " + alignment +
                " given");
    }
    this.alignment = alignment;
    return this;
}
/**
 * Effective value alignment: an explicit {@code entryAndValueOffsetAlignment()}
 * configuration wins; otherwise the value model's recommended alignment for value
 * interfaces, or no alignment at all.
 */
int valueAlignment() {
    if (alignment != UNDEFINED_ALIGNMENT_CONFIG)
        return alignment;
    try {
        if (Values.isValueInterfaceOrImplClass(valueBuilder.tClass)) {
            // Value interfaces carry a recommended offset alignment derived by ValueModel.
            return ValueModel.acquire(valueBuilder.tClass).recommendedOffsetAlignment();
        } else {
            return NO_ALIGNMENT;
        }
    } catch (Exception e) {
        // Deliberate best-effort: if value-model introspection fails for any reason,
        // fall back to no alignment rather than failing map construction.
        return NO_ALIGNMENT;
    }
}
/**
 * Configures the expected number of entries in the maps created by this builder.
 *
 * @param entries the expected entry count; must be positive
 * @return this builder back
 * @throws IllegalArgumentException if {@code entries} is not positive
 */
@Override
public ChronicleMapBuilder<K, V> entries(long entries) {
    if (entries <= 0L) {
        throw new IllegalArgumentException(
                "Entries should be positive, " + entries + " given");
    }
    this.entries = entries;
    return this;
}
/**
 * Returns the configured entry count; a negative value means entries() was never
 * configured, which is an error for newly created (non-recovered) maps.
 *
 * @throws IllegalStateException if entries() was not configured
 */
long entries() {
    if (entries < 0) {
        throw new IllegalStateException("If in-memory Chronicle Map is created or persisted\n" +
                "to a file for the first time (i. e. not accessing existing file),\n" +
                "ChronicleMapBuilder.entries() must be configured.\n" +
                "See Chronicle Map 3 tutorial and javadocs for more information");
    }
    return entries;
}
/**
 * Configures the number of entries each segment should hold.
 *
 * @param entriesPerSegment the per-segment entry count; must be positive
 * @return this builder back
 * @throws IllegalArgumentException if the given count is not positive
 */
@Override
public ChronicleMapBuilder<K, V> entriesPerSegment(long entriesPerSegment) {
    if (entriesPerSegment <= 0L) {
        throw new IllegalArgumentException("Entries per segment should be positive, " +
                entriesPerSegment + " given");
    }
    this.entriesPerSegment = entriesPerSegment;
    return this;
}
/**
 * Effective entries per segment: either the explicitly configured value, or derived
 * from entries() / actualSegments() with Poisson head-room so that only
 * (1 - nonTieredSegmentsPercentile) of segments are expected to overflow into a
 * chained tier.
 *
 * @throws IllegalStateException if the derived size exceeds per-tier chunk or entry limits
 */
long entriesPerSegment() {
    long entriesPerSegment;
    if (this.entriesPerSegment > 0L) {
        entriesPerSegment = this.entriesPerSegment;
    } else {
        int actualSegments = actualSegments();
        double averageEntriesPerSegment = entries() * 1.0 / actualSegments;
        if (actualSegments > 1) {
            // Entries spread over segments roughly as a Poisson process; reserve
            // enough head-room that most segments never need a chained tier.
            entriesPerSegment = PoissonDistribution.inverseCumulativeProbability(
                    averageEntriesPerSegment, nonTieredSegmentsPercentile);
        } else {
            // If there is only 1 segment, there is no source of variance in segments filling.
            entriesPerSegment = roundUp(averageEntriesPerSegment);
        }
    }
    boolean actualChunksDefined = actualChunksPerSegmentTier > 0;
    if (!actualChunksDefined) {
        double averageChunksPerEntry = averageChunksPerEntry();
        if (entriesPerSegment * averageChunksPerEntry > MAX_TIER_CHUNKS)
            // Message fixed: original ran two sentences together without a separator.
            throw new IllegalStateException("Max chunks per segment tier is " +
                    MAX_TIER_CHUNKS + ". Configured entries() and actualSegments() so that " +
                    "there should be " + entriesPerSegment + " entries per segment tier, " +
                    "while average chunks per entry is " + averageChunksPerEntry);
    }
    if (entriesPerSegment > MAX_TIER_ENTRIES)
        throw new IllegalStateException("shouldn't be more than " +
                MAX_TIER_ENTRIES + " entries per segment");
    return entriesPerSegment;
}
/**
 * Configures the exact number of allocation chunks per segment tier.
 *
 * @param actualChunksPerSegmentTier the chunk count; must be in [1, MAX_TIER_CHUNKS]
 * @return this builder back
 * @throws IllegalArgumentException if the given count is out of range
 */
@Override
public ChronicleMapBuilder<K, V> actualChunksPerSegmentTier(long actualChunksPerSegmentTier) {
    if (actualChunksPerSegmentTier <= 0 || actualChunksPerSegmentTier > MAX_TIER_CHUNKS)
        // Message fixed: comma was misplaced before "range" ("[1, N], range,").
        throw new IllegalArgumentException("Actual chunks per segment tier should be in [1, " +
                MAX_TIER_CHUNKS + "] range, " + actualChunksPerSegmentTier + " given");
    this.actualChunksPerSegmentTier = actualChunksPerSegmentTier;
    return this;
}
/**
 * Validates that actualChunksPerSegmentTier() is only configured alongside the other
 * manual low-level configs (entriesPerSegment(), actualSegments(), actualChunkSize()),
 * since mixing it with auto-derived sizing would be inconsistent. actualChunkSize()
 * may be omitted when entries are constant-size (chunk size is then implied).
 */
private void checkActualChunksPerSegmentTierIsConfiguredOnlyIfOtherLowLevelConfigsAreManual() {
    if (actualChunksPerSegmentTier > 0) {
        // <= 0 means "not manually configured" for each of the three configs below.
        if (entriesPerSegment <= 0 || (actualChunkSize <= 0 && !constantlySizedEntries()) ||
                actualSegments <= 0)
            throw new IllegalStateException("Actual chunks per segment tier could be " +
                    "configured only if other three low level configs are manual: " +
                    "entriesPerSegment(), actualSegments() and actualChunkSize(), unless " +
                    "both keys and value sizes are constant");
    }
}
/**
 * Validates that, when both are manually configured, the per-tier chunk count is at
 * least the per-segment entry count: each entry needs at least one chunk.
 */
private void checkActualChunksPerSegmentGreaterOrEqualToEntries() {
    boolean bothConfigured = actualChunksPerSegmentTier > 0 && entriesPerSegment > 0;
    if (bothConfigured && entriesPerSegment > actualChunksPerSegmentTier) {
        throw new IllegalStateException("Entries per segment couldn't be greater than " +
                "actual chunks per segment tier. Entries: " + entriesPerSegment + ", " +
                "chunks: " + actualChunksPerSegmentTier + " is configured");
    }
}
/**
 * Effective chunks per segment tier: the explicit configuration if present,
 * otherwise derived from the per-segment entry count.
 */
long actualChunksPerSegmentTier() {
    return actualChunksPerSegmentTier > 0
            ? actualChunksPerSegmentTier
            : chunksPerSegmentTier(entriesPerSegment());
}
/** Chunks needed for a tier holding the given entry count, rounded up. */
private long chunksPerSegmentTier(long entriesPerSegment) {
    return roundUp(entriesPerSegment * averageChunksPerEntry());
}
/**
 * Configures the minimum number of segments.
 *
 * @param minSegments the minimal segment count; validated by checkSegments()
 * @return this builder back
 */
@Override
public ChronicleMapBuilder<K, V> minSegments(int minSegments) {
    checkSegments(minSegments);
    this.minSegments = minSegments;
    return this;
}
/** Effective minimum segments: the larger of the heuristic estimate and the configured floor. */
int minSegments() {
    return Math.max(estimateSegments(), minSegments);
}
// Heuristic segment estimate: roughly one segment per 32 entries (rounded to a power
// of 2, at least 1), capped by the value-size-based estimate below.
private int estimateSegments() {
    return (int) Math.min(nextPower2(entries() / 32, 1), estimateSegmentsBasedOnSize());
}
//TODO review because this heuristic doesn't seem to perform well
private int estimateSegmentsBasedOnSize() {
    // If values are huge, operations on them (even plain ser/deser) can take a long
    // time, so use more segments to reduce the probability that two or more
    // concurrent writers hit the same segment, forcing all but one to wait.
    int base = estimateSegmentsForEntries(entries());
    double averageValueSize = averageValueSize();
    // Double the segment count for each decade of average value size above 1 KB,
    // up to 16x at 1 MB: equivalent to the original >=1e6/1e5/1e4/1e3 ladder.
    int multiplier = 1;
    for (double threshold = 1000; threshold <= 1000000; threshold *= 10) {
        if (averageValueSize < threshold)
            break;
        multiplier *= 2;
    }
    return base * multiplier;
}
/**
 * Configures the exact number of segments.
 *
 * @param actualSegments the segment count; validated by checkSegments()
 * @return this builder back
 */
@Override
public ChronicleMapBuilder<K, V> actualSegments(int actualSegments) {
    checkSegments(actualSegments);
    this.actualSegments = actualSegments;
    return this;
}
int actualSegments() {
if (actualSegments > 0)
return actualSegments;
if (entriesPerSegment > 0) {
return (int) segmentsGivenEntriesPerSegmentFixed(entriesPerSegment);
}
// Try to fit 4 bytes per hash lookup slot, then 8. Trying to apply small slot
// size (=> segment size, because slot size depends on segment size) not only because
// they take less memory per entry (if entries are of KBs or MBs, it doesn't matter), but
// also because if segment size is small, slot and free list are likely to lie on a single
// memory page, reducing number of memory pages to update, if Chronicle Map is persisted.