Skip to content

Commit 45448fc

Browse files
committed
feat(cu): mem reset in eval
1 parent 1f07fb8 commit 45448fc

File tree

2 files changed

+87
-10
lines changed

2 files changed

+87
-10
lines changed

servers/cu/src/effects/worker/evaluate.js

Lines changed: 75 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ export function evaluateWith ({
1212
* is passed in. Eventually remove usage and injection
1313
*/
1414
loadWasmModule,
15+
locateScheduler,
1516
wasmInstanceCache,
1617
bootstrapWasmInstance,
1718
saveEvaluation,
1819
addExtension,
1920
ARWEAVE_URL,
21+
ENABLE_MEMORY_RESET,
2022
logger
2123
}) {
2224
loadWasmModule = fromPromise(loadWasmModule)
@@ -94,7 +96,9 @@ export function evaluateWith ({
9496
*/
9597
Memory: ifElse(
9698
pathOr(undefined, ['Error']),
97-
always(prevMemory),
99+
(res) => {
100+
return prevMemory
101+
},
98102
(res) => {
99103
const output = cond([
100104
[is(String), identity],
@@ -128,7 +132,6 @@ export function evaluateWith ({
128132
Messages: pathOr([], ['Messages']),
129133
Assignments: pathOr([], ['Assignments']),
130134
Spawns: pathOr([], ['Spawns']),
131-
Patches: pathOr([], ['Patches']),
132135
Output: pipe(
133136
pathOr('', ['Output']),
134137
/**
@@ -146,6 +149,48 @@ export function evaluateWith ({
146149
})
147150
)
148151

152+
async function evalInitialMemory ({ wasmInstance, AoGlobal }) {
153+
logger('Initializing fresh memory for process "%s"', AoGlobal.Process.Id)
154+
155+
const scheduler = await locateScheduler(
156+
AoGlobal.Process.Id,
157+
'_GQ33BkPtZrqxA84vM8Zk-N2aO0toNNu_C-l-rawrBA'
158+
)
159+
160+
const processMessageFetch = await fetch(
161+
`${scheduler.url}/${AoGlobal.Process.Id}?limit=1`
162+
).then(res => res.json())
163+
const processMessage = processMessageFetch.edges[0].node
164+
165+
const initMessage = {
166+
Id: AoGlobal.Process.Id,
167+
Signature: processMessage.message.signature,
168+
Data: processMessage.message.data,
169+
Owner: processMessage.message.owner.address,
170+
Target: AoGlobal.Process.Id,
171+
Anchor: processMessage.message.anchor,
172+
From: processMessage.message.owner.address,
173+
'Forwarded-By': undefined,
174+
Tags: processMessage.message.tags,
175+
Epoch: 0,
176+
Nonce: 0,
177+
Timestamp: parseInt(processMessage.assignment.tags.find(
178+
(t) => t.name === 'Timestamp'
179+
).value),
180+
'Block-Height': parseInt(processMessage.assignment.tags.find(
181+
(t) => t.name === 'Block-Height'
182+
).value),
183+
'Hash-Chain': processMessage.assignment.tags.find(
184+
(t) => t.name === 'Hash-Chain'
185+
).value,
186+
Cron: false,
187+
'Read-Only': false
188+
}
189+
const mem = new Uint8Array(null)
190+
const result = await wasmInstance(mem, initMessage, AoGlobal)
191+
return result
192+
}
193+
149194
/**
150195
* Evaluate a message using the handler that wraps the WebAssembly.Instance,
151196
* identified by the streamId.
@@ -164,22 +209,39 @@ export function evaluateWith ({
164209
* Finally, evaluates the message and returns the result of the evaluation.
165210
*/
166211
return ({ streamId, moduleId, wasmModule, moduleOptions, processId, noSave, name, deepHash, cron, ordinate, isAssignment, Memory, message, AoGlobal }) => {
212+
const shouldReset = ENABLE_MEMORY_RESET && message.Tags && message.Tags.some(t =>
213+
t.name === 'Action' &&
214+
t.value === 'Reset-Memory' &&
215+
AoGlobal.Process.Owner === message.Owner
216+
)
217+
218+
if (shouldReset) {
219+
wasmInstanceCache.delete(streamId)
220+
}
221+
167222
/**
168223
* Dynamically load the module, either from cache,
169224
* or from a file
170225
*/
171226
return maybeCachedInstance({ streamId, moduleId, wasmModule, moduleOptions, name, processId, Memory, message, AoGlobal })
172227
.bichain(loadInstance, Resolved)
228+
.chain(fromPromise(async (wasmInstance) => {
229+
const maybeSwappedResult = shouldReset
230+
? await evalInitialMemory({ wasmInstance, message, AoGlobal })
231+
: null
232+
233+
return { wasmInstance, Memory, maybeSwappedResult }
234+
}))
173235
/**
174236
* Perform the evaluation
175237
*/
176-
.chain((wasmInstance) =>
177-
of(wasmInstance)
178-
.map((wasmInstance) => {
238+
.chain(({ wasmInstance, Memory, maybeSwappedResult }) =>
239+
of({ wasmInstance, Memory, maybeSwappedResult })
240+
.map(({ wasmInstance, Memory, maybeSwappedResult }) => {
179241
logger('Evaluating message "%s" to process "%s"', name, processId)
180-
return wasmInstance
242+
return { wasmInstance, Memory, maybeSwappedResult }
181243
})
182-
.chain(fromPromise(async (wasmInstance) =>
244+
.chain(fromPromise(async ({ wasmInstance, Memory, maybeSwappedResult }) => {
183245
/**
184246
* AoLoader requires Memory to be a View, so that it can set the WebAssembly.Instance
185247
* memory.
@@ -196,8 +258,12 @@ export function evaluateWith ({
196258
*
197259
* TODO: check if any performance implications in using set() within AoLoaderrs
198260
*/
199-
wasmInstance(ArrayBuffer.isView(Memory) ? Memory : new Uint8Array(Memory), message, AoGlobal)
200-
))
261+
if (maybeSwappedResult) {
262+
return maybeSwappedResult
263+
}
264+
const mem = ArrayBuffer.isView(Memory) ? Memory : new Uint8Array(Memory)
265+
return wasmInstance(mem, message, AoGlobal)
266+
}))
201267
.bichain(
202268
/**
203269
* Map thrown error to a result.error. In this way, the Worker should _never_

servers/cu/src/effects/worker/evaluator/main.js

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { connect } from '@permaweb/ao-scheduler-utils'
2+
13
import * as WasmClient from '../../wasm.js'
24
import * as AoEvaluationClient from '../../ao-evaluation.js'
35
import * as DbClient from '../../db.js'
@@ -7,6 +9,13 @@ import { evaluateWith } from '../evaluate.js'
79
export const createApis = async (ctx) => {
810
const db = await DbClient.createDbClient({ url: ctx.DB_URL, bootstrap: false, max: 5 })
911
const wasmInstanceCache = WasmClient.createWasmInstanceCache({ MAX_SIZE: ctx.WASM_INSTANCE_CACHE_MAX_SIZE })
12+
const { locate } = connect({
13+
cacheSize: 100,
14+
GRAPHQL_URL: ctx.GRAPHQL_URL,
15+
followRedirects: true,
16+
GRAPHQL_MAX_RETRIES: 5,
17+
GRAPHQL_RETRY_BACKOFF: 300
18+
})
1019

1120
const close = async (streamId) => wasmInstanceCache.delete(streamId)
1221

@@ -24,7 +33,9 @@ export const createApis = async (ctx) => {
2433
logger: ctx.logger
2534
}),
2635
ARWEAVE_URL: ctx.ARWEAVE_URL,
27-
logger: ctx.logger
36+
locateScheduler: locate,
37+
logger: ctx.logger,
38+
ENABLE_MEMORY_RESET: true
2839
})
2940

3041
return { evaluate, close }

0 commit comments

Comments
 (0)