Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
0a807c1
packages, node-version, api changes libp2p v3
giurgiur99 Dec 17, 2025
4526d5d
lint fix
giurgiur99 Dec 17, 2025
41c138a
fix peer id key generation
giurgiur99 Dec 17, 2025
67c9be9
fix crypt
giurgiur99 Dec 17, 2025
cf42c04
fix integration assert
giurgiur99 Dec 17, 2025
1979209
use v20 for system tests
giurgiur99 Dec 17, 2025
40c1fc2
back to 22.5.1 ci
giurgiur99 Dec 17, 2025
5a24f4d
try force reload config
giurgiur99 Dec 18, 2025
915927c
Revert "try force reload config"
giurgiur99 Dec 18, 2025
334798e
equal check
giurgiur99 Dec 18, 2025
bdf9f2f
log ddo
giurgiur99 Dec 18, 2025
1f8d2c8
run all tests
giurgiur99 Dec 18, 2025
7768c7b
new instantce
giurgiur99 Dec 18, 2025
add6445
Revert "new instantce"
giurgiur99 Dec 18, 2025
bb377b2
log test
giurgiur99 Dec 18, 2025
1eff4bc
logs
giurgiur99 Dec 18, 2025
3d82d28
use correct ddo
giurgiur99 Dec 18, 2025
285dc08
revert changes
giurgiur99 Dec 18, 2025
f7dfe2c
force refresh config
giurgiur99 Dec 18, 2025
3a308e0
change order
giurgiur99 Dec 19, 2025
d08d638
remove logs
giurgiur99 Dec 19, 2025
5282ca6
tear down envs after tests
giurgiur99 Dec 19, 2025
953c65f
tear down
giurgiur99 Dec 19, 2025
4a25e77
new instance
giurgiur99 Dec 19, 2025
865f324
restart indexer
giurgiur99 Jan 5, 2026
193d365
skip db checks
giurgiur99 Jan 5, 2026
6d19c81
global check threads
giurgiur99 Jan 5, 2026
30098e9
cleanup
giurgiur99 Jan 5, 2026
79f1f67
revert p2p changes
giurgiur99 Jan 5, 2026
a701ce4
Revert "revert p2p changes"
giurgiur99 Jan 5, 2026
9555c61
Merge branch 'main' into p2p-v3
giurgiur99 Jan 6, 2026
afb8e09
custom dial logic
giurgiur99 Jan 6, 2026
65e01bd
Merge branch 'main' into p2p-v3
giurgiur99 Jan 6, 2026
9574285
event target and close connection
giurgiur99 Jan 6, 2026
c3c71f7
remove sinks
giurgiur99 Jan 7, 2026
f0f88f1
fix local node command
giurgiur99 Jan 7, 2026
fa27096
Merge branch 'main' into p2p-v3
giurgiur99 Jan 7, 2026
09afd5e
use peerstore if valid
giurgiur99 Jan 7, 2026
7acf2b7
dial multiaddrs instead of peerid
giurgiur99 Jan 8, 2026
076f885
use helper
giurgiur99 Jan 8, 2026
0efd727
streaming instead of buffering
giurgiur99 Jan 8, 2026
9a1a626
map chunk to buffer
giurgiur99 Jan 8, 2026
f2b2bff
lint fix
giurgiur99 Jan 8, 2026
0ec1b6f
no await on mapchunk
giurgiur99 Jan 8, 2026
0187a89
Merge branch 'main' into p2p-v3
giurgiur99 Jan 9, 2026
b0de415
pause the stream
giurgiur99 Jan 12, 2026
86dd427
resume stream on error
giurgiur99 Jan 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,397 changes: 1,090 additions & 1,307 deletions package-lock.json

Large diffs are not rendered by default.

38 changes: 19 additions & 19 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,25 @@
"changelog": "auto-changelog -p"
},
"dependencies": {
"@chainsafe/libp2p-noise": "^15.1.0",
"@chainsafe/libp2p-yamux": "^6.0.2",
"@chainsafe/libp2p-noise": "^17.0.0",
"@chainsafe/libp2p-yamux": "^8.0.1",
"@elastic/elasticsearch": "^8.14.0",
"@libp2p/autonat": "^2.0.0",
"@libp2p/bootstrap": "^10.1.1",
"@libp2p/circuit-relay-v2": "^1.1.1",
"@libp2p/crypto": "^4.1.5",
"@libp2p/dcutr": "^1.1.1",
"@libp2p/identify": "^2.1.1",
"@libp2p/kad-dht": "^12.1.1",
"@libp2p/mdns": "^10.1.1",
"@libp2p/peer-id": "^4.1.4",
"@libp2p/peer-id-factory": "^4.1.4",
"@libp2p/ping": "^1.1.1",
"@libp2p/pubsub": "^9.0.22",
"@libp2p/pubsub-peer-discovery": "^10.0.2",
"@libp2p/tcp": "^9.1.1",
"@libp2p/upnp-nat": "^1.2.1",
"@libp2p/websockets": "^8.1.1",
"@libp2p/autonat": "^3.0.9",
"@libp2p/bootstrap": "^12.0.10",
"@libp2p/circuit-relay-v2": "^4.1.2",
"@libp2p/crypto": "^5.1.13",
"@libp2p/dcutr": "^3.0.9",
"@libp2p/identify": "^4.0.9",
"@libp2p/kad-dht": "^16.1.2",
"@libp2p/mdns": "^12.0.10",
"@libp2p/peer-id": "^6.0.4",
"@libp2p/peer-id-factory": "^4.2.4",
"@libp2p/ping": "^3.0.9",
"@libp2p/pubsub": "^10.1.18",
"@libp2p/pubsub-peer-discovery": "^12.0.0",
"@libp2p/tcp": "^11.0.9",
"@libp2p/upnp-nat": "^4.0.9",
"@libp2p/websockets": "^10.1.2",
"@multiformats/multiaddr": "^12.2.3",
"@oceanprotocol/contracts": "^2.5.0",
"@oceanprotocol/ddo-js": "^0.1.4",
Expand All @@ -85,7 +85,7 @@
"ipaddr.js": "^2.3.0",
"it-pipe": "^3.0.1",
"jsonwebtoken": "^9.0.2",
"libp2p": "^1.8.0",
"libp2p": "^3.1.2",
"lodash": "^4.17.21",
"lzma-purejs-requirejs": "^1.0.0",
"node-cron": "^3.0.3",
Expand Down
62 changes: 16 additions & 46 deletions src/OceanNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ import { Database } from './components/database/index.js'
import { Escrow } from './components/core/utils/escrow.js'
import { CoreHandlersRegistry } from './components/core/handler/coreHandlersRegistry.js'
import { OCEAN_NODE_LOGGER } from './utils/logging/common.js'
import { ReadableString } from './components/P2P/handleProtocolCommands.js'
import StreamConcat from 'stream-concat'
import { pipe } from 'it-pipe'
import { GENERIC_EMOJIS, LOG_LEVELS_STR } from './utils/logging/Logger.js'
import { BaseHandler } from './components/core/handler/handler.js'
import { C2DEngines } from './components/c2d/compute_engines.js'
Expand Down Expand Up @@ -147,57 +144,30 @@ export class OceanNode {
}

/**
* Use this method to direct calls to the node as node cannot dial into itself
* @param message command message
* @param sink transform function
* v3: Direct protocol command handler - no P2P, just call handler directly
* Returns {status, stream} without buffering
* @param message - JSON command string
*/
async handleDirectProtocolCommand(
message: string,
sink: any
): Promise<P2PCommandResponse> {
async handleDirectProtocolCommand(message: string): Promise<P2PCommandResponse> {
OCEAN_NODE_LOGGER.logMessage('Incoming direct command for ocean peer', true)
let status = null
// let statusStream
let sendStream = null
let response: P2PCommandResponse = null

OCEAN_NODE_LOGGER.logMessage('Performing task: ' + message, true)

try {
const task = JSON.parse(message)
const handler: BaseHandler = this.coreHandlers.getHandler(task.command)
if (handler === null) {
status = {
httpStatus: 501,
error: 'Unknown command or unexisting handler for command: ' + task.command
}
} else {
response = await handler.handle(task)
}

if (response) {
// eslint-disable-next-line prefer-destructuring
status = response.status
sendStream = response.stream
}

const statusStream = new ReadableString(JSON.stringify(status))
if (sendStream == null) {
pipe(statusStream, sink)
} else {
const combinedStream = new StreamConcat([statusStream, sendStream], {
highWaterMark: JSON.stringify(status).length
// the size of the buffer is important!
})
pipe(combinedStream, sink)
if (!handler) {
return {
stream: null,
status: {
httpStatus: 501,
error: 'Unknown command or missing handler for: ' + task.command
}
}
}

return (
response || {
status,
stream: null
}
)
// Return response directly without buffering
return await handler.handle(task)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and if handler is actually encrypt(1TB_large_file) ?

} catch (err) {
OCEAN_NODE_LOGGER.logMessageWithEmoji(
'handleDirectProtocolCommands Error: ' + err.message,
Expand All @@ -207,8 +177,8 @@ export class OceanNode {
)

return {
status: { httpStatus: 500, error: err.message },
stream: null
stream: null,
status: { httpStatus: 500, error: err.message }
}
}
}
Expand Down
59 changes: 16 additions & 43 deletions src/components/Indexer/processors/BaseProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { getDatabase } from '../../../utils/database.js'
import { INDEXER_LOGGER } from '../../../utils/logging/common.js'
import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js'
import { URLUtils } from '../../../utils/url.js'
import { streamToString } from '../../../utils/util.js'
import { streamToString, streamToUint8Array } from '../../../utils/util.js'
import ERC721Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC721Template.sol/ERC721Template.json' with { type: 'json' }
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import ERC20Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC20TemplateEnterprise.sol/ERC20TemplateEnterprise.json' with { type: 'json' }
Expand Down Expand Up @@ -359,42 +359,7 @@ export abstract class BaseEventProcessor {
} else {
try {
const p2pNode = await node.getP2PNode()
let isBinaryContent = false
const sink = async function (source: any) {
let first = true
for await (const chunk of source) {
if (first) {
first = false
try {
const str = uint8ArrayToString(chunk.subarray()) // Obs: we need to specify the length of the subarrays
const decoded = JSON.parse(str)
if ('headers' in decoded) {
if (str?.toLowerCase().includes('application/octet-stream')) {
isBinaryContent = true
}
}
if (decoded.httpStatus !== 200) {
INDEXER_LOGGER.logMessage(
`Error in sink method : ${decoded.httpStatus} errro: ${decoded.error}`
)
throw new Error('Error in sink method', decoded.error)
}
} catch (e) {
INDEXER_LOGGER.logMessage(
`Error in sink method } error: ${e.message}`
)
throw new Error(`Error in sink method ${e.message}`)
}
} else {
if (isBinaryContent) {
return chunk.subarray()
} else {
const str = uint8ArrayToString(chunk.subarray())
return str
}
}
}
}

const message = {
command: PROTOCOL_COMMANDS.DECRYPT_DDO,
transactionId: txId,
Expand All @@ -406,12 +371,20 @@ export abstract class BaseEventProcessor {
signature,
nonce
}
const response = await p2pNode.sendTo(
decryptorURL,
JSON.stringify(message),
sink
)
ddo = JSON.parse(await streamToString(response.stream as Readable))

const response = await p2pNode.sendTo(decryptorURL, JSON.stringify(message))

if (response.status.httpStatus !== 200) {
throw new Error(`Decrypt failed: ${response.status.error}`)
}

if (!response.stream) {
throw new Error('No data received from decrypt')
}

// Convert stream to Uint8Array
const data = await streamToUint8Array(response.stream as Readable)
ddo = JSON.parse(uint8ArrayToString(data))
} catch (error) {
const message = `Node exception on decrypt DDO. Status: ${error.message}`
INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, message)
Expand Down
Loading
Loading