
Conversation

alphaprinz
Contributor

@alphaprinz alphaprinz commented Jul 20, 2025

Describe the Problem

Reduce DB load spikes caused by all endpoints loading from the DB simultaneously: have only core load from the DB, and have endpoints load from core instead.

Explain the Changes

  1. Introduce a new RPC that allows endpoints to get the system store from core.
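For illustration, a minimal sketch of the endpoint-side flow this enables, assuming the RPC client and RPC_BUFFERS names that appear in the review below (the require paths are illustrative, not the actual module layout):

    // Sketch only: paths and names are assumptions drawn from this PR's review thread.
    const server_rpc = require('../server/server_rpc');
    const { RPC_BUFFERS } = require('../rpc');

    async function load_system_store_from_core() {
        // Core serializes its in-memory system store into an RPC buffer,
        // so the endpoint parses JSON instead of querying the DB directly.
        const res = await server_rpc.client.system.get_system_store();
        return JSON.parse(res[RPC_BUFFERS].data.toString());
    }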

Issues: Fixed #xxx / Gap #xxx

Testing Instructions:

  • Doc added/updated
  • Tests added

Summary by CodeRabbit

  • New Features

    • Added an API endpoint to fetch the system store without system-level auth and a way to obtain the latest in-memory DB snapshot.
  • Configuration

    • New SYSTEM_STORE_SOURCE setting (defaults to "DB") to choose data source (DB or CORE).
  • Behavior Changes

    • System store can load from CORE with automatic fallback to DB and a two-phase publish flow; load accepts an explicit core-load step.
  • Tests

    • Added integration test verifying CORE vs DB load parity.


coderabbitai bot commented Jul 20, 2025

Walkthrough

Adds a GET API get_system_store returning recent system-store data in RPC buffers; implements a config-driven dual-source SystemStore (DB or CORE) with _read_new_data_from_core() and recent_db_data(); exports SOURCE and decode_json; threads load_from_core_step through inter-process APIs and adds a CORE-load integration test.

Changes

Cohort / File(s) Summary of changes
Public API: system store endpoint
src/api/system_api.js
Added GET get_system_store API method with reply schema including [RPC_BUFFERS].data and auth.system: false.
Server RPC handler
src/server/system_services/system_server.js
Imported RPC_BUFFERS; added async get_system_store() that calls system_store.recent_db_data(), JSON-stringifies into a Buffer, returns { [RPC_BUFFERS]: { data: Buffer } }; added error logging and exported the function.
SystemStore dual-source & helpers
src/server/system_services/system_store.js
Added exported SOURCE (DB/CORE); config-driven source selection with endpoint detection and logging; load(since, load_from_core_step) supports reading from CORE via _read_new_data_from_core() (uses server_rpc.client.system.get_system_store and decode_json) or falls back to DB; added recent_db_data(); added _read_new_data_from_core(); defensive table-definition handling; two-phase publish flow in make_changes.
Utilities export
src/util/postgres_client.js
Exported decode_json alongside existing exports.
Configuration
config.js
Added config.SYSTEM_STORE_SOURCE = process.env.SYSTEM_STORE_SOURCE || "db" (defaults to "db").
Inter-process API param
src/api/server_inter_process_api.js
Extended load_system_store params with load_from_core_step (string enum ['CORE','ENDPOINT']).
Inter-process server flow
src/server/common_services/server_inter_process.js
load_system_store(req) now forwards req?.rpc_params?.load_from_core_step.toUpperCase() as second arg to system_store.load().
Tests
src/test/integration_tests/db/test_system_store.js
Added integration test "Load from core": constructs a SystemStore with SOURCE.CORE, loads from both DB and CORE, and asserts deep equality (sketched below); exposes the SystemStore class for direct instantiation.
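A sketch of what that parity check might look like (the constructor options and load() return shape are assumptions based on the summary above, not the literal test):

    // Sketch only: constructor options and load() return value are assumed.
    const assert = require('assert');
    const { SystemStore, SOURCE } = require('../../../server/system_services/system_store');

    async function test_load_from_core_parity() {
        // One store reads straight from the DB, the other via the core RPC.
        const db_store = new SystemStore({ source: SOURCE.DB });
        const core_store = new SystemStore({ source: SOURCE.CORE });
        const from_db = await db_store.load();
        const from_core = await core_store.load();
        // Both paths should decode to the same in-memory system store.
        assert.deepStrictEqual(from_core, from_db);
    }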

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant API as system_api
  participant Srv as system_server
  participant Store as system_store

  Client->>API: GET /system/get_system_store
  API->>Srv: get_system_store()
  Srv->>Store: recent_db_data()
  Store-->>Srv: in-memory DB data
  Srv->>Srv: JSON.stringify -> Buffer
  Srv-->>API: { [RPC_BUFFERS]: { data: Buffer } }
  API-->>Client: Response with buffer payload
sequenceDiagram
  participant Store as SystemStore
  participant Config
  participant Core as server_rpc.client.system
  participant DB as Database
  participant Codec as decode_json
  participant Validator as db_client

  Store->>Config: read SYSTEM_STORE_SOURCE
  alt source == CORE and endpoint allowed
    Store->>Core: get_system_store()
    Core-->>Store: { [RPC_BUFFERS].data: Buffer(JSON) }
    Store->>Codec: decode per-collection JSON
    Codec-->>Store: Decoded docs
    Store->>Validator: validate collections
    Validator-->>Store: ok
    Store->>Store: populate in-memory data from CORE
  else DB or CORE failed
    Store->>DB: _read_new_data_from_db()
    DB-->>Store: rows
    Store->>Store: update old_db_data and data
  end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • System store phase1 #9148 — Overlaps changes to src/server/system_services/system_store.js concerning constructor/load and multi-step publish/load behavior.

Suggested reviewers

  • tangledbytes
  • dannyzaken



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 24e11b7 and d880a87.

📒 Files selected for processing (5)
  • src/api/system_api.js (1 hunks)
  • src/endpoint/endpoint.js (1 hunks)
  • src/server/system_services/system_server.js (3 hunks)
  • src/server/system_services/system_store.js (8 hunks)
  • src/util/postgres_client.js (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
src/server/system_services/system_server.js (1)
src/server/system_services/system_store.js (2)
  • require (41-41)
  • require (43-43)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: warp-tests / Warp Tests
  • GitHub Check: ceph-s3-tests / Ceph S3 Tests
  • GitHub Check: run-unit-tests / Unit Tests
🔇 Additional comments (2)
src/util/postgres_client.js (1)

1954-1954: LGTM! Clean export addition for system store RPC support.

This export enables the decode_json function to be used by the system store module for processing JSON data received via RPC from the core system, which aligns perfectly with the PR objectives to reduce database load by centralizing data access.

src/server/system_services/system_store.js (1)

545-565: Review _read_new_data_from_core: adjust error handling and confirm no circular RPC

We’ve confirmed that passing this.data into _read_new_data_from_core is correct, so no change is needed there. However, the method currently has no guard against RPC failures, and it’s worth double-checking that the get_system_store handler doesn’t itself call back into system_store.load() (which would create a circular dependency).

Please address the following:

• Remove the parameter-fix suggestion at line 438—this.data is already the new SystemStoreData instance.
• Add a try/catch around the RPC call to surface failures clearly:

 async _read_new_data_from_core(target) {
     dbg.log3("_read_new_data_from_core begins");
+      try {
         const res = await server_rpc.client.system.get_system_store();
         const ss = JSON.parse(res[RPC_BUFFERS].data.toString());
         dbg.log3("_read_new_data_from_core new system store", ss);
         for (const key of Object.keys(ss)) {
           …  
         }
+      } catch (err) {
+          dbg.error("Failed to read system store from core:", err);
+          throw err;
+      }
 }

• Manually verify that the get_system_store RPC handler (in your core/service code) doesn’t invoke system_store.load() or _read_new_data_from_core, to avoid circular calls.

Comment on lines 173 to 174
// this point (and should not kill the process);
system_store.get_instance().load();
system_store.get_instance({source: system_store.SOURCE.CORE}).load();

💡 Verification agent

🧩 Analysis chain

Consider handling the promise rejection to avoid unhandled promise warnings.

The comment states that errors are not relevant, but the .load() promise might still reject and cause unhandled promise warnings. Consider adding a .catch() handler.

-                system_store.get_instance({source: system_store.SOURCE.CORE}).load();
+                system_store.get_instance({source: system_store.SOURCE.CORE}).load().catch(err => {
+                    dbg.warn('Failed to load system store from core:', err);
+                });

Also, verify that this change applies to all endpoint scenarios, not just when LOCAL_MD_SERVER is true.


🏁 Script executed:

#!/bin/bash
# Description: Check if there are other places where endpoints might need to use CORE source
# Look for other system_store.get_instance() calls in endpoint-related files

rg -A 5 'system_store\.get_instance\(' --glob 'src/endpoint/**/*.js'

Length of output: 902


Handle unhandled promise for system_store.load()

The .load() call returns a promise and may reject, triggering unhandled‐promise warnings. Add a .catch() handler to swallow or log errors as intended.

  • File: src/endpoint/endpoint.js
    Lines ~173–174 in the LOCAL_MD_SERVER block

Suggested diff:

-                system_store.get_instance({source: system_store.SOURCE.CORE}).load();
+                system_store.get_instance({source: system_store.SOURCE.CORE})
+                    .load()
+                    .catch(err => {
+                        dbg.warn('Failed to load system store from core:', err);
+                    });

I ran a search for other system_store.get_instance( calls under src/endpoint/ and found only this one use of .load() with CORE—no additional endpoints require the same change.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
                // this point (and should not kill the process);
                system_store.get_instance().load();
                system_store.get_instance({source: system_store.SOURCE.CORE}).load();

                // this point (and should not kill the process);
                system_store.get_instance({source: system_store.SOURCE.CORE})
                    .load()
                    .catch(err => {
                        dbg.warn('Failed to load system store from core:', err);
                    });
🤖 Prompt for AI Agents
In src/endpoint/endpoint.js around lines 173 to 174, the call to
system_store.get_instance(...).load() returns a promise that may reject, causing
unhandled promise warnings. Fix this by adding a .catch() handler to the load()
call to handle any rejection, either by logging the error or swallowing it as
appropriate to prevent unhandled promise rejections.

Comment on lines 306 to 309
async function get_system_store() {
    return {
        [RPC_BUFFERS]: {data: Buffer.from(JSON.stringify(await system_store.recent_db_data()))},
    };
}

💡 Verification agent

🧩 Analysis chain

Add error handling and consider security implications of exposing system store data.

This function exposes the entire system store data without any visible authorization checks or data filtering. Additionally, it lacks error handling.

Consider the following improvements:

  1. Add error handling:
 async function get_system_store() {
+    try {
         return {
             [RPC_BUFFERS]: {data: Buffer.from(JSON.stringify(await system_store.recent_db_data()))},
         };
+    } catch (err) {
+        dbg.error('Failed to get system store data:', err);
+        throw new RpcError('INTERNAL', 'Failed to retrieve system store data');
+    }
 }
  2. Verify authorization requirements.

  3. Consider implementing data filtering to avoid exposing sensitive information like credentials or keys.

  4. For large datasets, consider implementing pagination or streaming to avoid memory issues with JSON serialization (a sketch follows below).
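A sketch of one way to bound per-call payload size with a simple chunk index (the chunk/total contract here is hypothetical, not an existing API in this codebase):

    // Sketch only: the chunk_index/total_chunks paging contract is hypothetical.
    function get_system_store_chunk(full_json_buffer, chunk_index, chunk_size = 4 * 1024 * 1024) {
        const start = chunk_index * chunk_size;
        // Buffer.subarray returns a view over the same memory, so no copy per chunk.
        const data = full_json_buffer.subarray(start, start + chunk_size);
        return { data, total_chunks: Math.ceil(full_json_buffer.length / chunk_size) };
    }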


🏁 Script executed:

#!/bin/bash
# Description: Check how this RPC method is exposed and what authorization it requires
# Look for the API definition and any auth requirements

# Search for get_system_store in API definitions
rg -A 10 'get_system_store' src/api/

# Check if there are any auth decorators or middleware
ast-grep --pattern 'get_system_store: {
  $$$
}'

Length of output: 509


Enforce authorization and add error handling in get_system_store

The get_system_store RPC is currently exposed via HTTP GET with auth: { system: false } and no error handling or data filtering. This allows unauthenticated clients to fetch the entire store and may crash the server on large payloads.

Please address the following:

  • Require system-level auth
    In src/api/system_api.js, change

    get_system_store: {
        method: 'GET',
        auth: { system: false },
    }

    to

    auth: { system: true }

    or integrate your authorization middleware so only permitted clients can call this RPC.

  • Add error handling
    Wrap the data fetch/serialize in try/catch and return a proper RPC error:

 async function get_system_store() {
+    try {
         return {
             [RPC_BUFFERS]: {
                 data: Buffer.from(JSON.stringify(await system_store.recent_db_data())),
             },
         };
+    } catch (err) {
+        dbg.error('get_system_store failed:', err);
+        throw new RpcError('INTERNAL', 'Unable to retrieve system store data');
+    }
 }

  • Filter sensitive data
    Only include non-sensitive fields in the payload (e.g. omit credentials, tokens, or PII).

  • Handle large datasets
    Consider implementing pagination or streaming (e.g. chunked responses) to avoid high memory/CPU usage.

🤖 Prompt for AI Agents

In src/server/system_services/system_server.js around lines 306 to 310 and in
src/api/system_api.js where get_system_store is defined, update the auth setting
to require system-level authorization by setting auth: { system: true }. In the
get_system_store function, wrap the data retrieval and serialization in a
try/catch block to handle errors gracefully and return an appropriate RPC error
on failure. Modify the data returned to exclude sensitive information such as
credentials, tokens, or PII by filtering the fields before serialization. To
handle large datasets, implement pagination or chunked responses to limit memory
and CPU usage during data transmission.





@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

🧹 Nitpick comments (2)
src/util/postgres_client.js (1)

1954-1954: Exporting decode_json widens the public API – consider symmetry & documentation

  1. This is the first time an internal helper is exposed from this module. Down-stream code may now rely on the exact traversal semantics (wrapper unwrapping, recursive decode, etc.), making future refactors harder.

  2. encode_json is still internal. External callers that use decode_json almost always need the reverse operation. Exporting only one direction invites accidental misuse or duplicated logic elsewhere.

  3. decode_json silently unwraps SensitiveString, which leaks plain-text secrets to any caller. If that’s intentional, call-sites should at least opt-in explicitly or a comment should warn about it.

Action:

 exports.PostgresClient = PostgresClient;
 exports.instance = PostgresClient.instance;
+// NOTE: exporting both helpers keeps the API symmetrical and explicit.
+exports.encode_json = encode_json;
 exports.decode_json = decode_json;

…and add JSDoc explaining the security implication.

src/server/system_services/system_server.js (1)

306-310: Consider performance implications for large system stores

For systems with many buckets, accounts, or other entities, the system store data could be quite large. Serializing and buffering the entire dataset might impact memory usage and RPC transfer times.

Consider implementing:

  1. Data size limits or pagination
  2. Compression before sending over RPC (see the sketch after this list)
  3. Monitoring the size of data being transferred
  4. Caching mechanism to avoid repeated serialization
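For item 2, a sketch using Node's built-in zlib (whether compression belongs in this handler or in the RPC layer is a design choice left open here):

    const zlib = require('zlib');

    // Sketch only: compress the serialized snapshot before placing it in the RPC buffer.
    function compress_snapshot(snapshot) {
        // System-store JSON is highly repetitive, so gzip usually shrinks it substantially.
        return zlib.gzipSync(Buffer.from(JSON.stringify(snapshot)));
    }

    // Receiver side: JSON.parse(zlib.gunzipSync(buf).toString()) restores the snapshot.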
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 24e11b7 and d880a87.

📒 Files selected for processing (5)
  • src/api/system_api.js (1 hunks)
  • src/endpoint/endpoint.js (1 hunks)
  • src/server/system_services/system_server.js (3 hunks)
  • src/server/system_services/system_store.js (8 hunks)
  • src/util/postgres_client.js (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: ceph-s3-tests / Ceph S3 Tests
🔇 Additional comments (2)
src/endpoint/endpoint.js (1)

174-174: Approved – SOURCE.CORE is properly defined

Verified in src/server/system_services/system_store.js that the SOURCE enum includes

CORE: 'CORE',

and is exported (exports.SOURCE = SOURCE). No further action needed.

src/server/system_services/system_store.js (1)

360-361: Improve source selection logic robustness

The current logic for determining the source has potential issues:

  1. It assumes endpoints have "endpoint" in their hostname
  2. The logic is inverted - it should check if hostname contains "endpoint" to set source to CORE

The condition should be:

-this.source = (process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") === -1) ? SOURCE.DB : SOURCE.CORE;
+this.source = (process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1) ? SOURCE.CORE : SOURCE.DB;

Additionally, consider making this more configurable:

+const source_override = process.env.SYSTEM_STORE_SOURCE;
+this.source = source_override || ((process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1) ? SOURCE.CORE : SOURCE.DB);

Likely an incorrect or invalid review comment.

Comment on lines 464 to 475
    get_system_store: {
        method: 'GET',
        auth: {
            system: false
        }
    }

🛠️ Refactor suggestion

Add reply schema and documentation for the new API method.

The new get_system_store API method is missing essential documentation and schema definition:

  1. Missing reply schema: All other methods in this API define a proper reply schema for validation and documentation purposes.
  2. Missing documentation: The doc field should describe the purpose of this endpoint.
  3. Verify authentication requirements: Confirm that system: false is the correct authentication level for this endpoint.
 get_system_store: {
+    doc: 'Get system store data from core',
     method: 'GET',
+    reply: {
+        type: 'object',
+        // Define the expected structure of system store data
+        additionalProperties: true
+    },
     auth: {
         system: false
     }
 }
📝 Committable suggestion


Suggested change
get_system_store: {
    method: 'GET',
    auth: {
        system: false
    }
}

get_system_store: {
    doc: 'Get system store data from core',
    method: 'GET',
    reply: {
        type: 'object',
        // Define the expected structure of system store data
        additionalProperties: true
    },
    auth: {
        system: false
    }
}
🤖 Prompt for AI Agents
In src/api/system_api.js around lines 464 to 469, the get_system_store API
method lacks a reply schema and documentation. Add a reply schema that defines
the expected response structure for validation and documentation. Include a doc
field describing the endpoint's purpose clearly. Also, review and confirm if the
auth setting system: false is appropriate for this method's security
requirements, adjusting if necessary.

Comment on lines 306 to 309
async function get_system_store() {
    return {
        [RPC_BUFFERS]: {data: Buffer.from(JSON.stringify(await system_store.recent_db_data()))},
    };
}

⚠️ Potential issue

Add error handling for potential failures

The function should handle potential errors from recent_db_data() or JSON.stringify() operations to prevent unhandled exceptions.

Consider wrapping the operations in a try-catch block:

 async function get_system_store() {
+    try {
         return {
             [RPC_BUFFERS]: {data: Buffer.from(JSON.stringify(await system_store.recent_db_data()))},
         };
+    } catch (err) {
+        dbg.error('get_system_store: Failed to get system store data', err);
+        throw new RpcError('INTERNAL_ERROR', 'Failed to retrieve system store data');
+    }
 }
📝 Committable suggestion


Suggested change
async function get_system_store() {
    return {
        [RPC_BUFFERS]: {data: Buffer.from(JSON.stringify(await system_store.recent_db_data()))},
    };
}

async function get_system_store() {
    try {
        return {
            [RPC_BUFFERS]: {data: Buffer.from(JSON.stringify(await system_store.recent_db_data()))},
        };
    } catch (err) {
        dbg.error('get_system_store: Failed to get system store data', err);
        throw new RpcError('INTERNAL_ERROR', 'Failed to retrieve system store data');
    }
}
🤖 Prompt for AI Agents
In src/server/system_services/system_server.js around lines 306 to 310, the
async function get_system_store does not handle errors that may occur during the
call to recent_db_data() or the JSON.stringify operation. To fix this, wrap the
entire operation inside a try-catch block, returning a meaningful error or
fallback value in the catch block to prevent unhandled exceptions and ensure the
function handles failures gracefully.

Comment on lines +545 to +595
    async _read_new_data_from_core(target) {
        dbg.log3("_read_new_data_from_core begins");
        const res = await server_rpc.client.system.get_system_store();
        const ss = JSON.parse(res[RPC_BUFFERS].data.toString());
        dbg.log3("_read_new_data_from_core new system store", ss);
        for (const key of Object.keys(ss)) {
            const collection = COLLECTIONS_BY_NAME[key];
            if (collection) {
                target[key] = [];
                _.each(ss[key], item => {
                    //these two lines will transform string values into appropriately typed objects
                    //(SensitiveString, ObjectId) according to schema
                    const after = decode_json(collection.schema, item);
                    db_client.instance().validate(key, after);
                    target[key].push(after);
                });
            } else {
                target[key] = ss[key];
            }
        }
    }

💡 Verification agent

❓ Verification inconclusive

Monitor RPC data transfer and implement operational safeguards

Since endpoints will now fetch system store data via RPC from core, consider implementing:

  1. Metrics to monitor the frequency and size of these RPC calls
  2. Rate limiting to prevent endpoints from overwhelming the core
  3. Circuit breaker pattern for RPC failures
  4. Alerts for unusually large data transfers

Let me verify if there are any existing rate limiting or monitoring mechanisms:


🏁 Script executed:

#!/bin/bash
# Search for rate limiting or monitoring in RPC client code
rg -A 5 "rate.*limit|throttle|circuit.*breaker" --glob "src/rpc/**/*.js"

# Check if there are any metrics collection for RPC calls
rg -A 5 "metrics|monitor|stats.*rpc" --glob "src/rpc/**/*.js"

Length of output: 138


Implement RPC Monitoring, Rate Limiting, and Resilience Patterns

No existing rate-limit, metrics, or circuit-breaker logic was found around your RPC client (verification via rg returned no matches), so let’s add operational safeguards:

• In src/server/system_services/system_store.js (around the _read_new_data_from_core call at lines 545–565):
– Instrument a counter and histogram (e.g. Prometheus) for calls to server_rpc.client.system.get_system_store() (call count, payload size, latency).
– Emit alerts if payload size or latency exceeds thresholds.

• In your RPC client layer (e.g. src/rpc/client/*.js):
– Implement rate limiting or token bucket to cap calls per second.
– Wrap the get_system_store invocation in a circuit-breaker (e.g. with a library like opossum) to fail fast on repeated errors.

• Add configuration knobs for thresholds and integrate with your observability stack (alerts, dashboards).

These changes will help you detect abnormal patterns, protect the core from overload, and degrade gracefully on failures.
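As a starting point, a minimal token-bucket guard around the call (a sketch; opossum, mentioned above, would add the full circuit-breaker behavior on top):

    // Sketch only: caps the call rate; it is not a circuit breaker.
    class TokenBucket {
        constructor(rate_per_sec, burst) {
            this.capacity = burst;
            this.tokens = burst;
            this.rate = rate_per_sec;
            this.last = Date.now();
        }
        try_take() {
            const now = Date.now();
            // Refill proportionally to elapsed time, capped at burst capacity.
            this.tokens = Math.min(this.capacity, this.tokens + ((now - this.last) / 1000) * this.rate);
            this.last = now;
            if (this.tokens < 1) return false;
            this.tokens -= 1;
            return true;
        }
    }

    const bucket = new TokenBucket(1, 3); // ~1 call/sec with bursts of 3

    async function guarded_get_system_store(rpc_client) {
        if (!bucket.try_take()) throw new Error('get_system_store rate limited');
        return rpc_client.system.get_system_store();
    }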

🤖 Prompt for AI Agents
In src/server/system_services/system_store.js around lines 545 to 565, add
Prometheus metrics instrumentation for the call to
server_rpc.client.system.get_system_store(), including a counter for call count,
a histogram for payload size and latency, and emit alerts if thresholds are
exceeded. Additionally, in the RPC client layer (e.g., src/rpc/client/*.js),
implement rate limiting (such as a token bucket) to cap calls per second and
wrap the get_system_store call in a circuit breaker (using a library like
opossum) to fail fast on repeated errors. Add configuration options for these
thresholds and integrate the metrics and alerts with the observability stack for
monitoring and graceful degradation.

⚠️ Potential issue

Add comprehensive error handling and improve validation

The method needs error handling for RPC calls and JSON parsing. Also, validation should happen before data transformation.

 async _read_new_data_from_core(target) {
     dbg.log3("_read_new_data_from_core begins");
+    try {
         const res = await server_rpc.client.system.get_system_store();
+        if (!res || !res[RPC_BUFFERS] || !res[RPC_BUFFERS].data) {
+            throw new Error('Invalid response from core system store');
+        }
         const ss = JSON.parse(res[RPC_BUFFERS].data.toString());
         dbg.log3("_read_new_data_from_core new system store", ss);
         for (const key of Object.keys(ss)) {
             const collection = COLLECTIONS_BY_NAME[key];
             if (collection) {
                 target[key] = [];
                 _.each(ss[key], item => {
+                    // Validate before transformation
+                    db_client.instance().validate(key, item, 'warn');
                     //these two lines will transform string values into appropriately typed objects
                     //(SensitiveString, ObjectId) according to schema
                     const after = decode_json(collection.schema, item);
-                    db_client.instance().validate(key, after);
+                    // Validate again after transformation to ensure correctness
+                    db_client.instance().validate(key, after, 'error');
                     target[key].push(after);
                 });
             } else {
                 target[key] = ss[key];
             }
         }
+    } catch (err) {
+        dbg.error('_read_new_data_from_core: Failed to read data from core', err);
+        throw new RpcError('INTERNAL_ERROR', `Failed to load system store from core: ${err.message}`);
+    }
 }
📝 Committable suggestion


Suggested change
    async _read_new_data_from_core(target) {
        dbg.log3("_read_new_data_from_core begins");
        const res = await server_rpc.client.system.get_system_store();
        const ss = JSON.parse(res[RPC_BUFFERS].data.toString());
        dbg.log3("_read_new_data_from_core new system store", ss);
        for (const key of Object.keys(ss)) {
            const collection = COLLECTIONS_BY_NAME[key];
            if (collection) {
                target[key] = [];
                _.each(ss[key], item => {
                    //these two lines will transform string values into appropriately typed objects
                    //(SensitiveString, ObjectId) according to schema
                    const after = decode_json(collection.schema, item);
                    db_client.instance().validate(key, after);
                    target[key].push(after);
                });
            } else {
                target[key] = ss[key];
            }
        }
    }

    async _read_new_data_from_core(target) {
        dbg.log3("_read_new_data_from_core begins");
        try {
            const res = await server_rpc.client.system.get_system_store();
            if (!res || !res[RPC_BUFFERS] || !res[RPC_BUFFERS].data) {
                throw new Error('Invalid response from core system store');
            }
            const ss = JSON.parse(res[RPC_BUFFERS].data.toString());
            dbg.log3("_read_new_data_from_core new system store", ss);
            for (const key of Object.keys(ss)) {
                const collection = COLLECTIONS_BY_NAME[key];
                if (collection) {
                    target[key] = [];
                    _.each(ss[key], item => {
                        // Validate before transformation
                        db_client.instance().validate(key, item, 'warn');
                        // these two lines will transform string values into appropriately typed objects
                        // (SensitiveString, ObjectId) according to schema
                        const after = decode_json(collection.schema, item);
                        // Validate again after transformation to ensure correctness
                        db_client.instance().validate(key, after, 'error');
                        target[key].push(after);
                    });
                } else {
                    target[key] = ss[key];
                }
            }
        } catch (err) {
            dbg.error('_read_new_data_from_core: Failed to read data from core', err);
            throw new RpcError('INTERNAL_ERROR', `Failed to load system store from core: ${err.message}`);
        }
    }
🤖 Prompt for AI Agents
In src/server/system_services/system_store.js around lines 545 to 565, add
try-catch blocks to handle errors from the RPC call and JSON parsing to prevent
crashes. Move the validation step to occur before the decode_json transformation
to ensure data integrity early. Log or handle any errors appropriately within
the catch block to maintain robustness.

@alphaprinz alphaprinz force-pushed the system_store_phase2 branch from d880a87 to 766302a on July 24, 2025 11:56
@alphaprinz alphaprinz force-pushed the system_store_phase2 branch from 766302a to 63954aa on August 15, 2025 16:46

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (3)
src/server/system_services/system_store.js (3)

431-431: Avoid allocating new_data when not reading from DB

new_data is created unconditionally (Line 431) but is only used in the DB path. Instantiate it only when needed.

-                const new_data = new SystemStoreData();
+                // new_data is only needed when reading from DB
+                let new_data;
@@
-                if (this.source === SOURCE.DB || from_core_failure) {
-                    await this._read_new_data_from_db(new_data);
+                if (this.source === SOURCE.DB || from_core_failure) {
+                    new_data = new SystemStoreData();
+                    await this._read_new_data_from_db(new_data);
                 }

Also applies to: 446-448


554-574: Harden _read_new_data_from_core with payload validation and error handling

Validate the RPC payload shape and JSON parsing; pre/post-validate items. Currently, a malformed response will throw TypeError/JSON errors without clear context.

-    async _read_new_data_from_core(target) {
-        dbg.log3("_read_new_data_from_core begins");
-        const res = await server_rpc.client.system.get_system_store();
-        const ss = JSON.parse(res[RPC_BUFFERS].data.toString());
-        dbg.log3("_read_new_data_from_core new system store", ss);
-        for (const key of Object.keys(ss)) {
-            const collection = COLLECTIONS_BY_NAME[key];
-            if (collection) {
-                target[key] = [];
-                _.each(ss[key], item => {
-                    //these two lines will transform string values into appropriately typed objects
-                    //(SensitiveString, ObjectId) according to schema
-                    const after = decode_json(collection.schema, item);
-                    db_client.instance().validate(key, after);
-                    target[key].push(after);
-                });
-            } else {
-                target[key] = ss[key];
-            }
-        }
-    }
+    async _read_new_data_from_core(target) {
+        dbg.log3('_read_new_data_from_core begins');
+        try {
+            const res = await server_rpc.client.system.get_system_store();
+            if (!res || !res[RPC_BUFFERS] || !res[RPC_BUFFERS].data) {
+                throw new Error('Invalid CORE reply (missing buffer payload)');
+            }
+            const buf = res[RPC_BUFFERS].data;
+            const ss = JSON.parse(buf.toString());
+            dbg.log3('_read_new_data_from_core new system store', ss);
+            for (const key of Object.keys(ss)) {
+                const collection = COLLECTIONS_BY_NAME[key];
+                if (collection) {
+                    target[key] = [];
+                    _.each(ss[key], item => {
+                        // Validate before and after decode to surface issues early
+                        db_client.instance().validate(key, item, 'warn');
+                        const after = decode_json(collection.schema, item);
+                        db_client.instance().validate(key, after);
+                        target[key].push(after);
+                    });
+                } else {
+                    target[key] = ss[key];
+                }
+            }
+        } catch (err) {
+            dbg.error('_read_new_data_from_core failed', err);
+            throw err;
+        }
+    }

484-487: Handle CORE source in recent_db_data to avoid returning undefined

As written, recent_db_data returns undefined when source is CORE and no DB fallback has populated old_db_data, which can break the server handler that serializes this value. Either throw clearly or load a DB snapshot on-demand. The latter keeps the CORE RPC functional even if someone misconfigures the core process.

-    //return the latest copy of in-memory data
-    async recent_db_data() {
-        return this._load_serial.surround(async () => this.old_db_data);
-    }
+    // return the latest copy of in-memory DB-backed data
+    async recent_db_data() {
+        return this._load_serial.surround(async () => {
+            if (this.source !== SOURCE.DB && !this.old_db_data) {
+                // Ensure we can still serve a snapshot for RPC callers
+                const snapshot = new SystemStoreData();
+                await this._read_data_from_db(snapshot);
+                this.old_db_data = snapshot;
+            }
+            return this.old_db_data || {};
+        });
+    }

Run to verify call sites and potential serialization issues:

#!/bin/bash
# Inspect where recent_db_data() is used and how it's serialized
rg -n -C3 '\brecent_db_data\s*\(' src

# Inspect the system RPC handler for get_system_store
rg -n -C5 '\bget_system_store\s*\(' src/server/system_services
🧹 Nitpick comments (3)
config.js (1)

253-253: Normalize and validate SYSTEM_STORE_SOURCE early

Lowercase the env value once at definition to avoid locale pitfalls downstream and make comparisons simple. Consider warning on invalid values.

-config.SYSTEM_STORE_SOURCE = process.env.SYSTEM_STORE_SOURCE || "db";
+// Accepts: 'db' | 'core' (case-insensitive). Defaults to 'db'.
+config.SYSTEM_STORE_SOURCE = (process.env.SYSTEM_STORE_SOURCE || 'db').toLowerCase();
src/server/system_services/system_store.js (2)

360-362: Use toLowerCase and guard unknown values when selecting source

toLocaleLowerCase can behave unexpectedly on certain locales (e.g., Turkish). Also, explicitly handle unexpected values.

-        this.source = config.SYSTEM_STORE_SOURCE.toLocaleLowerCase() === 'core' ? SOURCE.CORE : SOURCE.DB;
-        dbg.log0("system store source is", this.source);
+        const src = String(config.SYSTEM_STORE_SOURCE || 'db').toLowerCase();
+        if (src !== 'core' && src !== 'db') {
+            dbg.warn('SYSTEM_STORE_SOURCE has unknown value:', src, '— defaulting to DB');
+        }
+        this.source = (src === 'core') ? SOURCE.CORE : SOURCE.DB;
+        dbg.log0('system store source is', this.source);

453-456: Fix fetch logging to reflect the actual loaded dataset

When loading from CORE successfully, logging size/data based on new_data is misleading (often empty). Use the dataset actually fetched.

-                    dbg.log1('SystemStore: fetch size', size_utils.human_size(JSON.stringify(new_data).length));
-                    dbg.log1('SystemStore: fetch data', util.inspect(new_data, {
+                    const fetched = (this.source === SOURCE.DB || from_core_failure) ? new_data : this.data;
+                    dbg.log1('SystemStore: fetch size', size_utils.human_size(JSON.stringify(fetched).length));
+                    dbg.log1('SystemStore: fetch data', util.inspect(fetched, {
                         depth: 4
                     }));
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 766302a and 63954aa.

📒 Files selected for processing (5)
  • config.js (1 hunks)
  • src/api/system_api.js (1 hunks)
  • src/server/system_services/system_server.js (3 hunks)
  • src/server/system_services/system_store.js (8 hunks)
  • src/util/postgres_client.js (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • src/util/postgres_client.js
  • src/api/system_api.js
  • src/server/system_services/system_server.js
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-08T13:10:36.141Z
Learnt from: naveenpaul1
PR: noobaa/noobaa-core#9182
File: src/server/system_services/pool_server.js:1314-1317
Timestamp: 2025-08-08T13:10:36.141Z
Learning: In src/server/system_services/pool_server.js (and config usage), the constant config.INTERNAL_STORAGE_POOL_NAME has been removed from the system. Future logic should not depend on this constant and should instead use config.DEFAULT_POOL_NAME or structural markers (e.g., pool.resource_type === 'INTERNAL' or pool.mongo_info) to identify internal/mongo pools.

Applied to files:

  • config.js
🧬 Code Graph Analysis (2)
config.js (19)
src/endpoint/endpoint.js (1)
  • config (18-18)
src/server/node_services/nodes_monitor.js (1)
  • config (15-15)
src/sdk/namespace_fs.js (1)
  • config (13-13)
src/util/http_utils.js (2)
  • config (19-19)
  • process (41-41)
src/manage_nsfs/nc_lifecycle.js (1)
  • config (10-10)
src/util/notifications_util.js (1)
  • config (5-5)
src/util/fork_utils.js (1)
  • config (11-11)
src/cmd/manage_nsfs.js (1)
  • config (18-18)
src/sdk/bucketspace_fs.js (1)
  • config (9-9)
src/endpoint/s3/s3_rest.js (1)
  • config (15-15)
src/util/native_fs_utils.js (1)
  • config (10-10)
src/manage_nsfs/manage_nsfs_glacier.js (1)
  • config (6-6)
src/cmd/nsfs.js (1)
  • config (20-20)
src/util/debug_module.js (1)
  • config (24-24)
src/agent/block_store_services/block_store_base.js (1)
  • config (9-9)
src/server/bg_workers.js (1)
  • config (16-16)
src/server/web_server.js (1)
  • process (243-248)
src/server/common_services/auth_server.js (1)
  • process (178-185)
src/server/bg_services/server_monitor.js (1)
  • process (51-51)
src/server/system_services/system_store.js (3)
src/server/system_services/system_server.js (5)
  • require (19-19)
  • require (20-20)
  • require (22-22)
  • require (45-45)
  • _ (6-6)
src/util/postgres_client.js (5)
  • require (13-13)
  • require (14-14)
  • require (26-26)
  • _ (5-5)
  • res (255-255)
src/server/system_services/master_key_manager.js (2)
  • _ (4-4)
  • db_client (8-8)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Build Noobaa Image
  • GitHub Check: run-jest-unit-tests
  • GitHub Check: run-package-lock-validation
🔇 Additional comments (4)
src/server/system_services/system_store.js (4)

41-44: LGTM: Imports for RPC buffers and decode_json

The new imports fit the added CORE path. Ensure decode_json remains available even if DB_TYPE changes in future.


156-159: LGTM: Introduced SOURCE enum

Clear separation of sources improves readability and intent.


459-462: LGTM: DB path updates in-memory snapshots correctly

old_db_data and data are updated only for DB source or CORE fallback, preserving CORE-sourced data when appropriate.


904-904: LGTM: Exporting SOURCE

Exporting SOURCE is useful for external consumers/tests to assert behavior.

@pull-request-size pull-request-size bot added size/L and removed size/M labels Aug 19, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

♻️ Duplicate comments (2)
src/server/system_services/system_store.js (2)

486-490: recent_db_data should not silently return undefined on CORE source

Guard the method to be DB-only (as the name implies), return a safe default, or throw. Throwing a clear RpcError is preferable; callers can fallback.

-    async recent_db_data() {
-        return this._load_serial.surround(async () => this.old_db_data);
-    }
+    async recent_db_data() {
+        return this._load_serial.surround(async () => {
+            if (this.source !== SOURCE.DB) {
+                throw new RpcError('BAD_REQUEST', 'recent_db_data is only available when source is DB');
+            }
+            return this.old_db_data || {};
+        });
+    }

556-576: Harden _read_new_data_from_core: validate response, handle parse errors, and relax validation severity

Add robust error handling and align validation severity with DB load (warn) to avoid spurious failures on benign mismatches.

-    async _read_new_data_from_core(target) {
-        dbg.log3("_read_new_data_from_core begins");
-        const res = await server_rpc.client.system.get_system_store();
-        const ss = JSON.parse(res[RPC_BUFFERS].data.toString());
-        dbg.log3("_read_new_data_from_core new system store", ss);
-        for (const key of Object.keys(ss)) {
-            const collection = COLLECTIONS_BY_NAME[key];
-            if (collection) {
-                target[key] = [];
-                _.each(ss[key], item => {
-                    //these two lines will transform string values into appropriately typed objects
-                    //(SensitiveString, ObjectId) according to schema
-                    const after = decode_json(collection.schema, item);
-                    db_client.instance().validate(key, after);
-                    target[key].push(after);
-                });
-            } else {
-                target[key] = ss[key];
-            }
-        }
-    }
+    async _read_new_data_from_core(target) {
+        dbg.log3("_read_new_data_from_core begins");
+        try {
+            const res = await server_rpc.client.system.get_system_store();
+            if (!res || !res[RPC_BUFFERS] || !res[RPC_BUFFERS].data) {
+                throw new Error('Invalid response from core system store');
+            }
+            const ss = JSON.parse(res[RPC_BUFFERS].data.toString());
+            dbg.log3("_read_new_data_from_core new system store", ss);
+            for (const key of Object.keys(ss)) {
+                const collection = COLLECTIONS_BY_NAME[key];
+                if (collection) {
+                    target[key] = [];
+                    _.each(ss[key], item => {
+                        // Transform string values into appropriately typed objects
+                        const after = decode_json(collection.schema, item);
+                        // Use warn severity to match DB ingestion behavior
+                        db_client.instance().validate(key, after, 'warn');
+                        target[key].push(after);
+                    });
+                } else {
+                    target[key] = ss[key];
+                }
+            }
+        } catch (err) {
+            dbg.error('_read_new_data_from_core: Failed to read/parse data from core', err);
+            throw new RpcError('INTERNAL_ERROR', `Failed to load system store from core: ${err.message}`);
+        }
+    }
🧹 Nitpick comments (2)
src/server/common_services/server_inter_process.js (1)

20-28: Avoid duplicating endpoint detection logic across files

Extract a single helper (e.g., util/process_type.js) to determine endpoint role or use a stable indicator (process.title or a config/env flag). This prevents drift between implementations here and in system_store.js.
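A sketch of such a helper (util/process_type.js is a hypothetical path; the hostname heuristic just mirrors the existing checks):

    // util/process_type.js (hypothetical) -- single source of truth for role detection.
    'use strict';

    function is_endpoint_process() {
        // Prefer an explicit config/env flag if one is introduced; this heuristic
        // mirrors the current hostname checks in both call sites.
        return (process.env.HOSTNAME || '').includes('endpoint');
    }

    exports.is_endpoint_process = is_endpoint_process;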

src/server/system_services/system_store.js (1)

676-690: Make config guard safe and fix comment typos

Ensure safe toLowerCase usage and correct comment typos to avoid confusion.

-                //if endpoints are loading system store from core, we need to wait until
-                //above publish_to_cluster() will update core's in-memory db.
-                //the next publist_to_cluster() will make endpoints load the updated
-                //system store from core
-                if (config.SYSTEM_STORE_SOURCE.toLowerCase() === 'core') {
+                // If endpoints are loading system store from core, wait until
+                // the above publish_to_cluster() updates core's in-memory DB.
+                // The next publish_to_cluster() will make endpoints load the updated
+                // system store from core.
+                if (String(config.SYSTEM_STORE_SOURCE || 'db').toLowerCase() === 'core') {
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 63954aa and 9fe5000.

📒 Files selected for processing (3)
  • src/api/server_inter_process_api.js (1 hunks)
  • src/server/common_services/server_inter_process.js (1 hunks)
  • src/server/system_services/system_store.js (9 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
src/server/system_services/system_store.js (3)
src/server/system_services/system_server.js (7)
  • require (19-19)
  • require (20-20)
  • require (22-22)
  • require (45-45)
  • config (18-18)
  • dbg (16-16)
  • _ (6-6)
src/util/postgres_client.js (8)
  • require (13-13)
  • require (14-14)
  • require (26-26)
  • config (29-29)
  • dbg (17-17)
  • time_utils (28-28)
  • _ (5-5)
  • res (255-255)
src/server/common_services/server_inter_process.js (2)
  • is_endpoint (25-25)
  • dbg (12-12)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Build Noobaa Image
  • GitHub Check: run-jest-unit-tests
  • GitHub Check: run-package-lock-validation
🔇 Additional comments (2)
src/server/system_services/system_store.js (2)

156-159: SOURCE enum addition looks good

Freezing the enum is a solid choice to prevent accidental mutations.
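For reference, a frozen enum of this shape (a sketch; the actual definition lives in system_store.js):

    const SOURCE = Object.freeze({
        DB: 'DB',
        CORE: 'CORE',
    });
    // After Object.freeze, SOURCE.DB = 'X' is silently ignored (a TypeError in strict mode).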


41-44: Imports LGTM

Bringing in decode_json and RPC_BUFFERS is appropriate for the CORE-source path.

Comment on lines 20 to 28
    //if endpoints load from core, and this load is for core
    //(ie, the first load_system_store() out of two),
    //then endpoints skip it.
    //endpoints will be updated in the next load_system_store()
    //once core's in memory system store is updated.
    const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint");
    if (is_endpoint && req?.rpc_params?.load_from_core_step === 'core') {
            return;
    }

💡 Verification agent

🧩 Analysis chain

Bug: is_endpoint detection is broken (indexOf truthiness) — will skip core load and stall propagation

Using indexOf returns -1 (truthy) when not found and 0 (falsy) when found at start. This flips the logic and can make the core skip loading on the 'core' step, preventing endpoints from ever updating.

Apply this fix:

-    const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint");
-    if (is_endpoint && req?.rpc_params?.load_from_core_step === 'core') {
-            return;
-    }
+    const is_endpoint = (process.env.HOSTNAME || '').includes('endpoint');
+    if (is_endpoint && req?.rpc_params?.load_from_core_step === 'core') {
+        dbg.log1('load_system_store: skipping on endpoint during core step');
+        return;
+    }

Run to locate similar fragile checks across the repo:


🏁 Script executed:

#!/bin/bash
# Find all usages of indexOf("endpoint") on HOSTNAME to fix truthiness bugs
rg -n -C2 "process\.env\.HOSTNAME.*indexOf\(\s*['\"]endpoint['\"]\s*\)"

Length of output: 1303


Fix endpoint detection logic across services

The indexOf check returns –1 (truthy) when “endpoint” is not found and 0 (falsy) when it’s at the start, flipping the intended logic. This can cause core loads to be skipped or misclassify system‐store sources. Replace with includes and add a debug log in server_inter_process.js.

Files to update:

  • src/server/common_services/server_inter_process.js (lines 23–27)
  • src/server/system_services/system_store.js (around line 361)

Proposed changes:

--- a/src/server/common_services/server_inter_process.js
+++ b/src/server/common_services/server_inter_process.js
@@ -23,7 +23,9 @@
     //once core's in memory system store is updated.
-    const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint");
+    const is_endpoint = (process.env.HOSTNAME || '').includes('endpoint');
+    if (is_endpoint && req?.rpc_params?.load_from_core_step === 'core') {
+        dbg.log1('load_system_store: skipping on endpoint during core step');
         return;
     }
--- a/src/server/system_services/system_store.js
+++ b/src/server/system_services/system_store.js
@@ -358,8 +358,8 @@ class SystemStore {
         this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5;
         //load from core if enabled and this is an endpoint
-        const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint");
-        this.source = (config.SYSTEM_STORE_SOURCE.toLowerCase() === 'core' && is_endpoint) ? SOURCE.CORE : SOURCE.DB;
+        const is_endpoint = (process.env.HOSTNAME || '').includes('endpoint');
+        this.source = (config.SYSTEM_STORE_SOURCE.toLowerCase() === 'core' && is_endpoint) ? SOURCE.CORE : SOURCE.DB;
         dbg.log0("system store source is", this.source);
📝 Committable suggestion


Suggested change
    //if endpoints load from core, and this load is for core
    //(ie, the first load_system_store() out of two),
    //then endpoints skip it.
    //endpoints will be updated in the next load_system_store()
    //once core's in memory system store is updated.
    const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint");
    if (is_endpoint && req?.rpc_params?.load_from_core_step === 'core') {
            return;
    }

    //if endpoints load from core, and this load is for core
    //(ie, the first load_system_store() out of two),
    //then endpoints skip it.
    //endpoints will be updated in the next load_system_store()
    //once core's in memory system store is updated.
    const is_endpoint = (process.env.HOSTNAME || '').includes('endpoint');
    if (is_endpoint && req?.rpc_params?.load_from_core_step === 'core') {
        dbg.log1('load_system_store: skipping on endpoint during core step');
        return;
    }
Suggested change
    //if endpoints load from core, and this load is for core
    //(ie, the first load_system_store() out of two),
    //then endpoints skip it.
    //endpoints will be updated in the next load_system_store()
    //once core's in memory system store is updated.
    const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint");
    if (is_endpoint && req?.rpc_params?.load_from_core_step === 'core') {
            return;
    }

        this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5;
        //load from core if enabled and this is an endpoint
        const is_endpoint = (process.env.HOSTNAME || '').includes('endpoint');
        this.source = (config.SYSTEM_STORE_SOURCE.toLowerCase() === 'core' && is_endpoint) ? SOURCE.CORE : SOURCE.DB;
        dbg.log0("system store source is", this.source);
🤖 Prompt for AI Agents
In src/server/common_services/server_inter_process.js around lines 20–28, the
current endpoint detection uses indexOf which returns -1 when not found (truthy)
and 0 when at start (falsy), flipping logic; replace the indexOf-based check
with a boolean includes check (e.g., process.env.HOSTNAME?.includes("endpoint"))
and ensure the condition reads correctly (if hostname includes "endpoint" AND
req?.rpc_params?.load_from_core_step === 'core' then return); also add a debug
log immediately before the return to record HOSTNAME and rpc_params for
troubleshooting. In src/server/system_services/system_store.js around line 361
apply the same hostname detection fix (use includes instead of indexOf and
correct the conditional) and add a similar debug log there so both services
consistently detect endpoints and log relevant context.

@alphaprinz alphaprinz force-pushed the system_store_phase2 branch from 9fe5000 to 4605bc0 on August 19, 2025 23:01
@alphaprinz alphaprinz requested a review from dannyzaken August 19, 2025 23:36
@alphaprinz alphaprinz force-pushed the system_store_phase2 branch 2 times, most recently from 4605bc0 to 5cc71b2, on August 25, 2025 20:29

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (4)
src/server/system_services/system_store.js (4)

433-446: Fix misleading fetch-size logging and avoid redundant allocations in CORE path

When CORE path succeeds, new_data remains empty while this.data holds the payload, so the logged size/data are misleading. Also avoid double instantiation.

[suggested fix below]

-                const new_data = new SystemStoreData();
+                let new_data = new SystemStoreData();
@@
-                if (this.source === SOURCE.CORE) {
+                if (this.source === SOURCE.CORE) {
                     try {
-                        this.data = new SystemStoreData();
-                        await this._read_new_data_from_core(this.data);
+                        this.data = new SystemStoreData();
+                        await this._read_new_data_from_core(this.data);
+                        // Keep logging consistent with DB path
+                        new_data = this.data;
                     } catch (e) {
                         dbg.error("Failed to load system store from core. Will load from db.", e);
                         from_core_failure = true;
                     }
                 }
@@
-                if (this.source === SOURCE.DB || from_core_failure) {
+                if (this.source === SOURCE.DB || from_core_failure) {
                     this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data);
                     this.data = _.cloneDeep(this.old_db_data);
                 }

Also applies to: 461-464


360-364: Guard against undefined config and use safer endpoint detection

config.SYSTEM_STORE_SOURCE.toLowerCase() will throw if the config is unset. Also prefer .includes() for readability. Safe fallback to DB by default.

[suggested fix below]

-        //load from core if enabled and this is an endpoint
-        const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1;
-        this.source = (config.SYSTEM_STORE_SOURCE.toLowerCase() === 'core' && is_endpoint) ? SOURCE.CORE : SOURCE.DB;
+        // load from core if enabled and this is an endpoint
+        const is_endpoint = (process.env.HOSTNAME || '').includes('endpoint');
+        const source_cfg = String(config.SYSTEM_STORE_SOURCE || 'db').toLowerCase();
+        this.source = (source_cfg === 'core' && is_endpoint) ? SOURCE.CORE : SOURCE.DB;

Optionally honor an explicit override if passed via constructor options (aligns with previous feedback to pass source from outside).

Run to find any other fragile usages of indexOf/endpoint or unguarded toLowerCase on SYSTEM_STORE_SOURCE:

#!/bin/bash
rg -n -C2 "HOSTNAME.*indexOf\(['\"]endpoint['\"]\)" src
rg -n -C2 "SYSTEM_STORE_SOURCE\.toLowerCase\(\)" src

671-692: Second-phase publish guard: avoid crash when SYSTEM_STORE_SOURCE is unset; typo in comment

  1. toLowerCase() on an undefined SYSTEM_STORE_SOURCE will throw.
  2. Tiny docstring nit: “publist_to_cluster” → “publish_to_cluster”.

[suggested fix below]

-                if (config.SYSTEM_STORE_SOURCE.toLowerCase() === 'core') {
+                if (String(config.SYSTEM_STORE_SOURCE || 'db').toLowerCase() === 'core') {
                     dbg.log2("second phase publish");
                     await server_rpc.client.redirector.publish_to_cluster({
                         method_api: 'server_inter_process_api',
                         method_name: 'load_system_store',
                         target: '',
                         request_params: { since: last_update, load_from_core_step: 'endpoint' }
                     });
                 }

And in the preceding comment block:

-                //the next publist_to_cluster() will make endpoints load the updated
+                //the next publish_to_cluster() will make endpoints load the updated

556-576: Harden RPC decode path: validate response shape, catch JSON errors, and wrap as RpcError

Add defensive checks and error classification so callers (and metrics/alerts) can distinguish transport/parse issues. Current code will throw but with less context.

[suggested fix below]

-    async _read_new_data_from_core(target) {
-        dbg.log3("_read_new_data_from_core begins");
-        const res = await server_rpc.client.system.get_system_store();
-        const ss = JSON.parse(res[RPC_BUFFERS].data.toString());
-        dbg.log3("_read_new_data_from_core new system store", ss);
-        for (const key of Object.keys(ss)) {
-            const collection = COLLECTIONS_BY_NAME[key];
-            if (collection) {
-                target[key] = [];
-                _.each(ss[key], item => {
-                    //these two lines will transform string values into appropriately typed objects
-                    //(SensitiveString, ObjectId) according to schema
-                    const after = decode_json(collection.schema, item);
-                    db_client.instance().validate(key, after);
-                    target[key].push(after);
-                });
-            } else {
-                target[key] = ss[key];
-            }
-        }
-    }
+    async _read_new_data_from_core(target) {
+        dbg.log3("_read_new_data_from_core begins");
+        try {
+            const res = await server_rpc.client.system.get_system_store();
+            const payload = res && res[RPC_BUFFERS] && res[RPC_BUFFERS].data;
+            if (!payload || !Buffer.isBuffer(payload)) {
+                throw new Error('Invalid response shape (missing RPC buffer data)');
+            }
+            const ss = JSON.parse(payload.toString('utf8'));
+            dbg.log3("_read_new_data_from_core new system store", ss);
+            for (const key of Object.keys(ss)) {
+                const collection = COLLECTIONS_BY_NAME[key];
+                if (collection) {
+                    target[key] = [];
+                    _.each(ss[key], item => {
+                        // transform string values into appropriately typed objects according to schema
+                        const after = decode_json(collection.schema, item);
+                        db_client.instance().validate(key, after);
+                        target[key].push(after);
+                    });
+                } else {
+                    target[key] = ss[key];
+                }
+            }
+        } catch (err) {
+            dbg.error('_read_new_data_from_core: failed to load/parse system store from core', err);
+            throw new RpcError('INTERNAL_ERROR', `get_system_store failed: ${err.message}`);
+        }
+    }

Operational advice:

  • Add basic metrics around this call (counter, latency histogram, payload size) and a simple rate-limit/circuit-breaker at the RPC client layer to avoid cascading failure if many endpoints retry simultaneously.
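As a concrete illustration of the retry advice above, a minimal backoff wrapper (the attempt count and delays are arbitrary, not part of the PR; server_rpc and dbg are assumed to be in scope as elsewhere in this file):

async function get_system_store_with_backoff(max_attempts = 3, base_delay_ms = 1000) {
    for (let attempt = 1; attempt <= max_attempts; attempt += 1) {
        try {
            return await server_rpc.client.system.get_system_store();
        } catch (err) {
            if (attempt === max_attempts) throw err;
            // exponential backoff with jitter so many endpoints don't retry in lockstep
            const delay_ms = base_delay_ms * (2 ** (attempt - 1)) * (0.5 + Math.random());
            dbg.warn(`get_system_store attempt ${attempt} failed, retrying in ${Math.round(delay_ms)}ms`, err);
            await new Promise(resolve => setTimeout(resolve, delay_ms));
        }
    }
}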
🧹 Nitpick comments (3)
src/util/postgres_client.js (1)

1954-1954: Decoding helper export is fine, but consider moving it out of the Postgres layer

Exporting decode_json from postgres_client introduces a DB-specific dependency for modules that only need JSON-to-schema decoding. Consider relocating decode_json into a small shared codec module (e.g., util/json_codec.js) or re-exporting it via util/db_client.js to keep higher layers decoupled from a specific DB client.
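For example, a tiny neutral module along these lines would keep callers DB-agnostic (the file name is a suggestion, not something that exists in the PR):

// src/util/json_codec.js - neutral home for schema-driven JSON decoding
'use strict';
const { decode_json } = require('./postgres_client');
module.exports = { decode_json };

Consumers would then require('../../util/json_codec') instead of reaching into postgres_client directly.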

src/server/system_services/system_store.js (2)

41-44: Avoid tight coupling on a specific DB client for schema decoding

Importing decode_json from ../../util/postgres_client couples SystemStore to the Postgres implementation. Since decoding is schema-driven and DB-agnostic, prefer exposing the decoder from a neutral module (e.g., util/json_codec) or a generic db_client façade to preserve modularity.


486-489: Make recent_db_data safe outside core/DB source

If called on an instance with SOURCE.CORE, old_db_data can be undefined. Either restrict usage or return a safe default. Since the RPC responder runs on core, throwing a clear error when not on DB source is safer.

[suggested fix below]

-    async recent_db_data() {
-        return this._load_serial.surround(async () => this.old_db_data);
-    }
+    async recent_db_data() {
+        return this._load_serial.surround(async () => {
+            if (this.source !== SOURCE.DB) {
+                throw new RpcError('BAD_REQUEST', 'recent_db_data is only available on DB source (core)');
+            }
+            return this.old_db_data || {};
+        });
+    }
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between cfa28ed and 5cc71b2.

📒 Files selected for processing (7)
  • config.js (1 hunks)
  • src/api/server_inter_process_api.js (1 hunks)
  • src/api/system_api.js (1 hunks)
  • src/server/common_services/server_inter_process.js (1 hunks)
  • src/server/system_services/system_server.js (3 hunks)
  • src/server/system_services/system_store.js (9 hunks)
  • src/util/postgres_client.js (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • src/server/system_services/system_server.js
  • src/api/server_inter_process_api.js
  • config.js
  • src/server/common_services/server_inter_process.js
  • src/api/system_api.js
🧰 Additional context used
🧬 Code graph analysis (1)
src/server/system_services/system_store.js (3)
src/server/system_services/system_server.js (6)
  • require (19-19)
  • require (20-20)
  • require (22-22)
  • require (45-45)
  • _ (6-6)
  • server_rpc (26-26)
src/util/postgres_client.js (5)
  • require (13-13)
  • require (14-14)
  • require (26-26)
  • _ (5-5)
  • res (255-255)
src/server/common_services/server_inter_process.js (2)
  • is_endpoint (25-25)
  • server_rpc (14-14)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: run-jest-unit-tests
  • GitHub Check: Build Noobaa Image
  • GitHub Check: run-package-lock-validation
🔇 Additional comments (2)
src/server/system_services/system_store.js (2)

156-159: SOURCE enum addition: LGTM

Clear, frozen enum improves readability and avoids magic strings.


921-921: Exporting SOURCE: LGTM

Good to expose for external consumers/tests instead of re-declaring string literals.

@@ -352,6 +357,10 @@ class SystemStore extends EventEmitter {
this.START_REFRESH_THRESHOLD = 10 * 60 * 1000;
this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000;
this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5;
//load from core if enabled and this is an endpoint
const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1;
Member

why do we need this? I think we should only look at config.SYSTEM_STORE_SOURCE, regardless of the hostname. we don't want to restrict it to endpoints, as we might have other scale-out services outside of core that we might want to load from core (e.g.: some bg_workers, etc.)

Contributor Author

Core's publish behaviour is different in the load-from-core case - there are two steps instead of one.
This means core needs to know the endpoints' system store source,
so some parameter in core needs to indicate which publish flow to use.
Currently I'm reusing the same config.SYSTEM_STORE_SOURCE param.
We could switch core to a different, new param,
but I think that would be more confusing - the client would need to enable two different params in two different kube objects (endpoint deployment and core stateful set).
We can discuss this further, or I can just add a new param that tells core to use the two-step load.
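For readers following the thread, a minimal sketch of the two-step publish being described, using the same publish_to_cluster call that appears in this PR (the surrounding make_changes wiring is elided):

// phase 1: every process reloads from db; endpoints that load from core skip this step
await server_rpc.client.redirector.publish_to_cluster({
    method_api: 'server_inter_process_api',
    method_name: 'load_system_store',
    target: '',
    request_params: { since: last_update, load_from_core_step: 'CORE' }
});
// phase 2: core's in-memory store is now fresh, so endpoints can safely pull from core
await server_rpc.client.redirector.publish_to_cluster({
    method_api: 'server_inter_process_api',
    method_name: 'load_system_store',
    target: '',
    request_params: { since: last_update, load_from_core_step: 'ENDPOINT' }
});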

//above publish_to_cluster() will update core's in-memory db.
//the next publist_to_cluster() will make endpoints load the updated
//system store from core
if (config.SYSTEM_STORE_SOURCE.toLowerCase() === 'core') {
Member

The second phase should not be conditional. E.g., make_changes can run in core where config.SYSTEM_STORE_SOURCE == 'db', and we still want to notify any process that should load from core about the change.

Contributor Author

The first phase is not conditional and always happens, whether we are in one-step or two-step publish.
The second phase is only for endpoints, in case we use the two-step publish.
Or have I not understood you?

Comment on lines 25 to 28
const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1;
if (is_endpoint && req?.rpc_params?.load_from_core_step === 'core') {
return;
}
Member

I think we should perform this check inside load, and return if req?.rpc_params?.load_source !== this.source

Contributor Author

I've moved the check, but I think your check will skip core's load?
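To make that concern concrete, a sketch of the proposed guard, assuming core runs with this.source === SOURCE.DB (the parameter values are from this PR; the guard itself is the hypothetical under discussion):

// reviewer's proposal: skip when the published step doesn't match this process's source
if (req?.rpc_params?.load_source !== this.source) {
    return;
}
// on core, this.source is SOURCE.DB while the first publish carries 'CORE',
// so the values never match and core would skip the very load it is supposed to perform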

@alphaprinz alphaprinz force-pushed the system_store_phase2 branch from 5cc71b2 to 3828879 Compare August 28, 2025 16:00
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (3)
src/server/system_services/system_store.js (3)

452-469: Fix misleading fetch-size/data logs when source=CORE.
When CORE path succeeds, new_data remains empty, so logs misreport size and payload. Use the actual fetched reference.

                 const new_data = new SystemStoreData();
                 let millistamp = time_utils.millistamp();
                 await this._register_for_changes();
                 let from_core_failure = false;

                 if (this.source === SOURCE.CORE) {
                     try {
                         this.data = new SystemStoreData();
                         await this._read_new_data_from_core(this.data);
                     } catch (e) {
                         dbg.error("Failed to load system store from core. Will load from db.", e);
                         from_core_failure = true;
                     }
                 }

                 if (this.source === SOURCE.DB || from_core_failure) {
                     await this._read_new_data_from_db(new_data);
                 }

                 const secret = await os_utils.read_server_secret();
                 this._server_secret = secret;
                 if (dbg.should_log(1)) { //param should match below logs' level
                     dbg.log1('SystemStore: fetch took', time_utils.millitook(millistamp));
-                    dbg.log1('SystemStore: fetch size', size_utils.human_size(JSON.stringify(new_data).length));
-                    dbg.log1('SystemStore: fetch data', util.inspect(new_data, {
+                    const fetched = (this.source === SOURCE.CORE && !from_core_failure) ? this.data : new_data;
+                    dbg.log1('SystemStore: fetch size', size_utils.human_size(JSON.stringify(fetched).length));
+                    dbg.log1('SystemStore: fetch data', util.inspect(fetched, {
                         depth: 4
                     }));
                 }
                 if (this.source === SOURCE.DB || from_core_failure) {
                     this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data);
                     this.data = _.cloneDeep(this.old_db_data);
                 }

Also applies to: 475-483


505-508: Guard recent_db_data for CORE source (and avoid undefined).
For CORE instances old_db_data may be undefined. Either throw or return a safe value.

-    async recent_db_data() {
-        return this._load_serial.surround(async () => this.old_db_data);
-    }
+    async recent_db_data() {
+        return this._load_serial.surround(async () => {
+            if (this.source !== SOURCE.DB) {
+                throw new RpcError('BAD_REQUEST', 'recent_db_data is only available when source is DB');
+            }
+            return this.old_db_data || {};
+        });
+    }

575-595: Harden _read_new_data_from_core with validation and error handling.
Wrap RPC/JSON in try/catch, validate before and after decode_json, and emit RpcError on failure.

 async _read_new_data_from_core(target) {
-        dbg.log3("_read_new_data_from_core begins");
-        const res = await server_rpc.client.system.get_system_store();
-        const ss = JSON.parse(res[RPC_BUFFERS].data.toString());
-        dbg.log3("_read_new_data_from_core new system store", ss);
-        for (const key of Object.keys(ss)) {
-            const collection = COLLECTIONS_BY_NAME[key];
-            if (collection) {
-                target[key] = [];
-                _.each(ss[key], item => {
-                    //these two lines will transform string values into appropriately typed objects
-                    //(SensitiveString, ObjectId) according to schema
-                    const after = decode_json(collection.schema, item);
-                    db_client.instance().validate(key, after);
-                    target[key].push(after);
-                });
-            } else {
-                target[key] = ss[key];
-            }
-        }
+        dbg.log3("_read_new_data_from_core begins");
+        try {
+            const buf = (await server_rpc.client.system.get_system_store())?.[RPC_BUFFERS]?.data;
+            if (!buf || !Buffer.isBuffer(buf)) {
+                throw new Error('Invalid RPC payload');
+            }
+            const ss = JSON.parse(buf.toString());
+            dbg.log3("_read_new_data_from_core new system store", ss);
+            for (const key of Object.keys(ss)) {
+                const collection = COLLECTIONS_BY_NAME[key];
+                if (collection) {
+                    target[key] = [];
+                    _.each(ss[key], item => {
+                        // Pre-validate basic shape
+                        db_client.instance().validate(key, item, 'warn');
+                        // Transform stringified fields to typed objects
+                        const after = decode_json(collection.schema, item);
+                        // Validate transformed object strictly
+                        db_client.instance().validate(key, after);
+                        target[key].push(after);
+                    });
+                } else {
+                    target[key] = ss[key];
+                }
+            }
+        } catch (err) {
+            dbg.error('_read_new_data_from_core: failed to load/parse/validate', err);
+            throw new RpcError('INTERNAL_ERROR', `Failed to load system store from core: ${err.message}`);
+        }
 }
🧹 Nitpick comments (3)
src/server/system_services/system_store.js (2)

368-376: Define collections with guarded re-definition handling — OK.
Catching “table already defined” is pragmatic here. Consider switching to an error-code check if available in db_client to avoid string matching in the future.
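If db_client ever exposes a structured error for this case, the guard could look like the following sketch (define_collection and the TABLE_ALREADY_DEFINED code are illustrative assumptions, not current db_client API):

try {
    db_client.instance().define_collection(col);
} catch (err) {
    // match on a stable error code instead of the message string
    if (err.code !== 'TABLE_ALREADY_DEFINED') throw err;
    dbg.log1('define_collection: collection already defined, continuing', col.name);
}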


429-437: Skip-first-phase guard is correct; minor readability nit.
Compare against the enum to avoid extra toUpperCase.

-        if (this.source.toUpperCase() === 'CORE' && load_from_core_step && load_from_core_step.toUpperCase() === 'CORE') {
+        if (this.source === SOURCE.CORE && String(load_from_core_step).toUpperCase() === 'CORE') {
src/test/integration_tests/db/test_system_store.js (1)

8-8: Prefer importing SOURCE and using the enum over string literals.

-const { SystemStore } = require('../../../server/system_services/system_store');
+const { SystemStore, SOURCE } = require('../../../server/system_services/system_store');
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between 3828879 and 9f1b2e9.

📒 Files selected for processing (2)
  • src/server/system_services/system_store.js (11 hunks)
  • src/test/integration_tests/db/test_system_store.js (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
src/test/**/*.*

⚙️ CodeRabbit configuration file

src/test/**/*.*: Ensure that the PR includes tests for the changes.

Files:

  • src/test/integration_tests/db/test_system_store.js
🧬 Code graph analysis (2)
src/test/integration_tests/db/test_system_store.js (1)
src/server/system_services/system_store.js (2)
  • require (41-41)
  • require (43-43)
src/server/system_services/system_store.js (2)
src/util/postgres_client.js (5)
  • require (13-13)
  • require (14-14)
  • require (26-26)
  • _ (5-5)
  • res (255-255)
src/server/system_services/system_server.js (5)
  • require (19-19)
  • require (20-20)
  • require (22-22)
  • require (45-45)
  • _ (6-6)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Build Noobaa Image
  • GitHub Check: run-package-lock-validation
  • GitHub Check: run-jest-unit-tests
🔇 Additional comments (3)
src/server/system_services/system_store.js (3)

41-43: Imports look correct and consistent with dependents.
decode_json and RPC_BUFFERS are imported from the expected modules. No issues.


156-159: Good: explicit SOURCE enum.
Freezing the enum is appropriate and prevents accidental mutation.


940-940: Exporting SOURCE is useful for tests and callers.
No issues.

Comment on lines +345 to 349
const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1;
this.source = options.source ||
((config.SYSTEM_STORE_SOURCE.toUpperCase() === 'CORE' && is_endpoint) ? SOURCE.CORE : SOURCE.DB);
SystemStore._instance = SystemStore._instance || new SystemStore({ standalone });
return SystemStore._instance;

⚠️ Potential issue

Critical: get_instance computes the source but never passes it to the instance (sets a static field instead).
As written, endpoints will construct the singleton without a source, falling back to DB in the constructor and defeating the PR’s goal.

Apply this fix to compute the source and pass it into the constructor; also prefer a safe HOSTNAME check:

-        //load from core if enabled and this is an endpoint
-        const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1;
-        this.source = options.source ||
-                      ((config.SYSTEM_STORE_SOURCE.toUpperCase() === 'CORE' && is_endpoint) ? SOURCE.CORE : SOURCE.DB);
-        SystemStore._instance = SystemStore._instance || new SystemStore({ standalone });
+        // Determine desired source and pass it to the instance
+        const hostname = process.env.HOSTNAME || '';
+        const is_endpoint = hostname.includes('endpoint');
+        const cfg = String(config.SYSTEM_STORE_SOURCE || 'DB').toUpperCase();
+        const source = options.source || ((cfg === 'CORE' && is_endpoint) ? SOURCE.CORE : SOURCE.DB);
+        SystemStore._instance = SystemStore._instance || new SystemStore({ ...options, standalone, source });
📝 Committable suggestion


Suggested change

-        const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1;
-        this.source = options.source ||
-                      ((config.SYSTEM_STORE_SOURCE.toUpperCase() === 'CORE' && is_endpoint) ? SOURCE.CORE : SOURCE.DB);
-        SystemStore._instance = SystemStore._instance || new SystemStore({ standalone });
-        return SystemStore._instance;
+        // Determine desired source and pass it to the instance
+        const hostname = process.env.HOSTNAME || '';
+        const is_endpoint = hostname.includes('endpoint');
+        const cfg = String(config.SYSTEM_STORE_SOURCE || 'DB').toUpperCase();
+        const source = options.source || ((cfg === 'CORE' && is_endpoint) ? SOURCE.CORE : SOURCE.DB);
+        SystemStore._instance = SystemStore._instance || new SystemStore({ ...options, standalone, source });
+        return SystemStore._instance;
🤖 Prompt for AI Agents
In src/server/system_services/system_store.js around lines 345 to 349,
get_instance computes the intended source but never passes it into the
SystemStore constructor (leaving the singleton to default to DB); change the
HOSTNAME check to safely handle undefined (e.g., use (process.env.HOSTNAME ||
'').indexOf("endpoint") !== -1), compute the source into a variable, and pass
that source into the new SystemStore call (e.g., new SystemStore({ standalone,
source })) so the constructed singleton receives the correct source.

Comment on lines +364 to 366
this.source = options.source || SOURCE.DB;
dbg.log0("system store source is", this.source);
this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD });

💡 Verification agent

🧩 Analysis chain

Confirm propagation of source to constructor.
After fixing get_instance to pass { source }, this default is fine. Please verify no other call sites construct SystemStore directly without passing a source (tests do, but that’s intentional).

Run to find direct constructions and ensure they’re deliberate:


🏁 Script executed:

#!/bin/bash
rg -nP "new\s+SystemStore\s*\(" -C2

Length of output: 991


Restore passing of source in get_instance constructor
In src/server/system_services/system_store.js line 348, update

new SystemStore({ standalone })

to include the original options (e.g. new SystemStore(options) or at least { standalone, source, is_endpoint }) so that options.source propagates correctly. Only other direct instantiation is in test_system_store.js (intentional).

🤖 Prompt for AI Agents
In src/server/system_services/system_store.js around lines 364 to 366, the
SystemStore is being instantiated with only { standalone } which drops the
original options.source (and other flags); change the constructor call to pass
the original options object (e.g. new SystemStore(options)) or at minimum
include the missing keys (e.g. { standalone, source: options.source,
is_endpoint: options.is_endpoint }) so options.source propagates to the
SystemStore instance.

Comment on lines 142 to 153
mocha.it("Load from core", async function() {

const system_store_from_core = new SystemStore({
source: 'CORE'
});

const from_db = await system_store.load();
const from_core = await system_store_from_core.load(undefined, 'ENDPOINT');

assert.deepStrictEqual(from_db, from_core);

});

⚠️ Potential issue

Make the “Load from core” assertion stable by sanitizing dynamic fields and ordering.
Deep-equality will be flaky due to time and potential array order differences between DB and CORE paths.

Apply this diff within the test and add lodash at the top (additional snippet below):

 mocha.it("Load from core", async function() {
 
     const system_store_from_core = new SystemStore({
-        source: 'CORE'
+        source: SOURCE.CORE
     });
 
     const from_db = await system_store.load();
     const from_core = await system_store_from_core.load(undefined, 'ENDPOINT');
-
-    assert.deepStrictEqual(from_db, from_core);
+    // Sanitize dynamic fields and sort arrays by _id for stable comparison
+    const sanitize = d => {
+        const c = _.cloneDeep(d);
+        delete c.time;
+        const maybe_sort = col => Array.isArray(c[col]) && (c[col] = _.sortBy(c[col], x => String(x._id)));
+        [
+            'clusters','namespace_resources','systems','roles','accounts',
+            'buckets','tieringpolicies','tiers','pools','chunk_configs','agent_configs'
+        ].forEach(maybe_sort);
+        return c;
+    };
+    assert.deepStrictEqual(sanitize(from_db), sanitize(from_core));
 
 });

Add lodash import near the top of the file:

// at top-level imports
const _ = require('lodash');

@alphaprinz alphaprinz force-pushed the system_store_phase2 branch from 9f1b2e9 to 203d2b5 Compare August 28, 2025 18:06
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/server/system_services/system_store.js (1)

452-483: Fix CORE-path logging/new_data mismatch.

When loading from CORE, new_data remains empty so fetch-size and later logs are misleading. Align by reusing it.

-                const new_data = new SystemStoreData();
+                let new_data = new SystemStoreData();
@@
-                if (this.source === SOURCE.CORE) {
+                if (this.source === SOURCE.CORE) {
                     try {
                         this.data = new SystemStoreData();
                         await this._read_new_data_from_core(this.data);
+                        new_data = this.data; // keep logging and downstream calc consistent
                     } catch (e) {
                         dbg.error("Failed to load system store from core. Will load from db.", e);
                         from_core_failure = true;
                     }
                 }
♻️ Duplicate comments (5)
src/server/system_services/system_store.js (5)

364-366: Constructor default will mask CORE until get_instance passes source.

Once the fix above lands, this is fine; otherwise it forces DB.

#!/bin/bash
# Ensure get_instance is the only entry point to construct SystemStore in prod code.
rg -nP "exports\.get_instance|SystemStore\.get_instance|new\s+SystemStore\(" src | sed -n '1,200p'

505-508: Guard recent_db_data for CORE instances.

For CORE source, old_db_data is undefined; callers will get undefined.

 async recent_db_data() {
-  return this._load_serial.surround(async () => this.old_db_data);
+  return this._load_serial.surround(async () => {
+    if (this.source !== SOURCE.DB) {
+      throw new RpcError('BAD_REQUEST', 'recent_db_data is only available when source is DB');
+    }
+    return this.old_db_data || {};
+  });
 }
#!/bin/bash
# Verify only CORE (system server) calls recent_db_data to serve RPC.
rg -n "recent_db_data\s*\(" -C3 src | sed -n '1,200p'

690-711: Second-phase publish shouldn’t depend on config flag.

If any processes load from core, endpoints need the ENDPOINT phase regardless of core's local setting.

- if (config.SYSTEM_STORE_SOURCE.toUpperCase() === 'CORE') {
-   dbg.log2("second phase publish");
-   await server_rpc.client.redirector.publish_to_cluster({
-     method_api: 'server_inter_process_api',
-     method_name: 'load_system_store',
-     target: '',
-     request_params: { since: last_update, load_from_core_step: 'ENDPOINT' }
-   });
- }
+ dbg.log2("second phase publish");
+ await server_rpc.client.redirector.publish_to_cluster({
+   method_api: 'server_inter_process_api',
+   method_name: 'load_system_store',
+   target: '',
+   request_params: { since: last_update, load_from_core_step: 'ENDPOINT' }
+ });
#!/bin/bash
# Sanity-check: load() is idempotent for DB sources so double-publish won't break.
rg -n "load_from_core_step" -C3 src | sed -n '1,200p'

575-595: Harden RPC read path: validate, bound payload, and robust errors.

Handle invalid/missing buffers, large payloads, and JSON parse errors; validate before and after transform.

 async _read_new_data_from_core(target) {
-  dbg.log3("_read_new_data_from_core begins");
-  const res = await server_rpc.client.system.get_system_store();
-  const ss = JSON.parse(res[RPC_BUFFERS].data.toString());
-  dbg.log3("_read_new_data_from_core new system store", ss);
-  for (const key of Object.keys(ss)) {
+  dbg.log3("_read_new_data_from_core begins");
+  try {
+    const res = await server_rpc.client.system.get_system_store();
+    const buf = res?.[RPC_BUFFERS]?.data;
+    if (!Buffer.isBuffer(buf)) throw new Error('invalid response (missing RPC buffer)');
+    const max = config.MAX_SYSTEM_STORE_RPC_BYTES || 25 * 1024 * 1024;
+    if (buf.byteLength > max) throw new Error(`payload too large: ${buf.byteLength} > ${max}`);
+    let ss;
+    try {
+      ss = JSON.parse(buf.toString('utf8'));
+    } catch {
+      throw new Error('invalid JSON from core');
+    }
+    dbg.log3("_read_new_data_from_core new system store", { keys: Object.keys(ss), approx_size: buf.byteLength });
+    for (const key of Object.keys(ss)) {
       const collection = COLLECTIONS_BY_NAME[key];
-      if (collection) {
+      if (collection) {
         target[key] = [];
         _.each(ss[key], item => {
-          //these two lines will transform string values into appropriately typed objects
-          //(SensitiveString, ObjectId) according to schema
-          const after = decode_json(collection.schema, item);
-          db_client.instance().validate(key, after);
+          db_client.instance().validate(key, item, 'warn'); // pre-transform
+          const after = decode_json(collection.schema, item);
+          db_client.instance().validate(key, after); // strict post-transform
           target[key].push(after);
         });
       } else {
         target[key] = ss[key];
       }
-  }
+    }
+  } catch (err) {
+    dbg.error('_read_new_data_from_core: Failed to read data from core', err);
+    throw new RpcError('INTERNAL_ERROR', `Failed to load system store from core: ${err.message}`);
+  }
 }
  • Add metrics around RPC: call count, latency, payload size; alert on spikes.
  • Add client-side timeout and retry with backoff for get_system_store.
  • Consider a circuit breaker to protect core on repeated failures.

345-349: Pass computed source into the SystemStore constructor in get_instance
get_instance currently calculates source but then does new SystemStore({ standalone }), so the instance’s this.source always falls back to SOURCE.DB. Change the instantiation to include the computed source:

 static get_instance(options = {}) {
   const { standalone } = options;
   // load from core if enabled and this is an endpoint
   const is_endpoint = (process.env.HOSTNAME || '').includes('endpoint');
   const cfg = String(config.SYSTEM_STORE_SOURCE || 'DB').toUpperCase();
   const source = options.source || ((cfg === 'CORE' && is_endpoint) ? SOURCE.CORE : SOURCE.DB);
-  SystemStore._instance = SystemStore._instance || new SystemStore({ standalone });
+  SystemStore._instance = SystemStore._instance || new SystemStore({ ...options, source });
   return SystemStore._instance;
 }
🧹 Nitpick comments (2)
src/server/system_services/system_store.js (2)

345-347: Optional: don’t key behavior off HOSTNAME.

Prefer deriving source solely from config/override so non-endpoint scale-out services can also load-from-core.

-  const hostname = process.env.HOSTNAME || '';
-  const is_endpoint = hostname.includes('endpoint');
-  const cfg = String(config.SYSTEM_STORE_SOURCE || 'DB').toUpperCase();
-  const source = options.source ||
-    ((cfg === 'CORE' && is_endpoint) ? SOURCE.CORE : SOURCE.DB);
+  const cfg = String(config.SYSTEM_STORE_SOURCE || 'DB').toUpperCase();
+  const source = options.source || (cfg === 'CORE' ? SOURCE.CORE : SOURCE.DB);

474-479: Avoid dumping full system store to logs; emit counts instead.

Full dumps risk PII/secret leakage and heavy logs.

- dbg.log1('SystemStore: fetch size', size_utils.human_size(JSON.stringify(new_data).length));
- dbg.log1('SystemStore: fetch data', util.inspect(new_data, { depth: 4 }));
+ const per_col_counts = _.fromPairs(COLLECTIONS.map(c => [c.name, (new_data[c.name] || []).length]));
+ dbg.log1('SystemStore: fetch size', size_utils.human_size(Buffer.byteLength(JSON.stringify(new_data))));
+ dbg.log1('SystemStore: fetch counts', per_col_counts);
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between 9f1b2e9 and ba00298.

📒 Files selected for processing (8)
  • config.js (1 hunks)
  • src/api/server_inter_process_api.js (1 hunks)
  • src/api/system_api.js (1 hunks)
  • src/server/common_services/server_inter_process.js (1 hunks)
  • src/server/system_services/system_server.js (3 hunks)
  • src/server/system_services/system_store.js (11 hunks)
  • src/test/integration_tests/db/test_system_store.js (2 hunks)
  • src/util/postgres_client.js (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
  • src/server/common_services/server_inter_process.js
  • src/test/integration_tests/db/test_system_store.js
  • config.js
  • src/api/server_inter_process_api.js
  • src/util/postgres_client.js
  • src/server/system_services/system_server.js
  • src/api/system_api.js
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Build Noobaa Image
  • GitHub Check: run-package-lock-validation
  • GitHub Check: run-jest-unit-tests
🔇 Additional comments (5)
src/server/system_services/system_store.js (5)

41-44: LGTM on new imports.

decode_json, RpcError, and RPC_BUFFERS are correctly introduced and used below.


156-159: Good: explicit SOURCE enum.

Frozen enum with 'DB' and 'CORE' is clear and exported later.


368-376: Idempotent define_collection handling looks fine.

Catching “already defined” avoids throw during hot paths.


443-443: LGTM: expanded debug log context.


940-941: Exporting SOURCE is correct.

Comment on lines +429 to +437
async load(since, load_from_core_step) {
//if endpoints load from core, and this load is for core
//(ie, the first load_system_store() out of two with load_from_core_step === 'CORE'),
//then endpoints skip it.
//endpoints will be updated in the next load_system_store()
//once core's in memory system store is updated.
if (this.source.toUpperCase() === 'CORE' && load_from_core_step && load_from_core_step.toUpperCase() === 'CORE') {
return;
}

💡 Verification agent

🧩 Analysis chain

Keep load() return type consistent on endpoint skip.

Returning undefined on the CORE phase breaks callers expecting SystemStoreData. Return this.data.

- if (this.source.toUpperCase() === 'CORE' && load_from_core_step && load_from_core_step.toUpperCase() === 'CORE') {
-   return;
- }
+ if (this.source.toUpperCase() === 'CORE' &&
+     load_from_core_step &&
+     load_from_core_step.toUpperCase() === 'CORE') {
+   return this.data;
+ }

🏁 Script executed:

#!/bin/bash
# Check that server_inter_process_api.load_system_store ignores the return value.
rg -nP "load_system_store\s*\(" -C3 src | sed -n '1,200p'

Length of output: 2175


🏁 Script executed:

#!/bin/bash
sed -n '400,470p' src/server/system_services/system_store.js

Length of output: 3522


Return this.data in the CORE skip branch to maintain a consistent SystemStoreData return.

- if (this.source.toUpperCase() === 'CORE' &&
-     load_from_core_step &&
-     load_from_core_step.toUpperCase() === 'CORE') {
-   return;
- }
+ if (this.source.toUpperCase() === 'CORE' &&
+     load_from_core_step &&
+     load_from_core_step.toUpperCase() === 'CORE') {
+   return this.data;
+ }
📝 Committable suggestion


Suggested change

-async load(since, load_from_core_step) {
-    //if endpoints load from core, and this load is for core
-    //(ie, the first load_system_store() out of two with load_from_core_step === 'CORE'),
-    //then endpoints skip it.
-    //endpoints will be updated in the next load_system_store()
-    //once core's in memory system store is updated.
-    if (this.source.toUpperCase() === 'CORE' && load_from_core_step && load_from_core_step.toUpperCase() === 'CORE') {
-        return;
-    }
+async load(since, load_from_core_step) {
+    // if endpoints load from core, and this load is for core
+    // (ie, the first load_system_store() out of two with load_from_core_step === 'CORE'),
+    // then endpoints skip it.
+    // endpoints will be updated in the next load_system_store()
+    // once core's in memory system store is updated.
+    if (this.source.toUpperCase() === 'CORE' &&
+        load_from_core_step &&
+        load_from_core_step.toUpperCase() === 'CORE') {
+        return this.data;
+    }
+    // …rest of load implementation…
+}
🤖 Prompt for AI Agents
In src/server/system_services/system_store.js around lines 429 to 437, the early
exit for CORE when load_from_core_step === 'CORE' currently does a bare return
which breaks the expected SystemStoreData return contract; replace the bare
return with "return this.data" (or Promise.resolve(this.data) if the function is
expected to always return a Promise) so the method consistently returns the
current SystemStoreData in that skip branch.

@alphaprinz alphaprinz force-pushed the system_store_phase2 branch 2 times, most recently from 79c2111 to 3484621 Compare August 29, 2025 17:07
@alphaprinz alphaprinz force-pushed the system_store_phase2 branch from 3484621 to 873d75c Compare August 29, 2025 17:42
@alphaprinz alphaprinz force-pushed the system_store_phase2 branch from 873d75c to cd4b2f6 Compare August 29, 2025 18:53
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
src/test/integration_tests/db/test_system_store.js (2)

8-8: Import SOURCE to avoid magic strings.

Use the exported SOURCE enum instead of the raw 'CORE' string.

-const { SystemStore } = require('../../../server/system_services/system_store');
+const { SystemStore, SOURCE } = require('../../../server/system_services/system_store');

142-153: Fix assertion (wrong .data usage) and make it stable; also use SOURCE.CORE.

load() returns the data object directly (see earlier tests in this file), so comparing .data will fail. Additionally, deep-equality will be flaky due to dynamic fields and array ordering. Switch to SOURCE.CORE and sanitize before comparing.

 mocha.it("Load from core", async function() {

-        const system_store_from_core = new SystemStore({
-            source: 'CORE'
-        });
+        const system_store_from_core = new SystemStore({
+            source: SOURCE.CORE
+        });

         const from_db = await system_store.load();
         const from_core = await system_store_from_core.load(undefined, 'ENDPOINT');

-        assert.deepStrictEqual(from_db.data, from_core.data);
+        // Sanitize dynamic fields and sort arrays by _id for stable comparison
+        const sanitize = d => {
+            const c = _.cloneDeep(d);
+            delete c.time;
+            const maybe_sort = col =>
+                Array.isArray(c[col]) && (c[col] = _.sortBy(c[col], x => String(x._id)));
+            [
+                'clusters','namespace_resources','systems','roles','accounts',
+                'buckets','tieringpolicies','tiers','pools','chunk_configs','agent_configs'
+            ].forEach(maybe_sort);
+            return c;
+        };
+        assert.deepStrictEqual(sanitize(from_db), sanitize(from_core));

Add this import at top-level (outside the changed hunk):

const _ = require('lodash');
🧹 Nitpick comments (1)
src/test/integration_tests/db/test_system_store.js (1)

142-153: Broaden coverage: add delta-load and error-path cases.

Consider adding:

  • since-based incremental load parity (DB vs CORE after inserts/updates).
  • RPC failure path (e.g., core unavailable → verify error is surfaced/mapped).
  • Authorization mismatch (endpoint not allowed) if applicable to the RPC.

I can draft these tests if helpful.
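As a starting point, a sketch of the RPC-failure case (stubbing get_system_store directly is an assumption about the test plumbing; server_rpc would need to be required at the top of the file):

mocha.it("Load from core falls back to DB when core RPC fails", async function() {
    const system_store_from_core = new SystemStore({ source: SOURCE.CORE });
    // simulate core being unavailable for the duration of the test
    const orig_get = server_rpc.client.system.get_system_store;
    server_rpc.client.system.get_system_store = async () => { throw new Error('core unavailable'); };
    try {
        const data = await system_store_from_core.load(undefined, 'ENDPOINT');
        // the implementation logs the core failure and falls back to reading from db
        assert.ok(data);
    } finally {
        server_rpc.client.system.get_system_store = orig_get;
    }
});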

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between 873d75c and cd4b2f6.

📒 Files selected for processing (3)
  • config.js (1 hunks)
  • src/server/system_services/system_store.js (11 hunks)
  • src/test/integration_tests/db/test_system_store.js (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • config.js
  • src/server/system_services/system_store.js
🧰 Additional context used
📓 Path-based instructions (1)
src/test/**/*.*

⚙️ CodeRabbit configuration file

src/test/**/*.*: Ensure that the PR includes tests for the changes.

Files:

  • src/test/integration_tests/db/test_system_store.js
🧬 Code graph analysis (1)
src/test/integration_tests/db/test_system_store.js (1)
src/server/system_services/system_store.js (2)
  • require (41-41)
  • require (43-43)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: run-package-lock-validation
  • GitHub Check: Build Noobaa Image
  • GitHub Check: run-jest-unit-tests
