Skip to content

Commit 360f43f

Browse files
authored
Clean up market processors (#803)
1 parent 93c47da commit 360f43f

22 files changed

+282
-2453
lines changed

src/commonMain/kotlin/exchange.dydx.abacus/processor/markets/CandleProcessor.kt

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package exchange.dydx.abacus.processor.markets
33
import exchange.dydx.abacus.output.MarketCandle
44
import exchange.dydx.abacus.processor.base.BaseProcessor
55
import exchange.dydx.abacus.protocols.ParserProtocol
6-
import exchange.dydx.abacus.utils.ParsingHelper.Companion.transform
76
import indexer.codegen.IndexerCandleResponseObject
87

98
/*
@@ -43,25 +42,6 @@ internal interface CandleProcessorProtocol {
4342
internal class CandleProcessor(
4443
parser: ParserProtocol
4544
) : BaseProcessor(parser), CandleProcessorProtocol {
46-
private val candleKeyMap = mapOf(
47-
"double" to mapOf(
48-
"low" to "low",
49-
"high" to "high",
50-
"open" to "open",
51-
"close" to "close",
52-
"baseTokenVolume" to "baseTokenVolume",
53-
"usdVolume" to "usdVolume",
54-
"startingOpenInterest" to "startingOpenInterest",
55-
),
56-
"datetime" to mapOf(
57-
"startedAt" to "startedAt",
58-
"updatedAt" to "updatedAt",
59-
),
60-
"int" to mapOf(
61-
"trades" to "trades",
62-
),
63-
)
64-
6545
override fun process(
6646
payload: IndexerCandleResponseObject?
6747
): MarketCandle? {
@@ -91,11 +71,4 @@ internal class CandleProcessor(
9171
trades = parser.asInt(payload.trades),
9272
)
9373
}
94-
95-
override fun received(
96-
existing: Map<String, Any>?,
97-
payload: Map<String, Any>
98-
): Map<String, Any> {
99-
return transform(existing, payload, candleKeyMap)
100-
}
10174
}

src/commonMain/kotlin/exchange.dydx.abacus/processor/markets/CandlesProcessor.kt

Lines changed: 0 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package exchange.dydx.abacus.processor.markets
33
import exchange.dydx.abacus.processor.base.BaseProcessor
44
import exchange.dydx.abacus.protocols.ParserProtocol
55
import exchange.dydx.abacus.state.internalstate.InternalMarketState
6-
import exchange.dydx.abacus.utils.mutable
7-
import exchange.dydx.abacus.utils.safeSet
86
import indexer.codegen.IndexerCandleResponse
97
import indexer.codegen.IndexerCandleResponseObject
108
import kotlinx.datetime.Instant
@@ -95,88 +93,4 @@ internal class CandlesProcessor(
9593
}
9694
return existing
9795
}
98-
99-
override fun received(
100-
existing: List<Any>?,
101-
payload: List<Any>
102-
): List<Any>? {
103-
return mergeDeprecated(
104-
parser,
105-
existing,
106-
parser.asNativeList(payload)?.reversed()?.toList(),
107-
"startedAt",
108-
true,
109-
)
110-
}
111-
112-
internal fun subscribed(
113-
existing: Map<String, Any>?,
114-
resolution: String,
115-
content: Map<String, Any>,
116-
): Map<String, Any>? {
117-
val payload =
118-
parser.asNativeList(content["candles"])
119-
return if (payload != null) {
120-
receivedChanges(existing, resolution, payload)
121-
} else {
122-
existing
123-
}
124-
}
125-
126-
@Suppress("FunctionName")
127-
internal fun channel_data(
128-
existing: Map<String, Any>?,
129-
resolution: String,
130-
content: Map<String, Any>,
131-
): Map<String, Any>? {
132-
// content is a single candle update
133-
if (content != null) {
134-
val modified = existing?.mutable() ?: mutableMapOf()
135-
val existingResolution = parser.asNativeList(existing?.get(resolution))
136-
val candles = existingResolution?.mutable() ?: mutableListOf()
137-
val lastExisting = parser.asNativeMap(candles.lastOrNull())
138-
val lastStartAt = parser.asDatetime(lastExisting?.get("startedAt"))
139-
val itemProcessor = itemProcessor as CandleProcessor
140-
val incoming = itemProcessor.received(null, content)
141-
val incomingStartAt = parser.asDatetime(incoming["startedAt"])
142-
if (lastStartAt == incomingStartAt) {
143-
candles.removeLast()
144-
}
145-
candles.add(incoming)
146-
modified.safeSet(resolution, candles)
147-
return modified
148-
} else {
149-
return existing
150-
}
151-
}
152-
153-
private fun receivedChanges(
154-
existing: Map<String, Any>?,
155-
resolution: String,
156-
payload: List<Any>?,
157-
): Map<String, Any>? {
158-
if (payload != null) {
159-
val modified = existing?.mutable() ?: mutableMapOf()
160-
val existingResolution = parser.asNativeList(existing?.get(resolution))
161-
val candles = mutableListOf<Any>()
162-
for (value in payload.reversed()) {
163-
parser.asNativeMap(value)?.let {
164-
val candleProcessor = itemProcessor as CandleProcessor
165-
val candle = candleProcessor.received(null, it)
166-
candles.add(candle)
167-
}
168-
}
169-
val mergedResolution = mergeDeprecated(
170-
parser,
171-
existingResolution,
172-
candles,
173-
"startedAt",
174-
true,
175-
)
176-
modified.safeSet(resolution, mergedResolution)
177-
return modified
178-
} else {
179-
return existing
180-
}
181-
}
18296
}

src/commonMain/kotlin/exchange.dydx.abacus/processor/markets/HistoricalFundingProcessor.kt

Lines changed: 0 additions & 20 deletions
This file was deleted.

src/commonMain/kotlin/exchange.dydx.abacus/processor/markets/HistoricalFundingsProcessor.kt

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,6 @@ import kotlinx.datetime.Instant
99

1010
@Suppress("UNCHECKED_CAST")
1111
internal class HistoricalFundingsProcessor(parser: ParserProtocol) : BaseProcessor(parser) {
12-
private val itemProcessor = HistoricalFundingProcessor(parser = parser)
13-
internal fun received(
14-
existing: List<Any>?,
15-
content: Map<String, Any>,
16-
): List<Any>? {
17-
val payload = parser.asNativeList(content["historicalFunding"]) as? List<Map<String, Any>>
18-
return if (payload != null) receivedList(existing, payload) else null
19-
}
20-
2112
fun process(
2213
existing: InternalMarketState,
2314
payload: List<IndexerHistoricalFundingResponseObject>?,
@@ -51,28 +42,4 @@ internal class HistoricalFundingsProcessor(parser: ParserProtocol) : BaseProcess
5142

5243
return existing
5344
}
54-
55-
private fun receivedList(
56-
existing: List<Any>?,
57-
payload: List<Any>?,
58-
): List<Any>? {
59-
if (payload != null) {
60-
val history = mutableListOf<Any>()
61-
for (value in payload) {
62-
parser.asNativeMap(value)?.let {
63-
val item = itemProcessor.received(null, it)
64-
history.add(item)
65-
}
66-
}
67-
return mergeDeprecated(
68-
parser,
69-
existing,
70-
history.reversed().toList(),
71-
"effectiveAt",
72-
true,
73-
)
74-
} else {
75-
return existing
76-
}
77-
}
7845
}

0 commit comments

Comments
 (0)