Skip to content

Commit c1bb57a

Browse files
committed
Add clickhouseEngine
1 parent b76bfa0 commit c1bb57a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+358
-58
lines changed

.vscode/launch.json

Lines changed: 27 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,74 +4,67 @@
44
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
55
"version": "0.2.0",
66
"configurations": [
7+
{
8+
"type": "node",
9+
"request": "attach",
10+
"name": "Attach to Node.js",
11+
"address": "127.0.0.1",
12+
"port": 9229,
13+
"restart": true,
14+
"timeout": 30000
15+
},
716
{
817
"type": "node",
918
"request": "launch",
1019
"name": "indexQuery",
1120
"args": [
1221
"-r",
1322
"sucrase/register",
14-
"${workspaceFolder}/src/indexQuery.ts",
15-
23+
"${workspaceFolder}/src/indexQuery.ts"
1624
],
1725
"internalConsoleOptions": "openOnSessionStart",
18-
"skipFiles": [
19-
"<node_internals>/**"
20-
]
26+
"skipFiles": ["<node_internals>/**"]
2127
},
2228
{
2329
"type": "node",
2430
"request": "launch",
2531
"name": "indexRates",
26-
"args":[
32+
"args": [
2733
"-r",
2834
"sucrase/register",
29-
"${workspaceFolder}/src/indexRates.ts",
35+
"${workspaceFolder}/src/indexRates.ts"
3036
],
31-
"skipFiles": [
32-
"<node_internals>/**"
33-
]
37+
"skipFiles": ["<node_internals>/**"]
3438
},
3539
{
3640
"type": "node",
3741
"request": "launch",
3842
"name": "indexApi",
39-
"args": [
40-
"-r",
41-
"sucrase/register",
42-
"${workspaceFolder}/src/indexApi.ts",
43-
44-
],
43+
"args": ["-r", "sucrase/register", "${workspaceFolder}/src/indexApi.ts"],
4544
"internalConsoleOptions": "openOnSessionStart",
46-
"skipFiles": [
47-
"<node_internals>/**"
48-
]
45+
"skipFiles": ["<node_internals>/**"]
4946
},
5047
{
5148
"type": "node",
5249
"request": "launch",
5350
"name": "indexCache",
54-
"args":[
51+
"args": [
5552
"-r",
5653
"sucrase/register",
57-
"${workspaceFolder}/src/indexCache.ts",
54+
"${workspaceFolder}/src/indexCache.ts"
5855
],
59-
"skipFiles": [
60-
"<node_internals>/**"
61-
]
56+
"skipFiles": ["<node_internals>/**"]
6257
},
6358
{
6459
"type": "node",
6560
"request": "launch",
6661
"name": "lifiReporter",
67-
"args":[
62+
"args": [
6863
"-r",
6964
"sucrase/register",
70-
"${workspaceFolder}/src/bin/lifiReporter.ts",
65+
"${workspaceFolder}/src/bin/lifiReporter.ts"
7166
],
72-
"skipFiles": [
73-
"<node_internals>/**"
74-
]
67+
"skipFiles": ["<node_internals>/**"]
7568
},
7669
{
7770
"type": "node",
@@ -80,13 +73,10 @@
8073
"args": [
8174
"-r",
8275
"sucrase/register",
83-
"${workspaceFolder}/src/bin/testpartner.ts",
84-
76+
"${workspaceFolder}/src/bin/testpartner.ts"
8577
],
8678
"internalConsoleOptions": "openOnSessionStart",
87-
"skipFiles": [
88-
"<node_internals>/**"
89-
]
79+
"skipFiles": ["<node_internals>/**"]
9080
},
9181
{
9282
"type": "node",
@@ -95,13 +85,10 @@
9585
"args": [
9686
"-r",
9787
"sucrase/register",
98-
"${workspaceFolder}/src/bin/partnerTotals.ts",
99-
88+
"${workspaceFolder}/src/bin/partnerTotals.ts"
10089
],
10190
"internalConsoleOptions": "openOnSessionStart",
102-
"skipFiles": [
103-
"<node_internals>/**"
104-
]
91+
"skipFiles": ["<node_internals>/**"]
10592
},
10693
{
10794
"type": "node",
@@ -115,10 +102,7 @@
115102
"edge_letsexchange"
116103
],
117104
"internalConsoleOptions": "openOnSessionStart",
118-
"skipFiles": [
119-
"<node_internals>/**"
120-
]
105+
"skipFiles": ["<node_internals>/**"]
121106
}
122-
123107
]
124108
}

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"Bitrefill",
55
"Bity",
66
"Changelly",
7+
"Clickhouse",
78
"Faast",
89
"godex",
910
"Kado",

package.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
"prepare": "./scripts/prepare.sh && npm-run-all clean configure -p build.*",
2121
"start": "node -r sucrase/register src/indexQuery.ts",
2222
"start:cache": "node -r sucrase/register src/indexCache.ts",
23+
"start:clickhouse": "node -r sucrase/register src/indexClickhouse.ts",
24+
"start:clickhouse:inspect": "node --inspect-brk -r sucrase/register src/indexClickhouse.ts",
2325
"start:rates": "node -r sucrase/register src/indexRates.ts",
2426
"start:api": "node -r sucrase/register src/indexApi.ts",
2527
"start:destroyPartition": "node -r sucrase/register src/bin/destroyPartition.ts",
@@ -39,7 +41,8 @@
3941
"*.{js,jsx,ts,tsx}": "eslint"
4042
},
4143
"dependencies": {
42-
"@types/node": "^14.0.22",
44+
"@clickhouse/client": "^1.10.1",
45+
"@types/node": "^20.17.17",
4346
"api-changelly": "git://github.com/changelly/api-changelly.git#8e350f3",
4447
"axios": "^0.21.2",
4548
"biggystring": "^4.1.3",

src/clickhouseEngine.ts

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
import { createClient } from '@clickhouse/client'
2+
import { asDate, asObject, asOptional, asString, uncleaner } from 'cleaners'
3+
import nano from 'nano'
4+
5+
import { config } from './config'
6+
import { initDbs } from './initDbs'
7+
import { asStandardTx, DbTx, StandardTx, wasDbTx } from './types'
8+
import { datelog, snooze } from './util'
9+
10+
// Clickhouse recommends large batch inserts. We consider the couchdb
11+
// query size as well.
12+
const PAGE_SIZE = 10_000
13+
14+
const nanoDb = nano(config.couchDbFullpath)
15+
const clickhouseDb = createClient(config.clickhouseConnection)
16+
17+
const progressDocName = 'clickhouse:clickhouseEngine'
18+
const asClickhouseProgress = asObject({
19+
_id: asOptional(asString, progressDocName),
20+
_rev: asOptional(asString),
21+
afterTime: asOptional(asDate, new Date(0))
22+
})
23+
const wasClickhouseProgress = uncleaner(asClickhouseProgress)
24+
25+
export async function clickhouseEngine(): Promise<void> {
26+
await initDbs()
27+
await initClickhouseDb()
28+
29+
const dbTransactions = nanoDb.db.use<StandardTx>('reports_transactions')
30+
31+
const dbProgress = nanoDb.db.use('reports_progresscache')
32+
const out = await dbProgress.get(progressDocName).catch(error => {
33+
if (error.error != null && error.error === 'not_found') {
34+
datelog(`Previous Progress Record Not Found ${progressDocName}`)
35+
return {}
36+
}
37+
throw error
38+
})
39+
const progressDoc = asClickhouseProgress(out)
40+
41+
let bookmark: string | undefined
42+
43+
while (true) {
44+
const response = await dbTransactions.find({
45+
selector: {
46+
status: { $eq: 'complete' },
47+
updateTime: { $gt: progressDoc.afterTime.toISOString() }
48+
},
49+
sort: [{ updateTime: 'asc' }],
50+
use_index: 'status-updatetime',
51+
limit: PAGE_SIZE,
52+
bookmark
53+
})
54+
55+
bookmark = response.bookmark
56+
57+
const startDocId = response.docs[0]?._id
58+
const endDocId = response.docs[response.docs.length - 1]?._id
59+
60+
if (response.docs.length > 0) {
61+
datelog(
62+
`Processing ${response.docs.length} rows from ${startDocId} to ${endDocId}.`
63+
)
64+
} else {
65+
datelog(
66+
`Queried for new documents after ${progressDoc.afterTime.toISOString()}.`
67+
)
68+
}
69+
70+
const newDocs: DbTx[] = []
71+
const newRows: any[][] = []
72+
let lastDocUpdateTime: string | undefined
73+
74+
for (const doc of response.docs) {
75+
const { appId, partnerId } = getDocumentIdentifiers(doc._id)
76+
77+
const standardTx = asStandardTx(doc)
78+
79+
newRows.push([
80+
appId,
81+
partnerId,
82+
standardTx.orderId,
83+
standardTx.countryCode,
84+
standardTx.depositTxid,
85+
standardTx.depositAddress,
86+
standardTx.depositCurrency,
87+
standardTx.depositAmount,
88+
standardTx.direction,
89+
standardTx.exchangeType,
90+
standardTx.paymentType,
91+
standardTx.payoutTxid,
92+
standardTx.payoutAddress,
93+
standardTx.payoutCurrency,
94+
standardTx.payoutAmount,
95+
standardTx.status,
96+
Math.round(standardTx.timestamp),
97+
standardTx.usdValue,
98+
config.clickhouseIndexVersion
99+
])
100+
101+
newDocs.push(
102+
wasDbTx({
103+
_id: doc._id,
104+
_rev: doc._rev,
105+
...standardTx
106+
})
107+
)
108+
109+
const txUpdateTime = standardTx.updateTime.toISOString()
110+
if (lastDocUpdateTime == null || lastDocUpdateTime < txUpdateTime) {
111+
lastDocUpdateTime = txUpdateTime
112+
}
113+
}
114+
115+
// Add the standardTx to the clickhouse database
116+
await clickhouseDb.insert({
117+
table: 'reports_transactions',
118+
columns: [
119+
'appId',
120+
'partnerId',
121+
'orderId',
122+
'countryCode',
123+
'depositTxid',
124+
'depositAddress',
125+
'depositCurrency',
126+
'depositAmount',
127+
'direction',
128+
'exchangeType',
129+
'paymentType',
130+
'payoutTxid',
131+
'payoutAddress',
132+
'payoutCurrency',
133+
'payoutAmount',
134+
'status',
135+
'timestamp',
136+
'usdValue',
137+
'indexVersion'
138+
],
139+
values: newRows
140+
})
141+
// Update all documents processed
142+
await dbTransactions.bulk({ docs: newDocs })
143+
144+
// We've reached the end of the view index, so we'll continue but with a
145+
// delay so as not to thrash the couchdb unnecessarily.
146+
if (response.docs.length !== PAGE_SIZE) {
147+
bookmark = undefined
148+
if (lastDocUpdateTime != null) {
149+
progressDoc.afterTime = new Date(lastDocUpdateTime)
150+
await dbProgress.insert(wasClickhouseProgress(progressDoc))
151+
}
152+
await snooze(5000)
153+
}
154+
}
155+
}
156+
157+
function getDocumentIdentifiers(
158+
documentId: string
159+
): { appId: string; partnerId: string } {
160+
const parts = documentId.split(':')[0].split('_')
161+
if (parts.length === 0) {
162+
throw new Error(`Invalid documentId ${documentId}`)
163+
}
164+
const partnerId = parts.pop() as string
165+
const appId = parts.join('_')
166+
return { appId, partnerId }
167+
}
168+
169+
const REPORTS_TRANSACTIONS_SCHEMA = `\
170+
CREATE TABLE default.reports_transactions
171+
(
172+
\`partnerId\` String,
173+
\`appId\` String,
174+
\`orderId\` String,
175+
\`countryCode\` String,
176+
\`depositTxid\` String,
177+
\`depositAddress\` String,
178+
\`depositCurrency\` String,
179+
\`depositAmount\` Float64,
180+
\`direction\` String,
181+
\`exchangeType\` String,
182+
\`paymentType\` String,
183+
\`payoutTxid\` String,
184+
\`payoutAddress\` String,
185+
\`payoutCurrency\` String,
186+
\`payoutAmount\` Float64,
187+
\`status\` String,
188+
\`timestamp\` DateTime DEFAULT now(),
189+
\`usdValue\` Float64,
190+
\`indexVersion\` UInt16
191+
)
192+
ENGINE = ReplacingMergeTree
193+
PRIMARY KEY (appId, partnerId, orderId)
194+
ORDER BY (appId, partnerId, orderId, timestamp)
195+
SETTINGS index_granularity = 8192`
196+
197+
async function initClickhouseDb(): Promise<void> {
198+
// Check if the table exists
199+
const tableExists = await clickhouseDb.query({
200+
query: `
201+
SELECT 1
202+
FROM system.tables
203+
WHERE database = 'default' AND name = 'reports_transactions'
204+
LIMIT 1;
205+
`,
206+
format: 'JSONEachRow'
207+
})
208+
209+
const result = await tableExists.json()
210+
if (result.length > 0) {
211+
const response = await clickhouseDb.query({
212+
query: 'SHOW CREATE reports_transactions'
213+
})
214+
const result = await response.json()
215+
const tableSchema = (result.data[0] as any).statement
216+
217+
if (tableSchema !== REPORTS_TRANSACTIONS_SCHEMA) {
218+
console.log(tableSchema)
219+
throw new Error('Table "reports_transactions" schema does not match.')
220+
}
221+
222+
datelog('Table "reports_transactions" exists.')
223+
return
224+
}
225+
226+
// Create the table
227+
await clickhouseDb.command({
228+
query: REPORTS_TRANSACTIONS_SCHEMA
229+
})
230+
datelog('Table "reports_transactions" has been created.')
231+
}

0 commit comments

Comments
 (0)