Skip to content

Commit f75344d

Browse files
committed
Add DriftListener to notify of critical market configuration changes.
1 parent 82b8898 commit f75344d

File tree

8 files changed

+507
-197
lines changed

8 files changed

+507
-197
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package systems.glam.services.integrations.drift;
2+
3+
import software.sava.core.accounts.PublicKey;
4+
5+
import java.util.Set;
6+
7+
public interface DriftListener {
8+
9+
PublicKey key();
10+
11+
void onCriticalPerpMarketChange(final Set<DriftMarketChange> changes,
12+
final DriftPerpMarketContext previous,
13+
final DriftPerpMarketContext latest);
14+
15+
void onCriticalSpotMarketChange(final Set<DriftMarketChange> changes,
16+
final DriftSpotMarketContext previous,
17+
final DriftSpotMarketContext latest);
18+
}

services/src/main/java/systems/glam/services/integrations/drift/DriftMarketCache.java

Lines changed: 56 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package systems.glam.services.integrations.drift;
22

33
import software.sava.core.accounts.PublicKey;
4-
import software.sava.core.encoding.ByteUtil;
54
import software.sava.core.rpc.Filter;
65
import software.sava.idl.clients.drift.DriftAccounts;
76
import software.sava.idl.clients.drift.gen.types.MarketStatus;
87
import software.sava.idl.clients.drift.gen.types.PerpMarket;
98
import software.sava.idl.clients.drift.gen.types.SpotMarket;
109
import software.sava.rpc.json.http.response.AccountInfo;
10+
import software.sava.rpc.json.http.ws.SolanaRpcWebsocket;
1111
import software.sava.services.solana.remote.call.RpcCaller;
1212
import systems.glam.services.rpc.AccountFetcher;
1313

@@ -24,7 +24,7 @@
2424
import static java.lang.System.Logger.Level.ERROR;
2525
import static systems.glam.services.io.FileUtils.ACCOUNT_FILE_EXTENSION;
2626

27-
public interface DriftMarketCache {
27+
public interface DriftMarketCache extends Runnable {
2828

2929
static CompletableFuture<DriftMarketCache> initCache(final Path driftCacheDirectory,
3030
final DriftAccounts driftAccounts,
@@ -33,7 +33,7 @@ static CompletableFuture<DriftMarketCache> initCache(final Path driftCacheDirect
3333
final var driftProgram = driftAccounts.driftProgram();
3434
try {
3535
final var spotMarketsDirectory = driftCacheDirectory.resolve("spot_markets");
36-
final CompletableFuture<AtomicReferenceArray<DriftMarketContext>> spotMarketsFuture;
36+
final CompletableFuture<AtomicReferenceArray<DriftSpotMarketContext>> spotMarketsFuture;
3737
if (Files.exists(spotMarketsDirectory)) {
3838
spotMarketsFuture = loadSpotMarkets(spotMarketsDirectory, rpcCaller, driftAccounts);
3939
} else {
@@ -42,7 +42,7 @@ static CompletableFuture<DriftMarketCache> initCache(final Path driftCacheDirect
4242
}
4343

4444
final var perpMarketsDirectory = driftCacheDirectory.resolve("perp_markets");
45-
final CompletableFuture<AtomicReferenceArray<DriftMarketContext>> perpMarketsFuture;
45+
final CompletableFuture<AtomicReferenceArray<DriftPerpMarketContext>> perpMarketsFuture;
4646
if (Files.exists(perpMarketsDirectory)) {
4747
perpMarketsFuture = loadPerpMarkets(perpMarketsDirectory, rpcCaller, driftAccounts);
4848
} else {
@@ -63,43 +63,31 @@ static CompletableFuture<DriftMarketCache> initCache(final Path driftCacheDirect
6363
}
6464
}
6565

66-
private static CompletableFuture<AtomicReferenceArray<DriftMarketContext>> loadSpotMarkets(final Path marketsDirectory,
67-
final RpcCaller rpcCaller,
68-
final DriftAccounts driftAccounts) {
66+
private static CompletableFuture<AtomicReferenceArray<DriftSpotMarketContext>> loadSpotMarkets(final Path marketsDirectory,
67+
final RpcCaller rpcCaller,
68+
final DriftAccounts driftAccounts) {
6969
final var driftProgram = driftAccounts.driftProgram();
7070
return loadMarkets(
7171
marketsDirectory,
72-
data -> {
73-
final var marketKey = PublicKey.readPubKey(data, SpotMarket.PUBKEY_OFFSET);
74-
final var oracle = PublicKey.readPubKey(data, SpotMarket.ORACLE_OFFSET);
75-
final int marketIndex = ByteUtil.getInt16LE(data, SpotMarket.MARKET_INDEX_OFFSET);
76-
final int poolId = data[SpotMarket.POOL_ID_OFFSET] & 0xFF;
77-
return DriftMarketContext.createContext(poolId, marketIndex, marketKey, oracle);
78-
},
72+
data -> DriftSpotMarketContext.createContext(0, data),
7973
() -> fetchSpotMarkets(marketsDirectory, rpcCaller, driftProgram)
8074
);
8175
}
8276

83-
private static CompletableFuture<AtomicReferenceArray<DriftMarketContext>> loadPerpMarkets(final Path marketsDirectory,
84-
final RpcCaller rpcCaller,
85-
final DriftAccounts driftAccounts) {
77+
private static CompletableFuture<AtomicReferenceArray<DriftPerpMarketContext>> loadPerpMarkets(final Path marketsDirectory,
78+
final RpcCaller rpcCaller,
79+
final DriftAccounts driftAccounts) {
8680
final var driftProgram = driftAccounts.driftProgram();
8781
return loadMarkets(
8882
marketsDirectory,
89-
data -> {
90-
final var marketKey = PublicKey.readPubKey(data, PerpMarket.PUBKEY_OFFSET);
91-
final var oracle = PublicKey.readPubKey(data, PerpMarket.AMM_OFFSET);
92-
final int marketIndex = ByteUtil.getInt16LE(data, PerpMarket.MARKET_INDEX_OFFSET);
93-
final int poolId = data[PerpMarket.POOL_ID_OFFSET] & 0xFF;
94-
return DriftMarketContext.createContext(poolId, marketIndex, marketKey, oracle);
95-
},
83+
data -> DriftPerpMarketContext.createContext(0, data),
9684
() -> fetchPerpMarkets(marketsDirectory, rpcCaller, driftProgram)
9785
);
9886
}
9987

100-
private static CompletableFuture<AtomicReferenceArray<DriftMarketContext>> loadMarkets(final Path marketsDirectory,
101-
final Function<byte[], DriftMarketContext> contextFactory,
102-
final Supplier<CompletableFuture<AtomicReferenceArray<DriftMarketContext>>> fallback) {
88+
private static <T extends DriftMarketContext> CompletableFuture<AtomicReferenceArray<T>> loadMarkets(final Path marketsDirectory,
89+
final Function<byte[], T> contextFactory,
90+
final Supplier<CompletableFuture<AtomicReferenceArray<T>>> fallback) {
10391
try (final var files = Files.list(marketsDirectory)) {
10492
final var datFiles = files
10593
.filter(p -> p.getFileName().toString().endsWith(ACCOUNT_FILE_EXTENSION))
@@ -109,9 +97,9 @@ private static CompletableFuture<AtomicReferenceArray<DriftMarketContext>> loadM
10997
return fallback.get();
11098
}
11199

112-
final var marketsArray = new AtomicReferenceArray<DriftMarketContext>(datFiles.size() << 1);
100+
final var marketsArray = new AtomicReferenceArray<T>(datFiles.size() << 1);
113101
datFiles.parallelStream().forEach(datFile -> {
114-
final DriftMarketContext marketContext;
102+
final T marketContext;
115103
try {
116104
final byte[] data = Files.readAllBytes(datFile);
117105
marketContext = contextFactory.apply(data);
@@ -139,10 +127,10 @@ static List<Filter> spotMarketFilters() {
139127
);
140128
}
141129

142-
private static CompletableFuture<AtomicReferenceArray<DriftMarketContext>> fetchSpotMarkets(final Path marketsDirectory,
143-
final RpcCaller rpcCaller,
144-
final PublicKey driftProgram) {
145-
return fetchMarkets(marketsDirectory, rpcCaller, spotMarketFilters(), driftProgram, DriftMarketContext::createSpotContext);
130+
private static CompletableFuture<AtomicReferenceArray<DriftSpotMarketContext>> fetchSpotMarkets(final Path marketsDirectory,
131+
final RpcCaller rpcCaller,
132+
final PublicKey driftProgram) {
133+
return fetchMarkets(marketsDirectory, rpcCaller, spotMarketFilters(), driftProgram, DriftSpotMarketContext::createContext);
146134
}
147135

148136
static List<Filter> perpMarketFilters() {
@@ -153,25 +141,25 @@ static List<Filter> perpMarketFilters() {
153141
);
154142
}
155143

156-
private static CompletableFuture<AtomicReferenceArray<DriftMarketContext>> fetchPerpMarkets(final Path marketsDirectory,
157-
final RpcCaller rpcCaller,
158-
final PublicKey driftProgram) {
159-
return fetchMarkets(marketsDirectory, rpcCaller, perpMarketFilters(), driftProgram, DriftMarketContext::createPerpContext);
144+
private static CompletableFuture<AtomicReferenceArray<DriftPerpMarketContext>> fetchPerpMarkets(final Path marketsDirectory,
145+
final RpcCaller rpcCaller,
146+
final PublicKey driftProgram) {
147+
return fetchMarkets(marketsDirectory, rpcCaller, perpMarketFilters(), driftProgram, DriftPerpMarketContext::createContext);
160148
}
161149

162-
private static CompletableFuture<AtomicReferenceArray<DriftMarketContext>> fetchMarkets(final Path marketsDirectory,
163-
final RpcCaller rpcCaller,
164-
final List<Filter> filters,
165-
final PublicKey driftProgram,
166-
final Function<AccountInfo<byte[]>, DriftMarketContext> contextFactory) {
150+
private static <T extends DriftMarketContext> CompletableFuture<AtomicReferenceArray<T>> fetchMarkets(final Path marketsDirectory,
151+
final RpcCaller rpcCaller,
152+
final List<Filter> filters,
153+
final PublicKey driftProgram,
154+
final Function<AccountInfo<byte[]>, T> contextFactory) {
167155
final var fetchFuture = rpcCaller.courteousCall(
168156
rpcClient -> rpcClient.getProgramAccounts(driftProgram, filters),
169157
"rpcClient::getDriftSpotMarkets"
170158
);
171159
return fetchFuture.thenApplyAsync(accounts -> {
172-
final var marketsArray = new AtomicReferenceArray<DriftMarketContext>(accounts.size() << 1);
160+
final var marketsArray = new AtomicReferenceArray<T>(accounts.size() << 1);
173161
accounts.parallelStream().forEach(accountInfo -> {
174-
final DriftMarketContext marketContext;
162+
final T marketContext;
175163
try {
176164
marketContext = contextFactory.apply(accountInfo);
177165
} catch (final RuntimeException ex) {
@@ -190,11 +178,33 @@ private static CompletableFuture<AtomicReferenceArray<DriftMarketContext>> fetch
190178

191179
DriftAccounts driftAccounts();
192180

193-
DriftMarketContext spotMarket(final int marketIndex);
181+
DriftSpotMarketContext spotMarket(final int marketIndex);
194182

195-
DriftMarketContext perpMarket(final int marketIndex);
183+
DriftPerpMarketContext perpMarket(final int marketIndex);
196184

197185
void refreshSpotMarket(final int marketIndex);
198186

199187
void refreshPerpMarket(final int marketIndex);
188+
189+
void subscribeToCriticalMarketChanges(final DriftListener listener);
190+
191+
void unSubscribeToCriticalMarketChanges(final DriftListener listener);
192+
193+
void subscribeToCriticalPerpMarketChanges(final PublicKey market, final DriftListener listener);
194+
195+
void unSubscribeToCriticalPerpMarketChanges(final PublicKey market, final DriftListener listener);
196+
197+
void subscribeToCriticalSpotMarketChanges(final PublicKey market, final DriftListener listener);
198+
199+
void unSubscribeToCriticalSpotMarketChanges(final PublicKey market, final DriftListener listener);
200+
201+
void subscribeToCriticalPerpMarketChanges(final int marketIndex, final DriftListener listener);
202+
203+
void unSubscribeToCriticalPerpMarketChanges(final int marketIndex, final DriftListener listener);
204+
205+
void subscribeToCriticalSpotMarketChanges(final int marketIndex, final DriftListener listener);
206+
207+
void unSubscribeToCriticalSpotMarketChanges(final int marketIndex, final DriftListener listener);
208+
209+
void subscribe(SolanaRpcWebsocket websocket);
200210
}

0 commit comments

Comments
 (0)