Skip to content

Commit ecbec94

Browse files
committed
feat(mu): add push msg to hb route
1 parent 775db5b commit ecbec94

File tree

4 files changed

+141
-1
lines changed

4 files changed

+141
-1
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import { Rejected, Resolved, fromPromise, of } from 'hyper-async'
2+
3+
import { getCuAddressWith } from '../lib/get-cu-address.js'
4+
import { pullResultWith } from '../lib/pull-result.js'
5+
6+
export function pushResultToHbWith ({
7+
selectNode,
8+
fetchResult,
9+
buildAndSign,
10+
logger,
11+
HB_GRAPHQL_URL,
12+
fetch
13+
}) {
14+
const getCuAddress = getCuAddressWith({ selectNode, logger })
15+
const pullResult = pullResultWith({ fetchResult, logger })
16+
const buildAndSignAsync = fromPromise(buildAndSign)
17+
18+
const uploadToHb = async ({ signedDataItem, processId, messageId, logId }) => {
19+
const url = `${HB_GRAPHQL_URL}/id?codec-device=ans104@1.0`
20+
logger({ log: `[pushResultToHb] Uploading signed data item to HB: ${url} processId=${processId} messageId=${messageId} logId=${logId}` })
21+
const res = await fetch(url, {
22+
method: 'POST',
23+
headers: { 'Content-Type': 'application/octet-stream' },
24+
body: signedDataItem
25+
})
26+
const text = await res.text()
27+
if (!res.ok) {
28+
throw new Error(`[pushResultToHb] HB upload failed: ${res.status} ${text}`)
29+
}
30+
logger({ log: `[pushResultToHb] HB upload succeeded: ${text}` })
31+
return text
32+
}
33+
34+
const uploadToHbAsync = fromPromise(uploadToHb)
35+
36+
return (ctx) => {
37+
return of(ctx)
38+
.chain(getCuAddress)
39+
.chain(pullResult)
40+
.chain((res) => {
41+
const { msgs, number } = res
42+
if (msgs.length <= number) {
43+
return Rejected(new Error('Message number does not exist in the result.', { cause: ctx }))
44+
}
45+
return Resolved(res)
46+
})
47+
.chain((res) => {
48+
const { msgs, number } = res
49+
const targetMsg = msgs[number].msg
50+
logger({ log: `[pushResultToHb] Building and signing result message ${number} for ${ctx.tx.id} -> target=${targetMsg.Target}` })
51+
console.dir({ targetMsg }, { depth: null })
52+
return buildAndSignAsync({
53+
processId: targetMsg.Target,
54+
tags: targetMsg.Tags,
55+
anchor: targetMsg.Anchor,
56+
data: targetMsg.Data
57+
}).chain((tx) => {
58+
logger({ log: `[pushResultToHb] Signed data item id=${tx.id}, uploading to HB` })
59+
return uploadToHbAsync({
60+
signedDataItem: tx.data,
61+
processId: ctx.tx.processId,
62+
messageId: ctx.tx.id,
63+
logId: ctx.logId
64+
}).map((hbRes) => ({ ...res, hbRes, txId: tx.id }))
65+
})
66+
})
67+
}
68+
}

servers/mu/src/domain/index.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import { sendDataItemWith, startMessageRecoveryCronWith } from './api/sendDataIt
2929
import { sendAssignWith } from './api/sendAssign.js'
3030
import { processAssignWith } from './api/processAssign.js'
3131
import { pushMsgWith } from './api/pushMsg.js'
32+
import { pushResultToHbWith } from './api/pushResultToHb.js'
3233

3334
import { createLogger } from './logger.js'
3435
import { cuFetchWithCache } from './lib/cu-fetch-with-cache.js'
@@ -410,6 +411,16 @@ export const createApis = async (ctx) => {
410411

411412
const traceMsgs = fromPromise(readTracesWith({ db: traceDb, TRACE_DB_URL: ctx.TRACE_DB_URL, DISABLE_TRACE: ctx.DISABLE_TRACE }))
412413

414+
const pushResultToHbLogger = logger.child('pushResultToHb')
415+
const pushResultToHb = pushResultToHbWith({
416+
selectNode: cuClient.selectNodeWith({ CU_URL, logger: pushResultToHbLogger }),
417+
fetchResult: cuClient.resultWith({ fetch: fetchWithCache, histogram, CU_URL, logger: pushResultToHbLogger }),
418+
buildAndSign: signerClient.buildAndSignWith({ MU_WALLET, logger: pushResultToHbLogger }),
419+
logger: pushResultToHbLogger,
420+
HB_GRAPHQL_URL,
421+
fetch
422+
})
423+
413424
const pushMsgItemLogger = logger.child('pushMsg')
414425
const pushMsg = pushMsgWith({
415426
selectNode: cuClient.selectNodeWith({ CU_URL, logger: sendDataItemLogger }),
@@ -447,6 +458,7 @@ export const createApis = async (ctx) => {
447458
sendAssign,
448459
fetchCron,
449460
pushMsg,
461+
pushResultToHb,
450462
traceMsgs,
451463
initCronProcs: cronClient.initCronProcsWith({
452464
startMonitoredProcess: startProcessMonitor,

servers/mu/src/routes/index.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ import { withRootRoutes } from './root.js'
44
import { withMonitorRoutes } from './monitor.js'
55
import { withMetricRoutes } from './metrics.js'
66
import { withPushRoutes } from './push.js'
7+
import { withPushResultToHbRoutes } from './pushResultToHb.js'
78

89
export const withRoutes = pipe(
910
withMonitorRoutes,
1011
withRootRoutes,
1112
withMetricRoutes,
12-
withPushRoutes
13+
withPushRoutes,
14+
withPushResultToHbRoutes
1315
)
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { always, compose, pipe } from 'ramda'
2+
import { of } from 'hyper-async'
3+
import { randomBytes } from 'node:crypto'
4+
5+
import { withMetrics, withMiddleware } from './middleware/index.js'
6+
7+
const withPushResultToHbRoute = (app) => {
8+
app.post(
9+
'/push-result/:id/:number',
10+
compose(
11+
withMiddleware,
12+
withMetrics(),
13+
always(async (req, res) => {
14+
const {
15+
logger: _logger,
16+
domain: { apis: { pushResultToHb } },
17+
params: { id, number },
18+
query: {
19+
'process-id': processId
20+
}
21+
} = req
22+
23+
const logger = _logger.child('POST_push_result_to_hb')
24+
const logId = randomBytes(8).toString('hex')
25+
26+
if (isNaN(Number(number))) {
27+
return res.status(400).send({ error: "'number' parameter must be a valid number" })
28+
}
29+
30+
await of({
31+
tx: { id, processId },
32+
number: Number(number),
33+
logId,
34+
messageId: id,
35+
initialTxId: id
36+
})
37+
.chain(pushResultToHb)
38+
.bimap(
39+
(e) => {
40+
logger({ log: `[push-result] Failed: ${e}`, end: true }, e.cause)
41+
res.status(500).send({ error: String(e) })
42+
},
43+
({ hbRes }) => {
44+
logger({ log: `[push-result] Success for ${id}/${number}`, end: true })
45+
res.status(200).send({ message: 'Result uploaded to HB', id, number: Number(number), hbRes })
46+
}
47+
)
48+
.toPromise()
49+
})
50+
)()
51+
)
52+
53+
return app
54+
}
55+
56+
export const withPushResultToHbRoutes = pipe(
57+
withPushResultToHbRoute
58+
)

0 commit comments

Comments
 (0)