- 
                Notifications
    You must be signed in to change notification settings 
- Fork 4
Add alternative provider retrieval check #132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 25 commits
aedec80
              233cc1f
              83e7f31
              afe30dd
              23ee203
              4bc1076
              63424ff
              8a94f4e
              c4350b6
              edfdef1
              dbf0fd7
              d33f276
              97bee91
              4b6d0bc
              97fcc28
              5121a49
              4065784
              74f06e9
              ea8cce4
              f9afe34
              9959b50
              a2da050
              9759d80
              820e8a3
              5b13287
              3c14f84
              fe0f1f5
              ad8a8e8
              31019d0
              3710910
              c61a196
              d1f62fa
              05bd1c2
              3451eff
              8b8db36
              59d3d22
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -34,3 +34,6 @@ export { | |
| export { assertOkResponse } from 'https://cdn.skypack.dev/[email protected]/?dts' | ||
| import pRetry from 'https://cdn.skypack.dev/[email protected]/?dts' | ||
| export { pRetry } | ||
|  | ||
| import Prando from 'https://cdn.jsdelivr.net/npm/[email protected]/+esm' | ||
| export { Prando } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| /* global Zinnia */ | ||
|  | ||
| /** @import { Provider } from './ipni-client.js' */ | ||
| import { ActivityState } from './activity-state.js' | ||
| import { | ||
| SPARK_VERSION, | ||
|  | @@ -18,6 +19,7 @@ import { | |
| CarBlockIterator, | ||
| encodeHex, | ||
| HashMismatchError, | ||
| Prando, | ||
| UnsupportedHashError, | ||
| validateBlock, | ||
| } from '../vendor/deno-deps.js' | ||
|  | @@ -41,16 +43,17 @@ export default class Spark { | |
|  | ||
| async getRetrieval() { | ||
| const retrieval = await this.#tasker.next() | ||
| if (retrieval) { | ||
| if (retrieval.retrievalTask) { | ||
|          | ||
| console.log({ retrieval }) | ||
| } | ||
|         
                  pyropy marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
|  | ||
| return retrieval | ||
| } | ||
|  | ||
| async executeRetrievalCheck(retrieval, stats) { | ||
| console.log(`Calling Filecoin JSON-RPC to get PeerId of miner ${retrieval.minerId}`) | ||
| async executeRetrievalCheck({ retrievalTask, stats, randomness }) { | ||
| console.log(`Calling Filecoin JSON-RPC to get PeerId of miner ${retrievalTask.minerId}`) | ||
| try { | ||
| const peerId = await this.#getIndexProviderPeerId(retrieval.minerId) | ||
| const peerId = await this.#getIndexProviderPeerId(retrievalTask.minerId) | ||
| console.log(`Found peer id: ${peerId}`) | ||
| stats.providerId = peerId | ||
| } catch (err) { | ||
|  | @@ -70,19 +73,39 @@ export default class Spark { | |
| throw err | ||
| } | ||
|  | ||
| console.log(`Querying IPNI to find retrieval providers for ${retrieval.cid}`) | ||
| const { indexerResult, provider } = await queryTheIndex(retrieval.cid, stats.providerId) | ||
| console.log(`Querying IPNI to find retrieval providers for ${retrievalTask.cid}`) | ||
| const { indexerResult, provider, alternativeProviders } = await queryTheIndex( | ||
| retrievalTask.cid, | ||
| stats.providerId, | ||
| ) | ||
| stats.indexerResult = indexerResult | ||
|  | ||
| const providerFound = indexerResult === 'OK' || indexerResult === 'HTTP_NOT_ADVERTISED' | ||
| if (!providerFound) return | ||
| const noValidAdvertisement = indexerResult === 'NO_VALID_ADVERTISEMENT' | ||
|  | ||
| // In case index lookup failed we will not perform any retrieval | ||
| if (!providerFound && !noValidAdvertisement) return | ||
|  | ||
| // In case we fail to find a valid advertisement for the provider | ||
| // we will try to perform network wide retrieval from other providers | ||
| if (noValidAdvertisement) { | ||
| console.log( | ||
| 'No valid advertisement found. Trying to retrieve from an alternative provider...', | ||
| ) | ||
| stats.alternativeProviderCheck = await this.checkRetrievalFromAlternativeProvider({ | ||
| alternativeProviders, | ||
| randomness, | ||
| cid: retrievalTask.cid, | ||
| }) | ||
| return | ||
| } | ||
|  | ||
| stats.protocol = provider.protocol | ||
| stats.providerAddress = provider.address | ||
|  | ||
| await this.fetchCAR(provider.protocol, provider.address, retrieval.cid, stats) | ||
| await this.fetchCAR(provider.protocol, provider.address, retrievalTask.cid, stats) | ||
| if (stats.protocol === 'http') { | ||
| await this.testHeadRequest(provider.address, retrieval.cid, stats) | ||
| await this.testHeadRequest(provider.address, retrievalTask.cid, stats) | ||
| } | ||
| } | ||
|  | ||
|  | @@ -202,6 +225,31 @@ export default class Spark { | |
| } | ||
| } | ||
|  | ||
| async checkRetrievalFromAlternativeProvider({ alternativeProviders, randomness, cid }) { | ||
| if (!alternativeProviders.length) { | ||
| console.info('No alternative providers found for this CID.') | ||
| return | ||
| } | ||
|  | ||
| const randomProvider = pickRandomProvider(alternativeProviders, randomness) | ||
| if (!randomProvider) { | ||
| console.warn( | ||
| 'No providers serving the content via HTTP or Graphsync found. Skipping network-wide retrieval check.', | ||
| ) | ||
| return | ||
| } | ||
|          | ||
|  | ||
| const alternativeProviderRetrievalStats = newAlternativeProviderCheckStats() | ||
| await this.fetchCAR( | ||
| randomProvider.protocol, | ||
| randomProvider.address, | ||
| cid, | ||
| alternativeProviderRetrievalStats, | ||
| ) | ||
|  | ||
| return alternativeProviderRetrievalStats | ||
| } | ||
|  | ||
| async submitMeasurement(task, stats) { | ||
| console.log('Submitting measurement...') | ||
| const payload = { | ||
|  | @@ -228,17 +276,17 @@ export default class Spark { | |
| } | ||
|  | ||
| async nextRetrieval() { | ||
| const retrieval = await this.getRetrieval() | ||
| if (!retrieval) { | ||
| const { retrievalTask, randomness } = await this.getRetrieval() | ||
| if (!retrievalTask) { | ||
| console.log('Completed all tasks for the current round. Waiting for the next round to start.') | ||
| return | ||
| } | ||
|  | ||
| const stats = newStats() | ||
|  | ||
| await this.executeRetrievalCheck(retrieval, stats) | ||
| await this.executeRetrievalCheck({ retrievalTask, randomness, stats }) | ||
|  | ||
| const measurementId = await this.submitMeasurement(retrieval, { ...stats }) | ||
| const measurementId = await this.submitMeasurement(retrievalTask, { ...stats }) | ||
| Zinnia.jobCompleted() | ||
| return measurementId | ||
| } | ||
|  | @@ -315,6 +363,17 @@ export function newStats() { | |
| carChecksum: null, | ||
| statusCode: null, | ||
| headStatusCode: null, | ||
| alternativeProviderCheck: null, | ||
| } | ||
| } | ||
|  | ||
| function newAlternativeProviderCheckStats() { | ||
| return { | ||
| statusCode: null, | ||
| timeout: false, | ||
| endAt: null, | ||
| carTooLarge: false, | ||
| providerId: null, | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this also have  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or are we consciously omitting them? If so, could you please add a code comment? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure if we're supposed to have them; I think it wouldn't be a big deal to add those fields. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we don't have them it means we could have a successful retrieval (using the alternative provider method) but not know the byte length, car checksum and head status code. @bajtos wdyt? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It depends on what do we want to use the alternative retrieval check measurement for. As I understand it, we want to calculate network-wide RSR for retrievals that include alternative providers so that we can show this RSR on the leaderboard. I don't see how we need  I'd say YAGNI, exclude these fields for now, and wait until we need them. | ||
| } | ||
| } | ||
|  | ||
|  | @@ -395,3 +454,62 @@ function mapErrorToStatusCode(err) { | |
| // Fallback code for unknown errors | ||
| return 600 | ||
| } | ||
|  | ||
| /** | ||
| * Picks a random provider based on their priority and supplied randomness. | ||
| * | ||
| * Providers are prioritized in the following order: | ||
| * | ||
| * 1. HTTP Providers with Piece Info advertisted in their ContextID. | ||
| * 2. Graphsync Providers with Piece Info advertisted in their ContextID. | ||
|         
                  pyropy marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
| * 3. HTTP Providers. | ||
| * 4. Graphsync Providers. | ||
| * | ||
| * @param {Provider[]} providers | ||
| * @param {number} randomness | ||
| * @returns {Provider | undefined} | ||
| */ | ||
| export function pickRandomProvider(providers, randomness) { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 
 | ||
| const rng = new Prando(randomness) | ||
|  | ||
| const filterByProtocol = (items, protocol) => | ||
| items.filter((provider) => provider.protocol === protocol) | ||
|  | ||
| const pickRandomItem = (items) => { | ||
| if (!items.length) return undefined | ||
| return items[Math.floor(rng.next() * items.length)] | ||
|         
                  juliangruber marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved          | ||
| } | ||
|  | ||
| const providersWithPieceInfoContextID = providers.filter( | ||
| (p) => p.contextId.startsWith('ghsA') && p.protocol !== 'bitswap', | ||
| ) | ||
|  | ||
| // Priority 1: HTTP providers with ContextID containing PieceCID | ||
| const httpProvidersWithPieceInfoContextID = filterByProtocol( | ||
| providersWithPieceInfoContextID, | ||
| 'http', | ||
| ) | ||
| if (httpProvidersWithPieceInfoContextID.length) { | ||
| return pickRandomItem(httpProvidersWithPieceInfoContextID, randomness) | ||
| } | ||
|  | ||
| // Priority 2: Graphsync providers with ContextID containing PieceCID | ||
| const graphsyncProvidersWithPieceInfoContextID = filterByProtocol( | ||
| providersWithPieceInfoContextID, | ||
| 'graphsync', | ||
| ) | ||
| if (graphsyncProvidersWithPieceInfoContextID.length) { | ||
| return pickRandomItem(graphsyncProvidersWithPieceInfoContextID, randomness) | ||
| } | ||
|  | ||
| // Priority 3: HTTP providers | ||
| const httpProviders = filterByProtocol(providers, 'http') | ||
| if (httpProviders.length) return pickRandomItem(httpProviders, randomness) | ||
|  | ||
| // Priority 4: Graphsync providers | ||
| const graphsyncProviders = filterByProtocol(providers, 'graphsync') | ||
| if (graphsyncProviders.length) return pickRandomItem(graphsyncProviders, randomness) | ||
|  | ||
| // No valid providers found | ||
| return undefined | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -15,6 +15,7 @@ export class Tasker { | |
| #remainingRoundTasks | ||
| #fetch | ||
| #activity | ||
| #randomness | ||
|  | ||
| /** | ||
| * @param {object} args | ||
|  | @@ -35,11 +36,12 @@ export class Tasker { | |
| } | ||
|  | ||
| /** | ||
| * @returns {Task | undefined} | ||
| * @returns {{retrievalTask?: RetrievalTask; randomness: number; }} | ||
| */ | ||
| async next() { | ||
| await this.#updateCurrentRound() | ||
| return this.#remainingRoundTasks.pop() | ||
| const retrievalTask = this.#remainingRoundTasks.pop() | ||
| return { retrievalTask, randomness: this.#randomness } | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We somehow need to export the round randomness so I have opted for returning object with  Maybe adding the  | ||
| } | ||
|  | ||
| async #updateCurrentRound() { | ||
|  | @@ -72,13 +74,13 @@ export class Tasker { | |
| console.log(' %s retrieval tasks', retrievalTasks.length) | ||
| this.maxTasksPerRound = maxTasksPerNode | ||
|  | ||
| const randomness = await getRandomnessForSparkRound(round.startEpoch) | ||
| console.log(' randomness: %s', randomness) | ||
| this.#randomness = await getRandomnessForSparkRound(round.startEpoch) | ||
| console.log(' randomness: %s', this.#randomness) | ||
|  | ||
| this.#remainingRoundTasks = await pickTasksForNode({ | ||
| tasks: retrievalTasks, | ||
| maxTasksPerRound: this.maxTasksPerRound, | ||
| randomness, | ||
| randomness: this.#randomness, | ||
| stationId: Zinnia.stationId, | ||
| }) | ||
|  | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have opted for using package instead of the custom implementation for the pRNG. There's lack of good packages for pRNG so I have settled in the end for Prando. I also wanted to use Deno's random package but from what I realize they have added it to newer versions of the std package which we don't use yet.
This may be a good thing to update in the future.