# PIP-393: Improve performance of Negative Acknowledgement

# Background knowledge

Negative Acknowledgement is a feature in Pulsar that allows consumers to trigger the redelivery
of a message after some time when they fail to process it. When a user calls the `negativeAcknowledge` method,
`NegativeAcksTracker` in `ConsumerImpl` adds an entry into the map `NegativeAcksTracker.nackedMessages`,
mapping the message ID to its redelivery time. When the redelivery time arrives, `NegativeAcksTracker`
sends a redelivery request to the broker to redeliver the message.
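
For context, a minimal sketch of how an application typically exercises this path (topic and
subscription names are illustrative):

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
Consumer<byte[]> consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-sub")
        // delay before a negatively acked message becomes eligible for redelivery
        .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
        .subscribe();

Message<byte[]> msg = consumer.receive();
try {
    // process the message ...
    consumer.acknowledge(msg);
} catch (Exception e) {
    // processing failed: schedule the message for redelivery via NegativeAcksTracker
    consumer.negativeAcknowledge(msg);
}
```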

# Motivation

There are many issues with the current implementation of Negative Acknowledgement in Pulsar:
- the memory occupation is high.
- the code execution efficiency is low.
- the redelivery time is not accurate.
- multiple negative acks for messages in the same entry (batch) interfere with each other.

All of these problems are severe and need to be solved.

## Memory occupation is high
After the improvement in https://github.com/apache/pulsar/pull/23582, we reduced the memory occupation
of `NegativeAcksTracker` by more than half by replacing `HashMap` with `ConcurrentLongLongPairHashMap`. With 1 million entries, the memory
occupation decreased from 178MB to 64MB. With 10 million entries, the memory occupation decreased from 1132MB to 512MB.
The average memory occupation of each entry decreased from 1132MB/10000000=118byte to 512MB/10000000=53byte.

But it is not enough. Assuming that we negatively ack messages at 10k/s and assign a 1h redelivery delay to each message,
the memory occupation of `NegativeAcksTracker` will be `3600*10000*53/1024/1024/1024=1.77GB`; if the delay is 5h,
the required memory is `3600*10000*53/1024/1024/1024*5=8.88GB`, which grows far too fast.

## Code execution efficiency is low
Currently, each time the timer task is triggered, it iterates over all the entries in `NegativeAcksTracker.nackedMessages`,
which is unnecessary. We can sort entries by timestamp and only iterate over the entries that need to be redelivered.

## Redelivery time is not accurate
Currently, the redelivery check interval is controlled by `timerIntervalNanos`, which is 1/3 of the `negativeAckRedeliveryDelay`.
That means that if the `negativeAckRedeliveryDelay` is 1h, the check task runs every 20min, so the deviation of the redelivery
time can be up to 20min, which is unacceptable.

## Multiple negative acks for messages in the same entry (batch) interfere with each other
Currently, `NegativeAcksTracker#nackedMessages` maps `(ledgerId, entryId)` to `timestamp`, which means multiple nacks from messages
in the same batch share a single timestamp.
If we ask for msg1 to be redelivered 10s later and then for msg2 to be redelivered 20s later, both messages are redelivered 20s later together.
msg1 will not be redelivered 10s later, because the timestamp recorded in `NegativeAcksTracker#nackedMessages` is overridden by the second
nack call.


# Goals

Refactor the `NegativeAcksTracker` to solve the problems above.

To avoid iterating over all entries in `NegativeAcksTracker.nackedMessages`, we use a sorted map to store the entries.
To reduce memory occupation, we use utility classes provided by fastutil (https://fastutil.di.unimi.it/docs/) and design
a new algorithm to store the entries, reducing the memory occupation to as little as roughly 1% of the current implementation
(the actual effect depends on the configuration and the throughput).

# Detailed Design

## Design & Implementation Details

### New Data Structure
Use the following data structure to store the entries:
```java
Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = new Long2ObjectAVLTreeMap<>();
```
mapping `timestamp -> ledgerId -> entryId`.
We need the timestamps sorted in ascending order, so we use a sorted map to map each timestamp to a `ledgerId -> entryId` map.
As there will be many entries in the map, we use `Long2ObjectAVLTreeMap` instead of `Long2ObjectRBTreeMap`.
For the inner map, we use a `Long2ObjectMap` to map `ledgerId` to `entryId` because we don't need to keep the `ledgerId`s ordered;
`Long2ObjectOpenHashMap` is sufficient.
All entry IDs for the same ledger ID are stored in a bitmap, as we only care about the existence of an entry ID.
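
A minimal sketch of how a negatively acked message could be recorded in, and later drained from, this
structure; the method names are illustrative and not the exact `NegativeAcksTracker` code:

```java
import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
import org.roaringbitmap.longlong.Roaring64Bitmap;

Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = new Long2ObjectAVLTreeMap<>();

// Record one nacked message under its (bucketed) redelivery timestamp.
void add(long redeliverAtMs, long ledgerId, long entryId) {
    Long2ObjectMap<Roaring64Bitmap> ledgerMap = nackedMessages.get(redeliverAtMs);
    if (ledgerMap == null) {
        ledgerMap = new Long2ObjectOpenHashMap<>();
        nackedMessages.put(redeliverAtMs, ledgerMap);
    }
    Roaring64Bitmap entries = ledgerMap.get(ledgerId);
    if (entries == null) {
        entries = new Roaring64Bitmap();
        ledgerMap.put(ledgerId, entries);
    }
    entries.addLong(entryId);
}

// On each timer tick, only the buckets whose timestamp is already due are visited;
// buckets scheduled for the future are never iterated.
void drainDueBuckets(long nowMs) {
    while (!nackedMessages.isEmpty() && nackedMessages.firstLongKey() <= nowMs) {
        Long2ObjectMap<Roaring64Bitmap> due = nackedMessages.remove(nackedMessages.firstLongKey());
        // rebuild MessageIds from the (ledgerId, entryId) pairs in `due` and request redelivery ...
    }
}
```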


### TimeStamp Bucket
The timestamp in ms is used as the key of the map. As most use cases don't require 1ms precision of the delay time,
we can bucket the timestamps, that is, we can trim the lower bits of the timestamp to map it to a bucket.
For example, if we trim the lower 1 bit of the timestamp, the timestamps 0b1000 and 0b1001 are mapped to the same bucket 0b1000.
All messages in the same bucket are then redelivered at the same time.
If the user can accept a 1024ms deviation of the redelivery time, we can trim the lower 10 bits of the timestamp, which groups many
entries into the same bucket and reduces the memory occupation.

The following code snippet is helpful to understand the design:
```java
    static long trimLowerBit(long timestamp, int bits) {
        return timestamp & (-1L << bits);
    }
```
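
For example, with 8 trimmed bits (256ms buckets), nearby timestamps collapse into the same bucket;
the concrete values below are just worked arithmetic:

```java
// 999_936 = 3906 * 256, so the bucket [999_936, 1_000_191] shares one key
assert trimLowerBit(1_000_000L, 8) == 999_936L;
assert trimLowerBit(1_000_191L, 8) == 999_936L;   // same bucket as 1_000_000
assert trimLowerBit(1_000_192L, 8) == 1_000_192L; // first timestamp of the next bucket
```

The next snippet shows how a single `(timestamp, ledgerId, entryId)` triple is laid out in the nested structure: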

```java
Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> map = new Long2ObjectAVLTreeMap<>();
Long2ObjectMap<Roaring64Bitmap> ledgerMap = new Long2ObjectOpenHashMap<>();
Roaring64Bitmap entrySet = new Roaring64Bitmap();
entrySet.addLong(entryId);
ledgerMap.put(ledgerId, entrySet);
map.put(timestamp, ledgerMap);
```

### Configuration

Add a new configuration `negativeAckPrecisionBitCnt` to control the precision of the redelivery time.
```java
@ApiModelProperty(
        name = "negativeAckPrecisionBitCnt",
        value = "The redelivery time precision bit count. The lower bits of the redelivery time will be\n"
                + "trimmed to reduce the memory occupation. The default value is 8, which means the redelivery time\n"
                + "will be bucketed by 256ms. In the worst case, the redelivery time will be 512ms earlier (not later)\n"
                + "than the expected time. If the value is 0, the redelivery time will be accurate to ms."
)
private long negativeAckPrecisionBitCnt = 8;
```
The higher the value, the more entries are grouped into the same bucket, the lower the memory occupation, and the less accurate the redelivery time.
The default value is 8, which means the redelivery time is bucketed by 256ms. In the worst case, the redelivery time will be 512ms earlier (not later)
than the expected time.
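
If the setting is also exposed on the consumer builder, usage could look like the sketch below. Only
`negativeAckRedeliveryDelay` is an existing API; the `negativeAckPrecisionBitCnt` builder method is a
hypothetical name used here purely for illustration:

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;

Consumer<byte[]> consumer = client.newConsumer()   // `client` as in the earlier snippet
        .topic("my-topic")
        .subscriptionName("my-sub")
        .negativeAckRedeliveryDelay(1, TimeUnit.HOURS)
        // hypothetical builder method for negativeAckPrecisionBitCnt;
        // 8 bits => redelivery times are grouped into 256ms buckets
        .negativeAckPrecisionBitCnt(8)
        .subscribe();
```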


## Space complexity analysis
### Space complexity of `ConcurrentLongLongPairHashMap`
Before analyzing the new data structure, we need to know how much space the map takes before this PIP.

We need to store four long fields for `(ledgerId, entryId, partitionIndex, timestamp)` for each entry, which takes `4*8=32byte`.
As `ConcurrentLongLongPairHashMap` uses open addressing with linear probing to handle hash conflicts, some
redundant space is reserved to avoid a high conflict rate. Two configurations control how much redundant space is reserved:
the `fill factor` and the `idle factor`. When the space utilization rate rises to the `fill factor`, the backing array is
doubled in size; when the space utilization rate falls to the `idle factor`, the backing array is halved.

The default value of the `fill factor` is 0.66 and of the `idle factor` is 0.15, which means the minimum space occupation of
`ConcurrentLongLongPairHashMap` is `32/0.66N byte = 48N byte` and the maximum space occupation is `32/0.15N byte = 213N byte`,
where N is the number of entries.

In the experiment shown in the PR, there are 1 million entries in the map, taking up `32*1000000/1024/1024byte=30MB`;
the space utilization rate is 30/64=0.46, within the range `[0.15, 0.66]`.


### Space complexity of the new data structure
The space used by the new data structure depends on several factors: the `message rate`, `the time deviation the user accepts`,
and `the max entries written in one ledger`.
- The Pulsar configuration `managedLedgerMaxEntriesPerLedger=50000` determines the maximum number of entries that can be written into one ledger;
we use the default value for this analysis.
- `the time deviation the user accepts`: when the user accepts a 1024ms delivery time deviation, we can trim the lower 10 bits
of the timestamp in ms, which groups 1024 timestamps into one bucket.

In the following we analyze the space used by one bucket and calculate the average space used by one entry.

Assuming that the message rate is `x msg/ms` and we trim `y bits` of the timestamp, one bucket covers `2**y` ms and contains
`M=x*2**y` msgs.
- For one single bucket, we only need to store one timestamp, which takes `8byte`.
- Then, we need to store the ledger IDs. When M is greater than 50000 (`managedLedgerMaxEntriesPerLedger`), the ledger rolls over.
There are `L=ceil(M/50000)` ledgers, which take `8*L byte`.
- Further, we analyze how much space the entry IDs take. As there are `L=ceil(M/50000)` ledgers, there are `L` bitmaps to store,
which take `L*size(bitmap)`. The total space consumed by the new data structure is `8byte + 8L byte + L*size(bitmap)`.

As `size(bitmap)` is far greater than `8byte`, we can ignore the first two terms. Then we get the formula for the space
consumed by **one bucket**: `D=L*size(bitmap)=ceil(M/50000)*size(bitmap)`.

Entry IDs are stored in a `Roaring64Bitmap`; for simplicity we can replace it with `RoaringBitmap`, as the max entry ID is 49999,
which fits within the unsigned 32-bit range `[0, 4294967295]` that `RoaringBitmap` can store. The space consumed
by a `RoaringBitmap` depends on how many elements it contains: when the bitmap holds fewer than 4096 entries, the space is about `4 byte`
per entry; when it holds more than 4096 entries, the consumed space is a fixed `8KB`.

Then we get the final result (see the helper sketch after this list):
- when M>50000, `D = ceil(M/50000)*size(bitmap) ~= M/50000 * 8KB = M/50000 * 8 * 1024 byte = 0.163M byte`,
so each entry takes `0.163byte` on average.
- when 4096<M<50000, `D = ceil(M/50000)*size(bitmap) = 1 * 8KB = 8KB`, so each entry takes `8*1024/M=8192/M byte` on average.
- when M<4096, `D = ceil(M/50000)*size(bitmap) = 1 * 4M byte = 4M byte`, so each entry takes `4 byte` on average.
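
As a cross-check, a small helper that reproduces the estimate above (the constants follow the
assumptions of this section: 50000 entries per ledger, ~8KB per dense bitmap, ~4 bytes per entry in
sparse bitmaps; this is an illustration, not implementation code):

```java
// Rough estimate of the bytes used by one bucket that holds M nacked messages.
static long estimateBucketBytes(long m) {
    long maxEntriesPerLedger = 50_000;          // managedLedgerMaxEntriesPerLedger
    long ledgers = (m + maxEntriesPerLedger - 1) / maxEntriesPerLedger; // L = ceil(M/50000)
    long bitmapBytes = (m < 4096)
            ? 4 * m                             // sparse bitmap: ~4 bytes per entry
            : 8 * 1024;                         // dense bitmap: ~8KB fixed per ledger
    return 8 + 8 * ledgers + ledgers * bitmapBytes; // timestamp + ledger ids + bitmaps
}

// Average bytes per entry, e.g. estimatePerEntryBytes(1_000_000) is roughly 0.163.
static double estimatePerEntryBytes(long m) {
    return estimateBucketBytes(m) / (double) m;
}
```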

### Conclusion
Assuming N is the number of entries and M is the number of messages in one bucket:
- `ConcurrentLongLongPairHashMap`: `48N byte` in the best case, `213N byte` in the worst case.
- New data structure:
  - when M>50000, `0.163N byte`.
  - when 4096<M<50000, `8192/M * N byte`.
  - when M<4096, `4N byte`.

Some experiment results are shown in the PR; we can fine-tune the configuration to get the best performance.

## Effect

### Memory occupation is high
With this design, we can reduce the memory occupation of `NegativeAcksTracker` to roughly 1% of the current implementation.

### Code execution efficiency is low
With the new design, we avoid iterating over all entries in `NegativeAcksTracker.nackedMessages`, and only iterate over the entries
that need to be redelivered.

### Redelivery time is not accurate
With the new design, we avoid the fixed interval of the redelivery check. We can control the precision of the redelivery time
by trimming the lower bits of the timestamp. If the user can accept a 1024ms deviation of the redelivery time, we can trim the lower
10 bits of the timestamp, which groups many entries into the same bucket while keeping the deviation bounded.

### Multiple negative acks for messages in the same entry (batch) interfere with each other
With the new design, if we ask for msg1 to be redelivered 10s later and then for msg2 to be redelivered 20s later, these two nacks will not interfere
with each other, as they are stored in different buckets.


## High-Level Design
As this PIP introduces a new dependency, `fastutil`, into the client, which is very large (23MB) while only a few classes are used, we need to
reduce the size of the dependency.

There is an alternative dependency, `fastutil-core`, which is smaller (6MB), but it is still
relatively large, and using `fastutil-core` would introduce another problem on the broker side, since there is already a `fastutil` jar
which also includes the `fastutil-core` classes.

The optimal solution would be to include in the shaded pulsar-client and pulsar-client-all only the classes from fastutil
which are really used and needed. This could be achieved in many ways. One possible solution is to introduce an intermediate
module for the shaded pulsar-client and pulsar-client-all that isn't published to Maven central at all.
It would be used to minimize and include only the classes from fastutil which are required by pulsar-client shading.


# Backward & Forward Compatibility

## Upgrade

Users can upgrade to the new version without any compatibility issues.

## Downgrade / Rollback

Users can downgrade to the old version without any compatibility issues.

# Links

<!--
Updated afterwards
-->
* Mailing List discussion thread: https://lists.apache.org/thread/yojl7ylk7cyjxktq3cn8849hvmyv0fg8
* Mailing List voting thread: https://lists.apache.org/thread/hyc1r2s9chowdhck53lq07tznopt50dy