Skip to content

Commit 167301d

Browse files
committed
Add clickhouseEngine
1 parent b76bfa0 commit 167301d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+334
-58
lines changed

.vscode/launch.json

Lines changed: 27 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,74 +4,67 @@
44
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
55
"version": "0.2.0",
66
"configurations": [
7+
{
8+
"type": "node",
9+
"request": "attach",
10+
"name": "Attach to Node.js",
11+
"address": "127.0.0.1",
12+
"port": 9229,
13+
"restart": true,
14+
"timeout": 30000
15+
},
716
{
817
"type": "node",
918
"request": "launch",
1019
"name": "indexQuery",
1120
"args": [
1221
"-r",
1322
"sucrase/register",
14-
"${workspaceFolder}/src/indexQuery.ts",
15-
23+
"${workspaceFolder}/src/indexQuery.ts"
1624
],
1725
"internalConsoleOptions": "openOnSessionStart",
18-
"skipFiles": [
19-
"<node_internals>/**"
20-
]
26+
"skipFiles": ["<node_internals>/**"]
2127
},
2228
{
2329
"type": "node",
2430
"request": "launch",
2531
"name": "indexRates",
26-
"args":[
32+
"args": [
2733
"-r",
2834
"sucrase/register",
29-
"${workspaceFolder}/src/indexRates.ts",
35+
"${workspaceFolder}/src/indexRates.ts"
3036
],
31-
"skipFiles": [
32-
"<node_internals>/**"
33-
]
37+
"skipFiles": ["<node_internals>/**"]
3438
},
3539
{
3640
"type": "node",
3741
"request": "launch",
3842
"name": "indexApi",
39-
"args": [
40-
"-r",
41-
"sucrase/register",
42-
"${workspaceFolder}/src/indexApi.ts",
43-
44-
],
43+
"args": ["-r", "sucrase/register", "${workspaceFolder}/src/indexApi.ts"],
4544
"internalConsoleOptions": "openOnSessionStart",
46-
"skipFiles": [
47-
"<node_internals>/**"
48-
]
45+
"skipFiles": ["<node_internals>/**"]
4946
},
5047
{
5148
"type": "node",
5249
"request": "launch",
5350
"name": "indexCache",
54-
"args":[
51+
"args": [
5552
"-r",
5653
"sucrase/register",
57-
"${workspaceFolder}/src/indexCache.ts",
54+
"${workspaceFolder}/src/indexCache.ts"
5855
],
59-
"skipFiles": [
60-
"<node_internals>/**"
61-
]
56+
"skipFiles": ["<node_internals>/**"]
6257
},
6358
{
6459
"type": "node",
6560
"request": "launch",
6661
"name": "lifiReporter",
67-
"args":[
62+
"args": [
6863
"-r",
6964
"sucrase/register",
70-
"${workspaceFolder}/src/bin/lifiReporter.ts",
65+
"${workspaceFolder}/src/bin/lifiReporter.ts"
7166
],
72-
"skipFiles": [
73-
"<node_internals>/**"
74-
]
67+
"skipFiles": ["<node_internals>/**"]
7568
},
7669
{
7770
"type": "node",
@@ -80,13 +73,10 @@
8073
"args": [
8174
"-r",
8275
"sucrase/register",
83-
"${workspaceFolder}/src/bin/testpartner.ts",
84-
76+
"${workspaceFolder}/src/bin/testpartner.ts"
8577
],
8678
"internalConsoleOptions": "openOnSessionStart",
87-
"skipFiles": [
88-
"<node_internals>/**"
89-
]
79+
"skipFiles": ["<node_internals>/**"]
9080
},
9181
{
9282
"type": "node",
@@ -95,13 +85,10 @@
9585
"args": [
9686
"-r",
9787
"sucrase/register",
98-
"${workspaceFolder}/src/bin/partnerTotals.ts",
99-
88+
"${workspaceFolder}/src/bin/partnerTotals.ts"
10089
],
10190
"internalConsoleOptions": "openOnSessionStart",
102-
"skipFiles": [
103-
"<node_internals>/**"
104-
]
91+
"skipFiles": ["<node_internals>/**"]
10592
},
10693
{
10794
"type": "node",
@@ -115,10 +102,7 @@
115102
"edge_letsexchange"
116103
],
117104
"internalConsoleOptions": "openOnSessionStart",
118-
"skipFiles": [
119-
"<node_internals>/**"
120-
]
105+
"skipFiles": ["<node_internals>/**"]
121106
}
122-
123107
]
124108
}

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"Bitrefill",
55
"Bity",
66
"Changelly",
7+
"Clickhouse",
78
"Faast",
89
"godex",
910
"Kado",

package.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
"prepare": "./scripts/prepare.sh && npm-run-all clean configure -p build.*",
2121
"start": "node -r sucrase/register src/indexQuery.ts",
2222
"start:cache": "node -r sucrase/register src/indexCache.ts",
23+
"start:clickhouse": "node -r sucrase/register src/indexClickhouse.ts",
24+
"start:clickhouse:inspect": "node --inspect-brk -r sucrase/register src/indexClickhouse.ts",
2325
"start:rates": "node -r sucrase/register src/indexRates.ts",
2426
"start:api": "node -r sucrase/register src/indexApi.ts",
2527
"start:destroyPartition": "node -r sucrase/register src/bin/destroyPartition.ts",
@@ -39,7 +41,8 @@
3941
"*.{js,jsx,ts,tsx}": "eslint"
4042
},
4143
"dependencies": {
42-
"@types/node": "^14.0.22",
44+
"@clickhouse/client": "^1.10.1",
45+
"@types/node": "^20.17.17",
4346
"api-changelly": "git://github.com/changelly/api-changelly.git#8e350f3",
4447
"axios": "^0.21.2",
4548
"biggystring": "^4.1.3",

src/clickhouseEngine.ts

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
import { createClient } from '@clickhouse/client'
2+
import nano from 'nano'
3+
4+
import { config } from './config'
5+
import { initDbs } from './initDbs'
6+
import { asStandardTx, DbTx, StandardTx, wasDbTx } from './types'
7+
import { datelog, snooze } from './util'
8+
9+
// Clickhouse recommends large batch inserts. We consider the couchdb
10+
// query size as well.
11+
const PAGE_SIZE = 10_000
12+
13+
const nanoDb = nano(config.couchDbFullpath)
14+
const clickhouseDb = createClient(config.clickhouseConnection)
15+
16+
export async function clickhouseEngine(): Promise<void> {
17+
await initDbs()
18+
await initClickhouseDb()
19+
20+
const dbTransactions = nanoDb.db.use<StandardTx>('reports_transactions')
21+
22+
let afterTime = new Date(0).toISOString()
23+
let bookmark: string | undefined
24+
25+
while (true) {
26+
const response = await dbTransactions.find({
27+
selector: {
28+
status: { $eq: 'complete' },
29+
updateTime: { $gt: afterTime }
30+
},
31+
sort: [{ updateTime: 'asc' }],
32+
use_index: 'status-updatetime',
33+
limit: PAGE_SIZE,
34+
bookmark
35+
})
36+
37+
bookmark = response.bookmark
38+
39+
const startDocId = response.docs[0]?._id
40+
const endDocId = response.docs[response.docs.length - 1]?._id
41+
42+
if (response.docs.length > 0) {
43+
datelog(
44+
`Processing ${response.docs.length} rows from ${startDocId} to ${endDocId}.`
45+
)
46+
} else {
47+
datelog(`Queried for new documents after ${afterTime}.`)
48+
}
49+
50+
const newDocs: DbTx[] = []
51+
const newRows: any[][] = []
52+
let lastDocUpdateTime: string | undefined
53+
54+
for (const doc of response.docs) {
55+
const { appId, partnerId } = getDocumentIdentifiers(doc._id)
56+
57+
const standardTx = asStandardTx(doc)
58+
59+
newRows.push([
60+
appId,
61+
partnerId,
62+
standardTx.orderId,
63+
standardTx.countryCode,
64+
standardTx.depositTxid,
65+
standardTx.depositAddress,
66+
standardTx.depositCurrency,
67+
standardTx.depositAmount,
68+
standardTx.direction,
69+
standardTx.exchangeType,
70+
standardTx.paymentType,
71+
standardTx.payoutTxid,
72+
standardTx.payoutAddress,
73+
standardTx.payoutCurrency,
74+
standardTx.payoutAmount,
75+
standardTx.status,
76+
Math.round(standardTx.timestamp),
77+
standardTx.usdValue,
78+
config.clickhouseIndexVersion
79+
])
80+
81+
newDocs.push(
82+
wasDbTx({
83+
_id: doc._id,
84+
_rev: doc._rev,
85+
...standardTx
86+
})
87+
)
88+
89+
lastDocUpdateTime = standardTx.updateTime.toISOString()
90+
}
91+
92+
// Add the standardTx to the clickhouse database
93+
await clickhouseDb.insert({
94+
table: 'reports_transactions',
95+
columns: [
96+
'appId',
97+
'partnerId',
98+
'orderId',
99+
'countryCode',
100+
'depositTxid',
101+
'depositAddress',
102+
'depositCurrency',
103+
'depositAmount',
104+
'direction',
105+
'exchangeType',
106+
'paymentType',
107+
'payoutTxid',
108+
'payoutAddress',
109+
'payoutCurrency',
110+
'payoutAmount',
111+
'status',
112+
'timestamp',
113+
'usdValue',
114+
'indexVersion'
115+
],
116+
values: newRows
117+
})
118+
// Update all documents processed
119+
await dbTransactions.bulk({ docs: newDocs })
120+
121+
// We've reached the end of the view index, so we'll continue but with a
122+
// delay so as not to thrash the couchdb unnecessarily.
123+
if (response.docs.length !== PAGE_SIZE) {
124+
bookmark = undefined
125+
if (lastDocUpdateTime != null) {
126+
afterTime = lastDocUpdateTime
127+
}
128+
await snooze(5000)
129+
}
130+
}
131+
}
132+
133+
function getDocumentIdentifiers(
134+
documentId: string
135+
): { appId: string; partnerId: string } {
136+
const parts = documentId.split(':')[0].split('_')
137+
if (parts.length === 0) {
138+
throw new Error(`Invalid documentId ${documentId}`)
139+
}
140+
const partnerId = parts.pop() as string
141+
const appId = parts.join('_')
142+
return { appId, partnerId }
143+
}
144+
145+
const REPORTS_TRANSACTIONS_SCHEMA = `\
146+
CREATE TABLE default.reports_transactions
147+
(
148+
\`partnerId\` String,
149+
\`appId\` String,
150+
\`orderId\` String,
151+
\`countryCode\` String,
152+
\`depositTxid\` String,
153+
\`depositAddress\` String,
154+
\`depositCurrency\` String,
155+
\`depositAmount\` Float64,
156+
\`direction\` String,
157+
\`exchangeType\` String,
158+
\`paymentType\` String,
159+
\`payoutTxid\` String,
160+
\`payoutAddress\` String,
161+
\`payoutCurrency\` String,
162+
\`payoutAmount\` Float64,
163+
\`status\` String,
164+
\`timestamp\` DateTime DEFAULT now(),
165+
\`usdValue\` Float64,
166+
\`indexVersion\` UInt16
167+
)
168+
ENGINE = ReplacingMergeTree
169+
PRIMARY KEY (appId, partnerId, orderId)
170+
ORDER BY (appId, partnerId, orderId, timestamp)
171+
SETTINGS index_granularity = 8192`
172+
173+
async function initClickhouseDb(): Promise<void> {
174+
// Check if the table exists
175+
const tableExists = await clickhouseDb.query({
176+
query: `
177+
SELECT 1
178+
FROM system.tables
179+
WHERE database = 'default' AND name = 'reports_transactions'
180+
LIMIT 1;
181+
`,
182+
format: 'JSONEachRow'
183+
})
184+
185+
const result = await tableExists.json()
186+
if (result.length > 0) {
187+
const response = await clickhouseDb.query({
188+
query: 'SHOW CREATE reports_transactions'
189+
})
190+
const result = await response.json()
191+
const tableSchema = (result.data[0] as any).statement
192+
193+
if (tableSchema !== REPORTS_TRANSACTIONS_SCHEMA) {
194+
console.log(tableSchema)
195+
throw new Error('Table "reports_transactions" schema does not match.')
196+
}
197+
198+
datelog('Table "reports_transactions" exists.')
199+
return
200+
}
201+
202+
// Create the table
203+
await clickhouseDb.command({
204+
query: REPORTS_TRANSACTIONS_SCHEMA
205+
})
206+
datelog('Table "reports_transactions" has been created.')
207+
}

src/config.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,17 @@ export const asConfig = asObject({
66
asString,
77
'http://username:password@localhost:5984'
88
),
9+
clickhouseConnection: asOptional(
10+
asObject({
11+
url: asString,
12+
password: asString
13+
}),
14+
{
15+
url: 'http://localhost:8123',
16+
password: ''
17+
}
18+
),
19+
clickhouseIndexVersion: asOptional(asNumber, 1),
920
httpPort: asOptional(asNumber, 8008),
1021
bog: asOptional(asObject({ apiKey: asString }), { apiKey: '' }),
1122

src/indexClickhouse.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import { clickhouseEngine } from './clickhouseEngine'
2+
3+
clickhouseEngine().catch(e => console.log(e))

0 commit comments

Comments
 (0)