Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ _Variables in bold are required._
| P2P_CONNECTION_HANDLER_MAX_INBOUND_STREAMS | `1024` | p2p handler max incoming streams limit at the same time on each connection |
| P2P_CONNECTION_HANDLER_MAX_OUTBOUND_STREAMS | `1024` | p2p handler max outgoing streams limit at the same time on each connection |
| P2P_CONNECTION_TAGGED_PEERS_VALUE | `100` | p2p tagged peers default value, see [tagged peers](#tagged-peers). |
| TELEMETRY_PORT | `3001` | The telemetry port number for the OpenTelemetry server to listen on. |
| ALLOW_READINESS_TWEAK | `false` | Allow tweaking the readiness state - for dev and testing only. |
| READINESS_MAX_CONNECTIONS | `30` | Limit for readiness on active connections |
| READINESS_MAX_PENDING_REQUEST_BLOCKS | `5000` | Limit for readiness on pending request blocks |
| READINESS_MAX_EVENT_LOOP_UTILIZATION | `0.7` | Limit for readiness on Event Loop Utilization |
| HTTP_PORT | `3001` | The telemetry port number for the OpenTelemetry server to listen on. |
| NODE_DEBUG | | If it contains `aws-ipfs`, debug mode is enabled. |
| LOG_LEVEL | `info` | Logging level. |
| LOG_PRETTY | `false` | Enable pretty logging. |
Expand Down Expand Up @@ -110,22 +112,13 @@ The list is accepted as `all-or-nothing`, to avoid runtime issues; so if a singl
### Readiness

The `/readiness` endpoint on the http server is used by the load balancer to determine if the service is healthy or not.
The readiness state is set by the last DynamoDB and S3 requests, and it's served instantly when called.
If the state is an error state, `/readiness` will perform calls to the DynamoDB and S3 services and will return the resulting state.

For testing purposes only, it's possible to set the readiness state by enabling `ALLOW_READINESS_TWEAK` and calling the `/readiness/tweak` endpoint passing the readiness state, for example:
Readiness considers:
- amount of active connections `bitswap-active-connections`
- pending request blocks `bitswap-pending-entries`
- event loop utilization (ELU) `bitswap-elu`

```bash
ALLOW_READINESS_TWEAK=true node src/index.js
```

so the state can be set calling

```bash
curl http://localhost:3001/readiness/tweak?dynamo=false&s3=true
```

will set the state of DynamoDB to error, and the following call to `/readiness` will return the error state
They are collected by telemetry as gauges, and are never reset.

## Issues

Expand Down
3,567 changes: 1,978 additions & 1,589 deletions package-lock.json

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bitswap-peer",
"version": "0.14.0",
"version": "0.15.0",
"description": "Elastic IPFS BitSwap Peer",
"homepage": "https://github.com/elastic-ipfs/bitswap-peer",
"license": "(Apache-2.0 AND MIT)",
Expand All @@ -13,16 +13,16 @@
"lint": "eslint src test utils"
},
"dependencies": {
"@aws-sdk/client-sqs": "3.231.0",
"@aws-sdk/client-sqs": "3.241.0",
"@aws-sdk/node-http-handler": "3.226.0",
"@chainsafe/libp2p-noise": "10.2.0",
"@libp2p/mplex": "7.1.0",
"@libp2p/peer-id": "1.1.17",
"@libp2p/peer-id-factory": "1.0.19",
"@libp2p/websockets": "5.0.1",
"@libp2p/mplex": "7.1.1",
"@libp2p/peer-id": "1.1.18",
"@libp2p/peer-id-factory": "1.0.20",
"@libp2p/websockets": "5.0.2",
"dotenv": "16.0.3",
"e-ipfs-core-lib": "0.5.0",
"it-length-prefixed": "8.0.3",
"it-length-prefixed": "8.0.4",
"it-pipe": "2.0.5",
"libp2p": "0.41.0",
"lru-cache": "7.14.1",
Expand All @@ -40,7 +40,7 @@
"@ipld/car": "5.0.1",
"@ipld/dag-pb": "3.0.1",
"c8": "7.12.0",
"eslint": "8.29.0",
"eslint": "8.31.0",
"eslint-config-standard": "17.0.0",
"eslint-plugin-import": "2.26.0",
"eslint-plugin-node": "11.1.0",
Expand Down
11 changes: 7 additions & 4 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ export function makeConfig () {

peerAnnounceAddr: process.env.PEER_ANNOUNCE_ADDR,
port: process.env.PORT ? parseInt(process.env.PORT) : 3000,
httpPort: process.env.HTTP_PORT ? parseInt(process.env.PORT) : 3001,
httpPort: process.env.HTTP_PORT ? parseInt(process.env.HTTP_PORT) : 3001,

// readiness
readinessMaxConnections: process.env.READINESS_MAX_CONNECTIONS ? parseInt(process.env.READINESS_MAX_CONNECTIONS) : 30,
readinessMaxPendingRequestBlocks: process.env.READINESS_MAX_PENDING_REQUEST_BLOCKS ? parseInt(process.env.READINESS_MAX_PENDING_REQUEST_BLOCKS) : 5e3,
readinessMaxEventLoopUtilization: process.env.READINESS_MAX_EVENT_LOOP_UTILIZATION ? parseFloat(process.env.READINESS_MAX_EVENT_LOOP_UTILIZATION) : 0.7, // 0 to 1

// p2p
p2pConnectionMaxConnections: process.env.P2P_CONNECTION_MAX_CONNECTIONS ? parseInt(process.env.P2P_CONNECTION_MAX_CONNECTIONS) : 10e3,
Expand All @@ -71,9 +76,7 @@ export function makeConfig () {
dynamoMaxRetries: process.env.DYNAMO_MAX_RETRIES ? parseInt(process.env.DYNAMO_MAX_RETRIES) : 3,
dynamoRetryDelay: process.env.DYNAMO_RETRY_DELAY ? parseInt(process.env.DYNAMO_RETRY_DELAY) : 100, // ms
s3MaxRetries: process.env.S3_MAX_RETRIES ? parseInt(process.env.S3_MAX_RETRIES) : 3,
s3RetryDelay: process.env.S3_RETRY_DELAY ? parseInt(process.env.S3_RETRY_DELAY) : 100, // ms

allowReadinessTweak: process.env.ALLOW_READINESS_TWEAK === 'true'
s3RetryDelay: process.env.S3_RETRY_DELAY ? parseInt(process.env.S3_RETRY_DELAY) : 100 // ms
}
}

Expand Down
63 changes: 31 additions & 32 deletions src/health-check.js
Original file line number Diff line number Diff line change
@@ -1,41 +1,40 @@
import config from './config.js'
import { telemetry } from './telemetry.js'

import { getReadiness, setReadiness } from './storage.js'

const SUCCESS_CODE = 200
const ERROR_CODE = 503
/**
* Read the current telemetry gauge values that describe the service load.
*
* Note: `bitswap-request-duration` is reset on every telemetry.export call (in /metrics);
* it is not part of the values returned here.
*
* @returns {object} an object with:
*   - `connections`: value of the `bitswap-active-connections` gauge
*   - `pendingRequestBlocks`: value of the `bitswap-pending-entries` gauge
*   - `eventLoopUtilization`: value of the `bitswap-elu` gauge
*/
export function getHealthCheckValues () {
return {
connections: telemetry.getGaugeValue('bitswap-active-connections'),
pendingRequestBlocks: telemetry.getGaugeValue('bitswap-pending-entries'),
eventLoopUtilization: telemetry.getGaugeValue('bitswap-elu')
}
}

export async function checkReadiness ({ awsClient, readinessConfig, allowReadinessTweak, logger }) {
const state = getReadiness()
/**
* called every 1 second
*/
export function checkReadiness (logger) {
const resources = getHealthCheckValues()

if (state.s3 && state.dynamo) {
return SUCCESS_CODE
if (resources.connections > config.readinessMaxConnections) {
logger.warn({ connections: resources.connections, maxConnections: config.readinessMaxConnections },
'Service is not ready due to max connections')
return false
}

if (allowReadinessTweak) {
// note success is already returned above, with or without allowReadinessTweak
return ERROR_CODE
if (resources.pendingRequestBlocks > config.readinessMaxPendingRequestBlocks) {
logger.warn({ pendingRequestBlocks: resources.pendingRequestBlocks, maxPendingRequestBlocks: config.readinessMaxPendingRequestBlocks },
'Service is not ready due to max pending request blocks')
return false
}

try {
logger.info('Readiness Probe Check')
await Promise.all([
awsClient.dynamoQueryBySortKey({
table: readinessConfig.dynamo.table,
keyName: readinessConfig.dynamo.keyName,
keyValue: readinessConfig.dynamo.keyValue
}),
awsClient.s3Fetch({
region: readinessConfig.s3.region,
bucket: readinessConfig.s3.bucket,
key: readinessConfig.s3.key
})
])

logger.info('Readiness Probe Succeed')
setReadiness({ s3: true, dynamo: true })
return SUCCESS_CODE
} catch (err) {
logger.error({ err }, 'Readiness Probe Failed')
return ERROR_CODE
if (resources.eventLoopUtilization > config.readinessMaxEventLoopUtilization) {
logger.warn({ eventLoopUtilization: resources.eventLoopUtilization, maxEventLoopUtilization: config.readinessMaxEventLoopUtilization },
'Service is not ready due to max event loop utilization')
return false
}

return true
}
41 changes: 8 additions & 33 deletions src/http-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
import { createServer } from 'node:http'
import { URL } from 'node:url'
import { logger } from './logging.js'
import { checkReadiness } from './health-check.js'
import { setReadiness } from './storage.js'
import { getHealthCheckValues, checkReadiness } from './health-check.js'
import { telemetry } from './telemetry.js'
import { version } from './util.js'

const SUCCESS_CODE = 200
const ERROR_CODE = 503

class HttpServer {
startServer ({ port, awsClient, readinessConfig, allowReadinessTweak }) {
startServer ({ port }) {
if (this.server) {
return this.server
}
Expand All @@ -21,43 +23,16 @@ class HttpServer {
res.end()
break
case '/readiness': {
checkReadiness({ awsClient, readinessConfig, allowReadinessTweak, logger })
.then(httpStatus => {
res.writeHead(httpStatus).end()
})
break
}
case '/readiness/tweak': {
if (!allowReadinessTweak) {
res.writeHead(404).end()
break
}
try {
const dynamo = url.searchParams.get('dynamo')
const s3 = url.searchParams.get('s3')
setReadiness({
dynamo: dynamo ? dynamo === 'true' : undefined,
s3: s3 ? s3 === 'true' : undefined
})
res.writeHead(200).end()
} catch {
res.writeHead(400).end()
}
res.writeHead(checkReadiness(logger) ? SUCCESS_CODE : ERROR_CODE)
.end()
break
}
case '/load': {
res.writeHead(200, {
connection: 'close',
'content-type': 'application/json'
})

const resources = {
connections: telemetry.getGaugeValue('bitswap-active-connections'),
pendingRequestBlocks: telemetry.getGaugeValue('bitswap-pending-entries'),
eventLoopUtilization: telemetry.getGaugeValue('bitswap-elu'),
// note: duration it's been reset every /metrics call
responseDuration: telemetry.getHistogramValue('bitswap-request-duration') ?? -1
}
const resources = getHealthCheckValues()

res.end(JSON.stringify(resources))
break
Expand Down
31 changes: 5 additions & 26 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,6 @@ import { getPeerId } from './peer-id.js'
import { createConnectionConfig } from './util.js'

async function boot () {
const readinessConfig = {
dynamo: {
table: config.linkTableV1,
keyName: config.linkTableBlockKey,
keyValue: 'readiness'
},
s3: {
region: config.peerIdS3Region,
bucket: config.peerIdS3Bucket,
key: config.peerIdJsonFile
}
}

try {
const awsClient = await createAwsClient(config, logger)

Expand All @@ -32,25 +19,12 @@ async function boot () {
peerIdJsonPath: config.peerIdJsonPath
})

await awsClient.dynamoQueryBySortKey({
table: readinessConfig.dynamo.table,
keyName: readinessConfig.dynamo.keyName,
keyValue: readinessConfig.dynamo.keyValue
})

const taggedPeers = await awsClient.dynamoGetItem({
table: config.dynamoConfigTable,
keyName: config.dynamoConfigTableKey,
keyValue: config.dynamoConfigTableTaggedPeersKey
})

await httpServer.startServer({
port: config.httpPort,
awsClient,
readinessConfig,
allowReadinessTweak: config.allowReadinessTweak
})

process.nextTick(() => startService({
awsClient,
port: config.port,
Expand All @@ -59,6 +33,11 @@ async function boot () {
connectionConfig: createConnectionConfig(config),
taggedPeers: JSON.parse(taggedPeers.value)
}))

await httpServer.startServer({
port: config.httpPort,
awsClient
})
} catch (err) {
logger.fatal({ err }, 'Cannot start the service')
}
Expand Down
4 changes: 2 additions & 2 deletions src/peer-id.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ async function getPeerId ({ awsClient, peerIdS3Region, peerIdS3Bucket, peerIdJso
logger.info('peerId loaded from JSON ' + peerIdJsonPath)
return peer
} catch (err) {
logger.error({ err }, 'cant load peer file from ' + peerIdJsonPath)
return createEd25519PeerId()
logger.fatal({ err }, 'cant load peer file from ' + peerIdJsonPath)
throw new Error('cant load peer file from ' + peerIdJsonPath)
}
}

Expand Down
22 changes: 0 additions & 22 deletions src/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,6 @@ import LRU from 'lru-cache'
const blockInfoCache = config.cacheBlockInfo ? new LRUCache(config.cacheBlockInfoSize) : null
const blockDataCache = config.cacheBlockData ? new LRUCache(config.cacheBlockDataSize) : null

const readinessState = {
dynamo: true,
s3: true
}

export function getReadiness () {
return readinessState
}

/**
* allow to set readiness state, for testing purpose only
* @see README#readiness
*/
export function setReadiness (state) {
if (state.dynamo !== undefined) { readinessState.dynamo = state.dynamo }
if (state.s3 !== undefined) { readinessState.s3 = state.s3 }
}

async function searchCarInDynamoV1 ({
awsClient,
table = config.linkTableV1,
Expand Down Expand Up @@ -150,11 +132,9 @@ async function fetchBlockData ({ block, logger, awsClient }) {
block.data = { content, found: true }
telemetry.increaseLabelCount('bitswap-block', [TELEMETRY_TYPE_DATA, TELEMETRY_RESULT_HITS])
config.cacheBlockData && blockDataCache.set(cacheKey, content)
readinessState.s3 = true
return
} catch (error) {
telemetry.increaseLabelCount('bitswap-block', [TELEMETRY_TYPE_DATA, TELEMETRY_RESULT_ERROR])
readinessState.s3 = false
}

block.data = { notFound: true }
Expand Down Expand Up @@ -200,10 +180,8 @@ async function fetchBlockInfo ({ block, logger, awsClient }) {
config.cacheBlockInfo && blockInfoCache.set(block.key, info)
return
}
readinessState.dynamo = true
} catch (error) {
telemetry.increaseLabelCount('bitswap-block', [TELEMETRY_TYPE_INFO, TELEMETRY_RESULT_ERROR])
readinessState.dynamo = false
}

block.info = { notFound: true }
Expand Down
Loading