Skip to content

Commit b76bfa0

Browse files
committed
Add migrate CLI
1 parent 19c84a9 commit b76bfa0

File tree

4 files changed

+342
-4
lines changed

4 files changed

+342
-4
lines changed

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
"start:rates": "node -r sucrase/register src/indexRates.ts",
2424
"start:api": "node -r sucrase/register src/indexApi.ts",
2525
"start:destroyPartition": "node -r sucrase/register src/bin/destroyPartition.ts",
26+
"start:migrate": "node -r sucrase/register src/bin/migrate.ts",
27+
"start:migrate:inspect": "node --inspect-brk -r sucrase/register src/bin/migrate.ts",
2628
"stats": "node -r sucrase/register src/bin/partitionStats.ts",
2729
"test": "mocha -r sucrase/register 'test/**/*.test.ts'",
2830
"demo": "parcel serve src/demo/index.html",

src/bin/migrate.ts

Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
import {
2+
asDate,
3+
asJSON,
4+
asObject,
5+
asOptional,
6+
asString,
7+
uncleaner
8+
} from 'cleaners'
9+
import fs from 'fs'
10+
import nano from 'nano'
11+
import path from 'path'
12+
13+
import { config } from '../config'
14+
import { processBanxaTx } from '../partners/banxa'
15+
import { processBitaccessTx } from '../partners/bitaccess'
16+
import { processBitrefillTx } from '../partners/bitrefill'
17+
import { processBitsOfGoldTx } from '../partners/bitsofgold'
18+
import { processBityTx } from '../partners/bity'
19+
import { processChangeHeroTx } from '../partners/changehero'
20+
import { processChangellyTx } from '../partners/changelly'
21+
import { processChangeNowTx } from '../partners/changenow'
22+
import { processCoinSwitchTx } from '../partners/coinswitch'
23+
import { processExolixTx } from '../partners/exolix'
24+
import { processFaastTx } from '../partners/faast'
25+
import { processFoxExchangeTx } from '../partners/foxExchange'
26+
import { processGodexTx } from '../partners/godex'
27+
import { processIoniaGiftCardsTx } from '../partners/ioniagiftcard'
28+
import { processIoniaVisaRewardsTx } from '../partners/ioniavisarewards'
29+
import { processKadoTx } from '../partners/kado'
30+
import { processLetsExchangeTx } from '../partners/letsexchange'
31+
import { processLibertyxTx } from '../partners/libertyx'
32+
import { processLifiTx } from '../partners/lifi'
33+
import { processMoonpayTx } from '../partners/moonpay'
34+
import { processPaybisTx } from '../partners/paybis'
35+
import { processPaytrieTx } from '../partners/paytrie'
36+
import { processSafelloTx } from '../partners/safello'
37+
import { processShapeshiftTx } from '../partners/shapeshift'
38+
import { processSideshiftTx } from '../partners/sideshift'
39+
import { processSimplexTx } from '../partners/simplex'
40+
import { processSwapuzTx } from '../partners/swapuz'
41+
import { processSwitchainTx } from '../partners/switchain'
42+
import { processTransakTx } from '../partners/transak'
43+
import { processWyreTx } from '../partners/wyre'
44+
import { processXanpoolTx } from '../partners/xanpool'
45+
import { DbTx, StandardTx, wasDbTx } from '../types'
46+
import { datelog } from '../util'
47+
48+
const processors: {
49+
[partnerId: string]: undefined | ((rawTx: unknown) => StandardTx)
50+
} = {
51+
banxa: processBanxaTx,
52+
bitaccess: processBitaccessTx,
53+
bitrefill: processBitrefillTx,
54+
bitsofgold: processBitsOfGoldTx,
55+
bity: processBityTx,
56+
changehero: processChangeHeroTx,
57+
changelly: processChangellyTx,
58+
changenow: processChangeNowTx,
59+
coinswitch: processCoinSwitchTx,
60+
exolix: processExolixTx,
61+
faast: processFaastTx,
62+
foxExchange: processFoxExchangeTx,
63+
gebo: undefined,
64+
godex: processGodexTx,
65+
ioniagiftcards: processIoniaGiftCardsTx,
66+
ioniavisarewards: processIoniaVisaRewardsTx,
67+
kado: processKadoTx,
68+
letsexchange: processLetsExchangeTx,
69+
libertyx: processLibertyxTx,
70+
lifi: processLifiTx,
71+
moonpay: processMoonpayTx,
72+
paybis: processPaybisTx,
73+
paytrie: processPaytrieTx,
74+
safello: processSafelloTx,
75+
shapeshift: processShapeshiftTx,
76+
sideshift: processSideshiftTx,
77+
simplex: processSimplexTx,
78+
swapuz: processSwapuzTx,
79+
switchain: processSwitchainTx,
80+
// thorchain: processThorchainTx,
81+
totle: undefined,
82+
transak: processTransakTx,
83+
wyre: processWyreTx,
84+
xanpool: processXanpoolTx
85+
}
86+
87+
type MigrationState = ReturnType<typeof asMigrationState>
88+
const asMigrationState = asJSON(
89+
asObject({
90+
bookmark: asOptional(asString)
91+
})
92+
)
93+
const wasMigrationState = uncleaner(asMigrationState)
94+
95+
const MIGRATION_STATE_FILE = './cache/migrationState.json'
96+
const PAGE_SIZE = 1000
97+
98+
const nanoDb = nano(config.couchDbFullpath)
99+
100+
// Ensure migration state file directory exists.
101+
fs.mkdirSync(path.dirname(MIGRATION_STATE_FILE), { recursive: true })
102+
103+
migration().catch(e => {
104+
datelog(e)
105+
})
106+
107+
async function migration(): Promise<void> {
108+
// Args:
109+
const shouldInitialize = process.argv.includes('--init')
110+
const shouldStartOver = process.argv.includes('--start-over')
111+
112+
const reportsTransactions = nanoDb.use<StandardTx>('reports_transactions')
113+
114+
// Initialize migration state file:
115+
if (!fs.existsSync(MIGRATION_STATE_FILE)) {
116+
saveMigrationState({
117+
bookmark: undefined
118+
})
119+
}
120+
const migrationState = readMigrationState()
121+
122+
if (shouldStartOver) {
123+
migrationState.bookmark = undefined
124+
saveMigrationState(migrationState)
125+
}
126+
127+
// Migrate all transactions that do not have a createTime field.
128+
if (shouldInitialize) {
129+
console.log('Initializing documents...')
130+
while (true) {
131+
const response = await reportsTransactions.find({
132+
selector: {
133+
status: { $eq: 'complete' },
134+
createTime: { $exists: false }
135+
},
136+
use_index: 'status-createtime',
137+
limit: PAGE_SIZE
138+
})
139+
if (response.docs.length === 0) {
140+
break
141+
}
142+
const newDocs = await initializeDocument(response.docs)
143+
console.log(
144+
`Initialized ${newDocs.length} documents after ${response.docs[0]._id}`
145+
)
146+
await reportsTransactions.bulk({ docs: newDocs })
147+
}
148+
console.log('Initial migration complete.')
149+
}
150+
151+
while (true) {
152+
const response = await reportsTransactions.find({
153+
selector: {
154+
status: { $eq: 'complete' }
155+
},
156+
sort: [{ status: 'asc' }, { createTime: 'asc' }],
157+
use_index: 'status-createtime',
158+
limit: PAGE_SIZE,
159+
bookmark: migrationState.bookmark
160+
})
161+
if (response.docs.length === 0) {
162+
break
163+
}
164+
const newDocs = await updateDocument(response.docs)
165+
console.log(
166+
`Updated ${newDocs.length} documents after ${response.docs[0]._id}`
167+
)
168+
await reportsTransactions.bulk({ docs: newDocs })
169+
// Update migration state:
170+
migrationState.bookmark = response.bookmark
171+
saveMigrationState(migrationState)
172+
}
173+
console.log('Migration complete.')
174+
175+
migrationState.bookmark = undefined
176+
saveMigrationState(migrationState)
177+
}
178+
179+
function readMigrationState(): MigrationState {
180+
const migrationStateFileContent = fs.readFileSync(MIGRATION_STATE_FILE, {
181+
encoding: 'utf8'
182+
})
183+
return asMigrationState(migrationStateFileContent)
184+
}
185+
186+
function saveMigrationState(migrationState: MigrationState): void {
187+
fs.writeFileSync(MIGRATION_STATE_FILE, wasMigrationState(migrationState), {
188+
encoding: 'utf8'
189+
})
190+
}
191+
192+
function getDocumentIdentifiers(
193+
documentId: string
194+
): { appId: string; partnerId: string } {
195+
const parts = documentId.split(':')[0].split('_')
196+
if (parts.length === 0) {
197+
throw new Error(`Invalid documentId ${documentId}`)
198+
}
199+
const partnerId = parts.pop() as string
200+
const appId = parts.join('_')
201+
return { appId, partnerId }
202+
}
203+
204+
/**
205+
* Adds the createTime field to documents that are missing it.
206+
* This is a required field for migration processing.
207+
*/
208+
async function initializeDocument(
209+
docs: Array<StandardTx & nano.Document>
210+
): Promise<DbTx[]> {
211+
const newDocs: DbTx[] = []
212+
for (const doc of docs) {
213+
const { partnerId } = getDocumentIdentifiers(doc._id)
214+
const processor = processors[partnerId]
215+
216+
if (processor == null) {
217+
// Add createTime to document for minimal requirement for
218+
// migration/processing.
219+
newDocs.push({ ...doc, createTime: new Date() })
220+
datelog(`Not found ${partnerId} for document ${doc._id}`)
221+
continue
222+
}
223+
224+
if (doc.rawTx == null) {
225+
// Add createTime to document for minimal requirement for
226+
// migration/processing.
227+
newDocs.push({ ...doc, createTime: new Date() })
228+
datelog(`Missing rawTx for document ${doc._id}`)
229+
continue
230+
}
231+
232+
let standardTx
233+
try {
234+
standardTx = processor(doc.rawTx)
235+
} catch (error) {
236+
// Add createTime to document for minimal requirement for
237+
// migration/processing.
238+
newDocs.push({ ...doc, createTime: new Date() })
239+
datelog(`Error processing ${doc._id}`, error)
240+
continue
241+
}
242+
243+
newDocs.push(
244+
wasDbTx({
245+
_id: doc._id,
246+
_rev: doc._rev,
247+
...standardTx
248+
})
249+
)
250+
}
251+
252+
return newDocs
253+
}
254+
255+
/**
256+
* Updates the documents with any new fields using its processor function.
257+
*/
258+
async function updateDocument(
259+
docs: Array<StandardTx & nano.Document>
260+
): Promise<DbTx[]> {
261+
const newDocs: DbTx[] = []
262+
for (const doc of docs) {
263+
const { partnerId } = getDocumentIdentifiers(doc._id)
264+
const processor = processors[partnerId]
265+
266+
if (processor == null) {
267+
datelog(`Not found ${partnerId} for document ${doc._id}`)
268+
continue
269+
}
270+
271+
if (doc.rawTx == null) {
272+
datelog(`Missing rawTx for document ${doc._id}`)
273+
continue
274+
}
275+
276+
let standardTx
277+
try {
278+
standardTx = processor(doc.rawTx)
279+
} catch (error) {
280+
datelog(`Error processing ${doc._id}`, error)
281+
continue
282+
}
283+
284+
newDocs.push(
285+
wasDbTx({
286+
_id: doc._id,
287+
_rev: doc._rev,
288+
createTime: asDate(doc.createTime),
289+
...standardTx
290+
})
291+
)
292+
}
293+
294+
return newDocs
295+
}

src/initDbs.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ const transactionIndexes: DesignDocumentMap = {
3838
...fieldsToDesign(['status', 'isoDate']),
3939
...fieldsToDesign(['status', 'payoutAmount', 'depositAmount']),
4040
...fieldsToDesign(['status', 'payoutCurrency', 'isoDate']),
41+
...fieldsToDesign(['status', 'createTime']),
4142
...fieldsToDesign(['status', 'usdValue']),
4243
...fieldsToDesign(['status', 'usdValue', 'timestamp']),
4344
...fieldsToDesign(['usdValue']),

0 commit comments

Comments
 (0)