Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/lfx-pcc/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ SUPABASE_URL=https://your-project.supabase.co
POSTGRES_API_KEY=your-supabase-anon-key
SUPABASE_STORAGE_BUCKET=your-supabase-bucket-name

# NATS Configuration
# Internal k8s service DNS for NATS cluster
NATS_URL=nats://lfx-platform-nats.lfx.svc.cluster.local:4222

# E2E Test Configuration (Optional)
# Test user credentials for automated testing
TEST_USERNAME=your-test-username
Expand Down
1 change: 1 addition & 0 deletions apps/lfx-pcc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"dotenv": "^17.2.1",
"express": "^4.18.2",
"express-openid-connect": "^2.19.2",
"nats": "^2.29.3",
"ngx-cookie-service-ssr": "^19.1.2",
"pino-http": "^10.5.0",
"primeng": "^19.1.4",
Expand Down
22 changes: 22 additions & 0 deletions apps/lfx-pcc/src/server/config/nats.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright The Linux Foundation and each contributor to LFX.
// SPDX-License-Identifier: MIT

/**
* NATS configuration constants
*/
export const NATS_CONFIG = {
/**
* Default NATS server URL for Kubernetes cluster
*/
DEFAULT_SERVER_URL: 'nats://lfx-platform-nats.lfx.svc.cluster.local:4222',

/**
* Connection timeout in milliseconds
*/
CONNECTION_TIMEOUT: 5000,

/**
* Request timeout in milliseconds
*/
REQUEST_TIMEOUT: 5000,
} as const;
65 changes: 12 additions & 53 deletions apps/lfx-pcc/src/server/routes/projects.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
// Copyright The Linux Foundation and each contributor to LFX.
// SPDX-License-Identifier: MIT

import { Project, QueryServiceResponse } from '@lfx-pcc/shared';
import { NextFunction, Request, Response, Router } from 'express';

import { ApiClientService } from '../services/api-client.service';
import { MicroserviceProxyService } from '../services/microservice-proxy.service';
import { NatsService } from '../services/nats.service';
import { ProjectService } from '../services/project.service';
import { SupabaseService } from '../services/supabase.service';

const router = Router();

const supabaseService = new SupabaseService();
const microserviceProxyService = new MicroserviceProxyService(new ApiClientService());
const natsService = new NatsService();
const projectService = new ProjectService(microserviceProxyService, natsService);

router.get('/', async (req: Request, res: Response, next: NextFunction) => {
const startTime = Date.now();
Expand All @@ -25,13 +28,7 @@ router.get('/', async (req: Request, res: Response, next: NextFunction) => {
);

try {
const query = {
...req.query,
type: 'project',
};
const { resources } = await microserviceProxyService.proxyRequest<QueryServiceResponse<Project>>(req, 'LFX_V2_SERVICE', '/query/resources', 'GET', query);

const projects = resources.map((resource) => resource.data);
const projects = await projectService.getProjects(req, req.query as Record<string, any>);

const duration = Date.now() - startTime;

Expand Down Expand Up @@ -91,12 +88,7 @@ router.get('/search', async (req: Request, res: Response, next: NextFunction) =>
});
}

const { resources } = await microserviceProxyService.proxyRequest<QueryServiceResponse<Project>>(req, 'LFX_V2_SERVICE', '/query/resources', 'GET', {
type: 'project',
name: q,
});

const results = resources.map((resource) => resource.data);
const results = await projectService.searchProjects(req, q);

const duration = Date.now() - startTime;

Expand Down Expand Up @@ -132,7 +124,7 @@ router.get('/:slug', async (req: Request, res: Response, next: NextFunction) =>
req.log.info(
{
operation: 'fetch_project_by_slug',
has_project_slug: !!projectSlug,
slug: projectSlug,
},
'Starting project fetch by slug request'
);
Expand All @@ -154,30 +146,14 @@ router.get('/:slug', async (req: Request, res: Response, next: NextFunction) =>
});
}

const project = await supabaseService.getProjectBySlug(projectSlug);

if (!project) {
const duration = Date.now() - startTime;
req.log.warn(
{
operation: 'fetch_project_by_slug',
error: 'Project not found',
duration,
status_code: 404,
},
'Project not found'
);

return res.status(404).json({
error: 'Project not found',
code: 'PROJECT_NOT_FOUND',
});
}
// Use the project service to handle slug resolution and project fetching
const project = await projectService.getProjectBySlug(req, projectSlug);

const duration = Date.now() - startTime;
req.log.info(
{
operation: 'fetch_project_by_slug',
slug: projectSlug,
project_uid: project.uid,
duration,
status_code: 200,
Expand All @@ -192,6 +168,7 @@ router.get('/:slug', async (req: Request, res: Response, next: NextFunction) =>
{
error: error instanceof Error ? error.message : error,
operation: 'fetch_project_by_slug',
slug: projectSlug,
duration,
},
'Failed to fetch project'
Expand Down Expand Up @@ -231,25 +208,7 @@ router.get('/:slug/recent-activity', async (req: Request, res: Response, next: N
}

// Get project to verify it exists and get the project ID
const project = await supabaseService.getProjectBySlug(projectSlug);

if (!project) {
const duration = Date.now() - startTime;
req.log.warn(
{
operation: 'fetch_project_recent_activity',
error: 'Project not found for recent activity fetch',
duration,
status_code: 404,
},
'Project not found for recent activity'
);

return res.status(404).json({
error: 'Project not found',
code: 'PROJECT_NOT_FOUND',
});
}
const project = await projectService.getProjectBySlug(req, projectSlug);

const recentActivity = await supabaseService.getRecentActivityByProject(project.uid, req.query as Record<string, any>);
const duration = Date.now() - startTime;
Expand Down
141 changes: 141 additions & 0 deletions apps/lfx-pcc/src/server/services/nats.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright The Linux Foundation and each contributor to LFX.
// SPDX-License-Identifier: MIT

import { NatsSubjects, ProjectSlugToIdResponse } from '@lfx-pcc/shared/interfaces';
import { connect, NatsConnection, StringCodec } from 'nats';

import { NATS_CONFIG } from '../config/nats.config';
import { serverLogger } from '../server';

export class NatsService {
private connection: NatsConnection | null = null;
private connectionPromise: Promise<NatsConnection> | null = null;
private codec = StringCodec();

/**
* Get project ID by slug using NATS request-reply pattern
*/
public async getProjectIdBySlug(slug: string): Promise<ProjectSlugToIdResponse> {
const connection = await this.ensureConnection();

try {
const response = await connection.request(NatsSubjects.PROJECT_SLUG_TO_UID, this.codec.encode(slug), { timeout: NATS_CONFIG.REQUEST_TIMEOUT });

const projectId = this.codec.decode(response.data);

// Check if we got a valid project ID
if (!projectId || projectId.trim() === '') {
serverLogger.info({ slug }, 'Project slug not found via NATS');
return {
projectId: '',
slug,
exists: false,
};
}

serverLogger.info({ slug, project_id: projectId }, 'Successfully resolved project slug to ID');

return {
projectId: projectId.trim(),
slug,
exists: true,
};
} catch (error) {
serverLogger.error({ error: error instanceof Error ? error.message : error, slug }, 'Failed to resolve project slug via NATS');

// If it's a timeout or no responder error, treat as not found
if (error instanceof Error && (error.message.includes('timeout') || error.message.includes('503'))) {
return {
projectId: '',
slug,
exists: false,
};
}

throw error;
}
}

/**
* Check if NATS connection is active
*/
public isConnected(): boolean {
return this.connection !== null && !this.connection.isClosed();
}

/**
* Gracefully shutdown NATS connection
*/
public async shutdown(): Promise<void> {
if (this.connection && !this.connection.isClosed()) {
serverLogger.info('Shutting down NATS connection');

try {
await this.connection.drain();
serverLogger.info('NATS connection closed successfully');
} catch (error) {
serverLogger.error({ error: error instanceof Error ? error.message : error }, 'Error during NATS shutdown');
}
}
this.connection = null;
}

/**
* Ensure NATS connection with thread safety (lazy initialization)
*/
private async ensureConnection(): Promise<NatsConnection> {
// Return existing connection if valid
if (this.connection && !this.connection.isClosed()) {
return this.connection;
}

// If already connecting, wait for that connection
if (this.connectionPromise) {
return this.connectionPromise;
}

// Create new connection
this.connectionPromise = this.createConnection();

try {
this.connection = await this.connectionPromise;
return this.connection;
} catch (error) {
// Reset connection promise on failure
this.connectionPromise = null;
throw error;
} finally {
// Reset connection promise after completion
this.connectionPromise = null;
}
}

/**
* Create a new NATS connection
*/
private async createConnection(): Promise<NatsConnection> {
const natsUrl = process.env['NATS_URL'] || NATS_CONFIG.DEFAULT_SERVER_URL;

try {
serverLogger.info({ url: natsUrl }, 'Connecting to NATS server on demand');

const connection = await connect({
servers: [natsUrl],
timeout: NATS_CONFIG.CONNECTION_TIMEOUT,
});

serverLogger.info('Successfully connected to NATS server');
return connection;
} catch (error) {
serverLogger.error(
{
error: error instanceof Error ? error.message : error,
url: natsUrl,
suggestion: 'If running locally, you may need to port-forward NATS: kubectl port-forward -n lfx svc/lfx-platform-nats 4222:4222',
},
'Failed to connect to NATS server'
);
throw error;
}
}
}
Loading
Loading