2424import com .dxfeed .event .option .impl .TheoPriceMapping ;
2525import com .dxfeed .event .option .impl .UnderlyingMapping ;
2626
27+ import java .util .Arrays ;
28+
2729@ ServiceProvider (order = 100 )
2830public class HistorySubscriptionFilterImpl implements HistorySubscriptionFilter {
2931
@@ -41,6 +43,7 @@ public class HistorySubscriptionFilterImpl implements HistorySubscriptionFilter
4143
4244
4345 private final SynchronizedIndexedSet <DataRecord , RecordLimit > maxRecordCounts = SynchronizedIndexedSet .create (RecordLimit ::getRecord );
46+ private volatile RecordLimit [] maxRecordCache = new RecordLimit [256 ];
4447
4548 @ Override
4649 public long getMinHistoryTime (DataRecord record , int cipher , String symbol ) {
@@ -49,10 +52,25 @@ public long getMinHistoryTime(DataRecord record, int cipher, String symbol) {
4952
5053 @ Override
5154 public int getMaxRecordCount (DataRecord record , int cipher , String symbol ) {
52- RecordLimit recordLimit = maxRecordCounts .getByKey (record );
53- if (recordLimit != null )
54- return recordLimit .limit ;
55+ // Cache is intentionally NOT synchronized in any way. Only minimal array visibility is provided.
56+ // Concurrent threads independently compute same result and reach eventual consistency in a stable environment.
57+ int id = record .getId ();
58+ RecordLimit [] cache = maxRecordCache ; // Atomic volatile read.
59+ if (id >= cache .length ) {
60+ cache = Arrays .copyOf (cache , Math .max (cache .length * 2 , id + 1 ));
61+ maxRecordCache = cache ; // Atomic volatile write.
62+ }
63+ RecordLimit limit = cache [id ];
64+ if (limit == null || limit .getRecord () != record ) {
65+ limit = maxRecordCounts .getByKey (record );
66+ if (limit == null )
67+ limit = maxRecordCounts .putIfAbsentAndGet (createRecordLimit (record ));
68+ cache [id ] = limit ;
69+ }
70+ return limit .getLimit ();
71+ }
5572
73+ private RecordLimit createRecordLimit (DataRecord record ) {
5674 int defaultMaxRecordCount ;
5775 if (record .getMapping (CandleMapping .class ) != null ) {
5876 defaultMaxRecordCount = candleMaxRecordCount ;
@@ -74,8 +92,7 @@ public int getMaxRecordCount(DataRecord record, int cipher, String symbol) {
7492 //event property >= category property >= old-fashioned property
7593 String propName = generatePropertyName (record );
7694 int maxRecordCount = SystemProperties .getIntProperty (HistorySubscriptionFilterImpl .class , propName , defaultMaxRecordCount );
77- maxRecordCounts .put (new RecordLimit (record , maxRecordCount ));
78- return maxRecordCount ;
95+ return new RecordLimit (record , maxRecordCount );
7996 }
8097
8198 @ Override
@@ -94,13 +111,17 @@ private static class RecordLimit {
94111 private final DataRecord record ;
95112 private final int limit ;
96113
97- private RecordLimit (DataRecord record , int limit ) {
114+ RecordLimit (DataRecord record , int limit ) {
98115 this .record = record ;
99116 this .limit = limit ;
100117 }
101118
102119 DataRecord getRecord () {
103120 return record ;
104121 }
122+
123+ public int getLimit () {
124+ return limit ;
125+ }
105126 }
106127}
0 commit comments