Skip to content

Commit aa4e01f

Browse files
committed
Fix kamino reserve oracle configuration change detection.
1 parent 9896a4c commit aa4e01f

File tree

9 files changed

+211
-127
lines changed

9 files changed

+211
-127
lines changed

gradle/sava.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ developerName="Jim"
55
developerId="jpe7s"
66
developerEmail="james@glam.systems"
77
javaVersion=25
8-
solanaBOMVersion=25.12.15
8+
solanaBOMVersion=25.12.17

services/src/main/java/systems/glam/services/db/sql/BatchSqlExecutor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,7 @@ static <T> BatchSqlExecutor<T> create(final Class<T> componentType,
2020
);
2121
}
2222

23+
void awaitBatchComplete() throws InterruptedException;
24+
2325
void queue(final T item);
2426
}

services/src/main/java/systems/glam/services/db/sql/BatchSqlExecutorImpl.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ final class BatchSqlExecutorImpl<T> implements BatchSqlExecutor<T> {
3030
private final ReentrantLock lock;
3131
private final Condition startWindow;
3232
private final Condition batchLimit;
33+
private final Condition batchCompleteCondition;
34+
private volatile boolean batchComplete;
3335

3436
BatchSqlExecutorImpl(final Class<T> componentType,
3537
final SqlDataSource datasource,
@@ -49,22 +51,27 @@ final class BatchSqlExecutorImpl<T> implements BatchSqlExecutor<T> {
4951
this.lock = new ReentrantLock();
5052
this.startWindow = lock.newCondition();
5153
this.batchLimit = lock.newCondition();
54+
this.batchCompleteCondition = lock.newCondition();
55+
this.batchComplete = true;
5256
}
5357

5458
@Override
5559
public void run() {
5660
try {
5761
//noinspection unchecked
5862
final T[] batch = (T[]) Array.newInstance(componentType, batchSize);
59-
int numItems = 0, numInserted = 0;
63+
int numItems = 0, numInserted;
6064
for (long errorCount = 0, remainingNanos; ; ) {
6165
if (pending.size() < batchSize) {
6266
Arrays.fill(batch, null);
6367
lock.lock();
6468
try {
6569
while (pending.isEmpty()) {
70+
this.batchComplete = true;
71+
this.batchCompleteCondition.signalAll();
6672
startWindow.await();
6773
}
74+
this.batchComplete = false;
6875
for (remainingNanos = batchDelayNanos; pending.size() < batchSize && remainingNanos > 0; ) {
6976
remainingNanos = batchLimit.awaitNanos(remainingNanos);
7077
}
@@ -122,6 +129,20 @@ public void run() {
122129
}
123130
}
124131

132+
@Override
133+
public void awaitBatchComplete() throws InterruptedException {
134+
if (!this.batchComplete) {
135+
lock.lock();
136+
try {
137+
while (!this.batchComplete) {
138+
this.batchCompleteCondition.await();
139+
}
140+
} finally {
141+
lock.unlock();
142+
}
143+
}
144+
}
145+
125146
static int batchExecutionCount(final int[] result) throws SQLException {
126147
int sum = 0;
127148
for (int i = 0; i < result.length; ++i) {

services/src/main/java/systems/glam/services/integrations/kamino/KaminoCache.java

Lines changed: 18 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.util.concurrent.ConcurrentHashMap;
3131
import java.util.concurrent.ConcurrentMap;
3232
import java.util.function.Consumer;
33-
import java.util.stream.Collectors;
3433

3534
import static java.nio.file.StandardOpenOption.*;
3635
import static software.sava.core.accounts.PublicKey.PUBLIC_KEY_LENGTH;
@@ -112,7 +111,6 @@ static CompletableFuture<KaminoCache> initService(final RpcCaller rpcCaller,
112111
final byte[] nullKeyBytes = PublicKey.NONE.toByteArray();
113112
final byte[] nilKeyBytes = KaminoAccounts.NULL_KEY.toByteArray();
114113

115-
final var priceFeedsNeeded = new HashSet<PublicKey>();
116114
final var noMappings = Map.<PublicKey, MappingsContext>of();
117115
for (final var reserveAccountInfo : reserveAccounts) {
118116
final byte[] data = reserveAccountInfo.data();
@@ -125,9 +123,6 @@ static CompletableFuture<KaminoCache> initService(final RpcCaller rpcCaller,
125123
)) {
126124
final var reserveContext = ReserveContext.createContext(reserveAccountInfo, noMappings);
127125
reserveContextMap.put(reserveContext.pubKey(), reserveContext);
128-
} else {
129-
final var priceFeedKey = PublicKey.readPubKey(data, priceFeedKeyFromOffset);
130-
priceFeedsNeeded.add(priceFeedKey);
131126
}
132127
}
133128

@@ -162,13 +157,15 @@ static CompletableFuture<KaminoCache> initService(final RpcCaller rpcCaller,
162157
vaultStateMap.put(vaultStateContext.sharesMint(), vaultStateContext);
163158
}
164159

165-
return new KaminoCacheImpl(
160+
final var cache = new KaminoCacheImpl(
166161
notifyClient,
167162
rpcCaller,
168163
accountFetcher,
169164
kLendProgram,
170165
scopeProgram,
171-
kVaultsProgram, KVaultsRequest,
166+
kVaultsProgram,
167+
reserveAccountsRequest,
168+
KVaultsRequest,
172169
pollingDelay,
173170
null,
174171
null,
@@ -178,6 +175,8 @@ static CompletableFuture<KaminoCache> initService(final RpcCaller rpcCaller,
178175
reserveContextMap,
179176
vaultStateMap
180177
);
178+
accountFetcher.listenToAll(cache);
179+
return cache;
181180
});
182181
}
183182

@@ -207,6 +206,12 @@ static CompletableFuture<KaminoCache> initService(final Path kaminoAccountsPath,
207206
"rpcClient#getKaminoVaultAccounts"
208207
);
209208

209+
final var reserveAccountsRequest = ProgramAccountsRequest.build()
210+
.filters(List.of(Reserve.SIZE_FILTER, Reserve.DISCRIMINATOR_FILTER))
211+
.programId(kLendProgram)
212+
.dataSliceLength(0, Reserve.CONFIG_PADDING_OFFSET)
213+
.createRequest();
214+
210215
final ConcurrentMap<PublicKey, ReserveContext> reserveContextMap;
211216
final List<AccountInfo<byte[]>> reserveAccounts;
212217
final Set<PublicKey> priceFeedsNeeded;
@@ -215,11 +220,6 @@ static CompletableFuture<KaminoCache> initService(final Path kaminoAccountsPath,
215220
reserveAccounts = List.of();
216221
priceFeedsNeeded = Set.of();
217222
} else {
218-
final var reserveAccountsRequest = ProgramAccountsRequest.build()
219-
.filters(List.of(Reserve.SIZE_FILTER, Reserve.DISCRIMINATOR_FILTER))
220-
.programId(kLendProgram)
221-
.dataSliceLength(0, Reserve.CONFIG_PADDING_OFFSET)
222-
.createRequest();
223223
reserveAccounts = rpcCaller.courteousGet(
224224
rpcClient -> rpcClient.getProgramAccounts(reserveAccountsRequest),
225225
"rpcClient#getKaminoReserves"
@@ -342,13 +342,15 @@ static CompletableFuture<KaminoCache> initService(final Path kaminoAccountsPath,
342342

343343
// analyzeMarkets(reserveContextMap);
344344

345-
return new KaminoCacheImpl(
345+
final var cache = new KaminoCacheImpl(
346346
notifyClient,
347347
rpcCaller,
348348
accountFetcher,
349349
kLendProgram,
350350
scopeProgram,
351-
kVaultsProgram, KVaultsRequest,
351+
kVaultsProgram,
352+
reserveAccountsRequest,
353+
KVaultsRequest,
352354
pollingDelay,
353355
configurationsPath,
354356
mappingsPath,
@@ -358,112 +360,14 @@ static CompletableFuture<KaminoCache> initService(final Path kaminoAccountsPath,
358360
reserveContextMap,
359361
vaultStateMap
360362
);
363+
accountFetcher.listenToAll(cache);
364+
return cache;
361365
} catch (final IOException e) {
362366
throw new UncheckedIOException(e);
363367
}
364368
});
365369
}
366370

367-
private static void analyzeMarkets(final Map<PublicKey, ReserveContext> reserveContextMap) {
368-
final var byMarket = reserveContextMap.values().stream()
369-
.collect(Collectors.groupingBy(ReserveContext::market));
370-
371-
record MarketOracles(PublicKey market, Set<PublicKey> switchboard, Set<PublicKey> pyth, Set<PublicKey> scope) {
372-
373-
String toJson() {
374-
return String.format("""
375-
{
376-
"market": "%s",
377-
"switchboard": [%s],
378-
"pyth": [%s],
379-
"scope": [%s]
380-
}""",
381-
market.toBase58(),
382-
switchboard.isEmpty() ? "" : switchboard.stream().map(PublicKey::toBase58).collect(Collectors.joining("\",\"", "\"", "\"")),
383-
pyth.isEmpty() ? "" : pyth.stream().map(PublicKey::toBase58).collect(Collectors.joining("\",\"", "\"", "\"")),
384-
scope.isEmpty() ? "" : scope.stream().map(PublicKey::toBase58).collect(Collectors.joining("\",\"", "\"", "\""))
385-
);
386-
}
387-
}
388-
389-
final class MarketOraclesBuilder {
390-
391-
Set<PublicKey> switchboard;
392-
Set<PublicKey> pyth;
393-
Set<PublicKey> scope;
394-
395-
int addAccounts(final TokenInfo tokenInfo) {
396-
int numOracleSources = 0;
397-
final var priceFeed = tokenInfo.scopeConfiguration().priceFeed();
398-
if (!priceFeed.equals(PublicKey.NONE) && !priceFeed.equals(KaminoAccounts.NULL_KEY)) {
399-
if (scope == null) {
400-
scope = new HashSet<>();
401-
}
402-
scope.add(priceFeed);
403-
++numOracleSources;
404-
}
405-
final var switchboard = tokenInfo.switchboardConfiguration().priceAggregator();
406-
if (!switchboard.equals(PublicKey.NONE) && !switchboard.equals(KaminoAccounts.NULL_KEY)) {
407-
if (this.switchboard == null) {
408-
this.switchboard = new HashSet<>();
409-
}
410-
this.switchboard.add(switchboard);
411-
++numOracleSources;
412-
}
413-
final var pyth = tokenInfo.pythConfiguration().price();
414-
if (!pyth.equals(PublicKey.NONE) && !pyth.equals(KaminoAccounts.NULL_KEY)) {
415-
if (this.pyth == null) {
416-
this.pyth = new HashSet<>();
417-
}
418-
this.pyth.add(pyth);
419-
++numOracleSources;
420-
}
421-
return numOracleSources;
422-
}
423-
424-
int numAccounts() {
425-
int numAccounts = 0;
426-
if (switchboard != null) {
427-
numAccounts += switchboard.size();
428-
}
429-
if (pyth != null) {
430-
numAccounts += pyth.size();
431-
}
432-
if (scope != null) {
433-
numAccounts += scope.size();
434-
}
435-
return numAccounts;
436-
}
437-
438-
MarketOracles build(final PublicKey market) {
439-
return new MarketOracles(
440-
market,
441-
switchboard == null ? Set.of() : Set.copyOf(switchboard),
442-
pyth == null ? Set.of() : Set.copyOf(pyth),
443-
scope == null ? Set.of() : Set.copyOf(scope)
444-
);
445-
}
446-
}
447-
448-
final var marketOracles = HashMap.<PublicKey, MarketOracles>newHashMap(byMarket.size());
449-
450-
for (final var entry : byMarket.entrySet()) {
451-
final var market = entry.getKey();
452-
final var builder = new MarketOraclesBuilder();
453-
for (final var reserveContext : entry.getValue()) {
454-
final var tokenInfo = reserveContext.tokenInfo();
455-
builder.addAccounts(tokenInfo);
456-
}
457-
marketOracles.put(market, builder.build(market));
458-
}
459-
460-
final var json = marketOracles.values().stream()
461-
.map(MarketOracles::toJson)
462-
.map(indented -> indented.indent(2).stripTrailing())
463-
.collect(Collectors.joining(",\n", "[\n", "\n]"));
464-
System.out.println(json);
465-
}
466-
467371
private static void loadReserves(final Path reservesJsonFilePath,
468372
final Map<PublicKey, MappingsContext> mappingsContextByPriceFeed,
469373
final Map<PublicKey, ReserveContext> reserveContexts) throws IOException {

0 commit comments

Comments
 (0)