Skip to content
This repository was archived by the owner on Jan 6, 2026. It is now read-only.

Commit 7b4444c

Browse files
authored
Add implementation. Closes #1 (#2)
* docs * add get job from orchestrator * style * add fetch car via saturn gateway * add submit success to ingestor * fix submit retrieval * log * refactor * add loop * more stable loop * call `Zinnia.jobCompleted()` * collect ttfb * add response status * abort early if res not ok * refactor * add read timeout * report error activity * clean up * refactor * refactor * start test suite * add ci * refactor * test * add retries to integration test * fix ci * add unit test * simplify, add unit test * add unit test * todo * todo * more tests * add submit more fields * fmt * module.js -> main.js * ci: add lint * docs * add receive headers timeout * add `ActivityState` * move `test/all.js` to `test.js` * add missing throw in integration test * refactor using async iterator syntax * fix lint * allow more attempts for integration tests * add test order, arguments and length of `fetch` calls * fix submitRetrieval + test * fix lint * fix tests and condition * update `spark-api` schema * add check retrieval from api * include status code in error message * `startTimeout` -> `resetTimeout` * make `stats` also accessible if retrieval fails * use activity class * add assert res.ok * fix lint
1 parent 48d9d59 commit 7b4444c

File tree

7 files changed

+260
-0
lines changed

7 files changed

+260
-0
lines changed

.github/workflows/ci.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
name: CI
2+
on: [push]
3+
jobs:
4+
build:
5+
runs-on: ubuntu-latest
6+
steps:
7+
- uses: actions/checkout@v3
8+
- run: curl -L https://github.com/filecoin-station/zinnia/releases/download/v0.10.0/zinnia-linux-x64.tar.gz | tar -xz
9+
- uses: actions/setup-node@v3
10+
- run: npx standard
11+
- run: ./zinnia run test.js

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,16 @@ SP Retrieval Checker Module
33

44
- [Roadmap](https://pl-strflt.notion.site/SPARK-Roadmap-ac729c11c49b409fbec54751d1bc6c8a)
55
- [API](https://github.com/filecoin-station/spark-api)
6+
7+
## Development
8+
9+
Install [Zinnia CLI](https://github.com/filecoin-station/zinnia).
10+
11+
```bash
12+
$ # Lint
13+
$ npx standard
14+
$ # Run module
15+
$ zinnia run main.js
16+
$ # Test module
17+
$ zinnia run test.js
18+
```

lib/spark.js

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/* global Zinnia */
2+
3+
const sleep = dt => new Promise(resolve => setTimeout(resolve, dt))
4+
5+
// Create activity events when we bacome operational or fail, but only once
6+
export class ActivityState {
7+
#ok = true
8+
9+
onError () {
10+
if (this.#ok) {
11+
this.#ok = false
12+
Zinnia.activity.error('SPARK failed reporting retrieval')
13+
}
14+
}
15+
16+
onSuccess () {
17+
if (!this.#ok) {
18+
this.#ok = true
19+
Zinnia.activity.success('SPARK retrieval reporting resumed')
20+
}
21+
}
22+
}
23+
24+
export default class Spark {
25+
#fetch
26+
#activity = new ActivityState()
27+
28+
constructor ({ fetch = globalThis.fetch } = {}) {
29+
this.#fetch = fetch
30+
}
31+
32+
async getRetrieval () {
33+
console.log('Geting retrieval...')
34+
const res = await this.#fetch('https://spark.fly.dev/retrievals', {
35+
method: 'POST'
36+
})
37+
const retrieval = await res.json()
38+
console.log({ retrieval })
39+
return retrieval
40+
}
41+
42+
async fetchCAR (url, stats) {
43+
console.log('Fetching CAR...')
44+
45+
// Abort if no progress was made for 10 seconds
46+
const controller = new AbortController()
47+
const { signal } = controller
48+
let timeout
49+
const resetTimeout = () => {
50+
if (timeout) {
51+
clearTimeout(timeout)
52+
}
53+
timeout = setTimeout(() => controller.abort(), 10_000)
54+
}
55+
56+
try {
57+
resetTimeout()
58+
const res = await this.#fetch(url, { signal })
59+
stats.statusCode = res.status
60+
61+
if (res.ok) {
62+
resetTimeout()
63+
for await (const value of res.body) {
64+
if (stats.firstByteAt === null) {
65+
stats.firstByteAt = new Date()
66+
}
67+
stats.byteLength += value.byteLength
68+
resetTimeout()
69+
}
70+
}
71+
} finally {
72+
clearTimeout(timeout)
73+
}
74+
75+
stats.endAt = new Date()
76+
console.log(stats)
77+
}
78+
79+
async submitRetrieval (id, stats) {
80+
console.log('Submitting retrieval...')
81+
const res = await this.#fetch(`https://spark.fly.dev/retrievals/${id}`, {
82+
method: 'PATCH',
83+
body: JSON.stringify({
84+
...stats,
85+
walletAddress: Zinnia.walletAddress
86+
}),
87+
headers: {
88+
'Content-Type': 'application/json'
89+
}
90+
})
91+
if (res.status !== 200) {
92+
let body
93+
try {
94+
body = await res.text()
95+
} catch {}
96+
throw new Error(`Failed to submit retrieval (${res.status}): ${body}`)
97+
}
98+
console.log('Retrieval submitted')
99+
}
100+
101+
async nextRetrieval () {
102+
const retrieval = await this.getRetrieval()
103+
104+
let success = false
105+
const stats = {
106+
startAt: new Date(),
107+
firstByteAt: null,
108+
endAt: null,
109+
byteLength: 0,
110+
statusCode: null
111+
}
112+
const url = `https://strn.pl/ipfs/${retrieval.cid}`
113+
try {
114+
await this.fetchCAR(url, stats)
115+
success = true
116+
this.#activity.onSuccess()
117+
} catch (err) {
118+
console.error(`Failed to fetch ${url}`)
119+
console.error(err)
120+
this.#activity.onError()
121+
}
122+
123+
await this.submitRetrieval(retrieval.id, { success, ...stats })
124+
Zinnia.jobCompleted()
125+
return retrieval.id
126+
}
127+
128+
async run () {
129+
while (true) {
130+
try {
131+
await this.nextRetrieval()
132+
} catch (err) {
133+
this.#activity.onError()
134+
console.error(err)
135+
}
136+
await sleep(1_000)
137+
}
138+
}
139+
}

main.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
import Spark from './lib/spark.js'
2+
3+
const spark = new Spark()
4+
await spark.run()

test.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
import './test/integration.js'
2+
import './test/spark.js'

test/integration.js

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import Spark from '../lib/spark.js'
2+
import { test } from 'zinnia:test'
3+
import { assert } from 'zinnia:assert'
4+
5+
test('integration', async () => {
6+
const spark = new Spark()
7+
const id = await spark.nextRetrieval()
8+
const res = await fetch(`https://spark.fly.dev/retrievals/${id}`)
9+
assert(res.ok)
10+
const retrieval = await res.json()
11+
assert(retrieval.startAt)
12+
assert(retrieval.finishedAt)
13+
})

test/spark.js

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/* global Zinnia */
2+
3+
import Spark from '../lib/spark.js'
4+
import { test } from 'zinnia:test'
5+
import { assertInstanceOf, assertEquals } from 'zinnia:assert'
6+
7+
test('getRetrieval', async () => {
8+
const retrieval = { retrieval: 'retrieval' }
9+
const requests = []
10+
const fetch = async (url, opts) => {
11+
requests.push({ url, opts })
12+
return {
13+
async json () {
14+
return retrieval
15+
}
16+
}
17+
}
18+
const spark = new Spark({ fetch })
19+
assertEquals(await spark.getRetrieval(), retrieval)
20+
assertEquals(requests, [{
21+
url: 'https://spark.fly.dev/retrievals',
22+
opts: { method: 'POST' }
23+
}])
24+
})
25+
26+
// TODO: test more cases
27+
test('fetchCAR', async () => {
28+
const URL = 'url'
29+
const requests = []
30+
const fetch = async url => {
31+
requests.push({ url })
32+
return {
33+
status: 200,
34+
ok: true,
35+
body: (async function * () {
36+
yield new Uint8Array([1, 2, 3])
37+
})()
38+
}
39+
}
40+
const spark = new Spark({ fetch })
41+
const stats = {
42+
startAt: new Date(),
43+
firstByteAt: null,
44+
endAt: null,
45+
byteLength: 0,
46+
statusCode: null
47+
}
48+
await spark.fetchCAR(URL, stats)
49+
assertInstanceOf(stats.startAt, Date)
50+
assertInstanceOf(stats.firstByteAt, Date)
51+
assertInstanceOf(stats.endAt, Date)
52+
assertEquals(stats.byteLength, 3)
53+
assertEquals(stats.statusCode, 200)
54+
assertEquals(requests, [{ url: URL }])
55+
})
56+
57+
test('submitRetrieval', async () => {
58+
const requests = []
59+
const fetch = async (url, opts) => {
60+
requests.push({ url, opts })
61+
return { status: 200 }
62+
}
63+
const spark = new Spark({ fetch })
64+
await spark.submitRetrieval(0, { success: true })
65+
assertEquals(requests, [
66+
{
67+
url: 'https://spark.fly.dev/retrievals/0',
68+
opts: {
69+
method: 'PATCH',
70+
body: JSON.stringify({
71+
success: true,
72+
walletAddress: Zinnia.walletAddress
73+
}),
74+
headers: { 'Content-Type': 'application/json' }
75+
}
76+
}
77+
])
78+
})

0 commit comments

Comments
 (0)