Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions frontend/src/components/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const FEATURE_FLAGS = {
enableApiKeyConfigurationAgent: false,
enableDataplaneObservabilityServerless: false,
enableDataplaneObservability: false,
enableNewPipelineLogs: false,
};

// Cloud-managed tag keys for service account integration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const createMockMessage = (): ChatMessage => ({

const makeArtifactEvent = (
text: string,
opts: { append?: boolean; lastChunk?: boolean } = {},
opts: { append?: boolean; lastChunk?: boolean } = {}
): TaskArtifactUpdateEvent => ({
kind: 'artifact-update',
contextId: 'test-context',
Expand Down Expand Up @@ -84,7 +84,7 @@ describe('handleStatusUpdateEvent', () => {
const statusBlocks = state.contentBlocks.filter((b) => b.type === 'task-status-update');
expect(statusBlocks).toHaveLength(1);
expect(statusBlocks[0].type === 'task-status-update' && statusBlocks[0].text).toContain(
'Artifact created successfully',
'Artifact created successfully'
);
});
});
Expand Down Expand Up @@ -127,11 +127,9 @@ describe('handleArtifactUpdateEvent', () => {

// Text must accumulate
expect(block1?.type === 'artifact' && block1.parts[0]?.kind === 'text' && block1.parts[0].text).toBe('Hello');
expect(block2?.type === 'artifact' && block2.parts[0]?.kind === 'text' && block2.parts[0].text).toBe(
'Hello world',
);
expect(block2?.type === 'artifact' && block2.parts[0]?.kind === 'text' && block2.parts[0].text).toBe('Hello world');
expect(block3?.type === 'artifact' && block3.parts[0]?.kind === 'text' && block3.parts[0].text).toBe(
'Hello world!',
'Hello world!'
);
});

Expand All @@ -147,7 +145,7 @@ describe('handleArtifactUpdateEvent', () => {
makeArtifactEvent('', { append: true, lastChunk: true }),
state,
assistantMessage,
onMessageUpdate,
onMessageUpdate
);

// activeTextBlock should be cleared
Expand All @@ -156,8 +154,8 @@ describe('handleArtifactUpdateEvent', () => {
// Artifact should be persisted in contentBlocks with accumulated text
const artifacts = state.contentBlocks.filter((b) => b.type === 'artifact');
expect(artifacts).toHaveLength(1);
expect(artifacts[0].type === 'artifact' && artifacts[0].parts[0]?.kind === 'text' && artifacts[0].parts[0].text).toBe(
'Hello world',
);
expect(
artifacts[0].type === 'artifact' && artifacts[0].parts[0]?.kind === 'text' && artifacts[0].parts[0].text
).toBe('Hello world');
});
});
8 changes: 7 additions & 1 deletion frontend/src/components/pages/rp-connect/pipeline/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import { Spinner } from 'components/redpanda-ui/components/spinner';
import { Tabs, TabsContent, TabsContents, TabsList, TabsTrigger } from 'components/redpanda-ui/components/tabs';
import { Heading } from 'components/redpanda-ui/components/typography';
import { cn } from 'components/redpanda-ui/lib/utils';
import { LogExplorer } from 'components/ui/connect/log-explorer';
import { LintHintList } from 'components/ui/lint-hint/lint-hint-list';
import { YamlEditorCard } from 'components/ui/yaml/yaml-editor-card';
import { isFeatureFlagEnabled, isServerless } from 'config';
import { useDebounce } from 'hooks/use-debounce';
import { useDebouncedValue } from 'hooks/use-debounced-value';
import type { editor } from 'monaco-editor';
Expand Down Expand Up @@ -543,7 +545,11 @@ export default function PipelinePage() {
<TabsContents>
<TabsContent value="configuration">{content}</TabsContent>
<TabsContent value="logs">
<LogsTab pipeline={pipeline} />
{isFeatureFlagEnabled('enableNewPipelineLogs') ? (
<LogExplorer pipeline={pipeline} serverless={isServerless()} />
) : (
<LogsTab pipeline={pipeline} />
)}
</TabsContent>
</TabsContents>
</Tabs>
Expand Down
33 changes: 11 additions & 22 deletions frontend/src/components/pages/rp-connect/pipelines-details.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import { ConnectError } from '@connectrpc/connect';
import { Alert, AlertIcon, Box, Button, createStandaloneToast, DataTable, Flex, SearchField } from '@redpanda-data/ui';
import { Link } from '@tanstack/react-router';
import type { ColumnDef, SortingState } from '@tanstack/react-table';
import { Button as RegistryButton } from 'components/redpanda-ui/components/button';
import { isEmbedded, isFeatureFlagEnabled } from 'config';
import { RefreshCcw } from 'lucide-react';
import { useEffect, useRef, useState } from 'react';
import { toast as sonnerToast } from 'sonner';
import { formatToastErrorMessageGRPC } from 'utils/toast.utils';
Expand All @@ -40,6 +42,7 @@ import {
import type { TopicMessage } from '../../../state/rest-interfaces';
import { PartitionOffsetOrigin } from '../../../state/ui';
import { sanitizeString } from '../../../utils/filter-helper';
import { isFilterMatch } from '../../../utils/message-table-helpers';
import { DefaultSkeleton, QuickTable, TimestampDisplay } from '../../../utils/tsx-utils';
import { decodeURIComponentPercents, delay, encodeBase64 } from '../../../utils/utils';
import PageContent from '../../misc/page-content';
Expand Down Expand Up @@ -259,7 +262,7 @@ const PipelineEditor = (p: { pipeline: Pipeline }) => {
);
};

export const LogsTab = (p: { pipeline: Pipeline }) => {
export const LogsTab = ({ pipeline, variant = 'card' }: { pipeline: Pipeline; variant?: 'ghost' | 'card' }) => {
const topicName = '__redpanda.connect.logs';
const topic = api.topics?.first((x) => x.topicName === topicName);

Expand All @@ -276,7 +279,7 @@ export const LogsTab = (p: { pipeline: Pipeline }) => {
searchRef.current = search;
setMessages([]);
setIsComplete(false);
executeMessageSearch(search, topicName, p.pipeline.id).finally(() => {
executeMessageSearch(search, topicName, pipeline.id).finally(() => {
setIsComplete(true);
setMessages([...search.messages]);
});
Expand Down Expand Up @@ -361,13 +364,13 @@ export const LogsTab = (p: { pipeline: Pipeline }) => {
<>
<Box my="1rem">The logs below are for the last five hours.</Box>

<Section minWidth="800px">
<Flex mb="6">
<Section borderColor={variant === 'ghost' ? 'transparent' : undefined} minWidth="800px" overflowY="auto">
<div className="mb-6 flex items-center justify-between gap-2">
Comment on lines +392 to +393
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding variant so we can embed legacy version when feature flag for new log explorer is off

<SearchField searchText={logsQuickSearch} setSearchText={setLogsQuickSearch} width="230px" />
<Button ml="auto" onClick={() => setRefreshCount((c) => c + 1)} variant="outline">
Refresh logs
</Button>
</Flex>
<RegistryButton onClick={() => setRefreshCount((c) => c + 1)} size="icon" variant="ghost">
<RefreshCcw />
</RegistryButton>
</div>

<DataTable<TopicMessage>
columns={messageTableColumns}
Expand Down Expand Up @@ -397,20 +400,6 @@ export const LogsTab = (p: { pipeline: Pipeline }) => {
);
};

function isFilterMatch(str: string, m: TopicMessage) {
const lowerStr = str.toLowerCase();
if (m.offset.toString().toLowerCase().includes(lowerStr)) {
return true;
}
if (m.keyJson?.toLowerCase().includes(lowerStr)) {
return true;
}
if (m.valueJson?.toLowerCase().includes(lowerStr)) {
return true;
}
return false;
}
Comment on lines -418 to -430
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extracted to shared util


function executeMessageSearch(search: MessageSearch, topicName: string, pipelineId: string) {
const filterCode: string = `return key == "${pipelineId}";`;

Expand Down
47 changes: 1 addition & 46 deletions frontend/src/components/pages/topics/Tab.Messages/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ import { appGlobal } from '../../../../state/app-global';
import { useTopicSettingsStore } from '../../../../stores/topic-settings-store';
import { IsDev } from '../../../../utils/env';
import { sanitizeString, wrapFilterFragment } from '../../../../utils/filter-helper';
import { trimSlidingWindow } from '../../../../utils/message-table-helpers';
import { sortingParser } from '../../../../utils/sorting-parser';
import { getTopicFilters, setTopicFilters } from '../../../../utils/topic-filters-session';
import {
Expand Down Expand Up @@ -314,52 +315,6 @@ async function loadLargeMessage({
}
}

/**
* Pure function for sliding-window trimming of messages.
* Keeps at most maxResults + pageSize messages in the window,
* trimming only pages before the user's current view.
*/
function trimSlidingWindow({
messages,
maxResults,
pageSize,
currentGlobalPage,
windowStartPage,
virtualStartIndex,
}: {
messages: TopicMessage[];
maxResults: number;
pageSize: number;
currentGlobalPage: number;
windowStartPage: number;
virtualStartIndex: number;
}): { messages: TopicMessage[]; windowStartPage: number; virtualStartIndex: number; trimCount: number } {
const maxWindowSize = maxResults + pageSize;

if (maxResults < pageSize || messages.length <= maxWindowSize) {
return { messages, windowStartPage, virtualStartIndex, trimCount: 0 };
}

const excess = messages.length - maxWindowSize;
const currentLocalPage = Math.max(0, currentGlobalPage - windowStartPage);

// Never trim the page the user is currently viewing or the one before it
const maxPagesToTrim = Math.max(0, currentLocalPage - 1);
const pagesToTrim = Math.min(Math.floor(excess / pageSize), maxPagesToTrim);
const trimCount = pagesToTrim * pageSize;

if (trimCount === 0) {
return { messages, windowStartPage, virtualStartIndex, trimCount: 0 };
}

return {
messages: messages.slice(trimCount),
windowStartPage: windowStartPage + pagesToTrim,
virtualStartIndex: virtualStartIndex + trimCount,
trimCount,
};
}

Comment on lines -335 to -375
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extracted to shared util

// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: this is because of the refactoring effort, the scope will be minimised eventually
export const TopicMessageView: FC<TopicMessageViewProps> = (props) => {
const toast = useToast();
Expand Down
147 changes: 147 additions & 0 deletions frontend/src/components/ui/connect/log-explorer.test.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import { render, screen, waitFor } from 'test-utils';
import { userEvent } from '@testing-library/user-event';
import { TooltipProvider } from 'components/redpanda-ui/components/tooltip';
import type { Pipeline } from '../../../protogen/redpanda/api/dataplane/v1/pipeline_pb';
import type { TopicMessage } from '../../../state/rest-interfaces';

const mockRefresh = vi.fn();
let mockReturn: {
messages: TopicMessage[];
phase: string | null;
error: string | null;
refresh: () => void;
};

vi.mock('../../../react-query/api/logs', () => ({
useLogSearch: () => mockReturn,
}));

import { LogExplorer } from './log-explorer';

function makeMessage(overrides: Partial<TopicMessage> & { valuePayload?: Record<string, unknown> }): TopicMessage {
const { valuePayload, ...rest } = overrides;
const payload = valuePayload ?? { message: 'test log', level: 'INFO', path: 'root.input' };
return {
partitionID: 0,
offset: 0,
timestamp: Date.now(),
compression: 'uncompressed',
isTransactional: false,
headers: [],
key: { payload: '', isPayloadNull: false, encoding: 'text', schemaId: 0, size: 0 },
value: { payload, isPayloadNull: false, encoding: 'json', schemaId: 0, size: 100 },
valueJson: JSON.stringify(payload),
valueBinHexPreview: '',
keyJson: 'pipeline-1',
keyBinHexPreview: '',
...rest,
} as TopicMessage;
}

const pipeline = { id: 'pipeline-1', displayName: 'Test Pipeline' } as unknown as Pipeline;

function renderExplorer() {
return render(
<TooltipProvider>
<LogExplorer pipeline={pipeline} />
</TooltipProvider>,
);
}

describe('LogExplorer', () => {
beforeEach(() => {
vi.clearAllMocks();
mockReturn = {
messages: [],
phase: null,
error: null,
refresh: mockRefresh,
};
});

test('shows spinner while searching with no messages', () => {
mockReturn.phase = 'Searching...';
renderExplorer();
expect(screen.getByTestId('log-loading-spinner')).toBeInTheDocument();
});

test('shows empty state when no messages and search complete', () => {
renderExplorer();
expect(screen.getByText('No messages')).toBeInTheDocument();
});

test('renders table with log entries', () => {
mockReturn.messages = [
makeMessage({ offset: 1, valuePayload: { message: 'First log', level: 'INFO', path: 'root.input' } }),
makeMessage({ offset: 2, valuePayload: { message: 'Second log', level: 'ERROR', path: 'root.output' } }),
];
renderExplorer();
expect(screen.getByText(/First log/)).toBeInTheDocument();
expect(screen.getByText(/Second log/)).toBeInTheDocument();
});

test('displays timestamps for each entry', () => {
const ts = new Date('2025-01-15T12:00:00Z').getTime();
mockReturn.messages = [makeMessage({ offset: 1, timestamp: ts })];
renderExplorer();
const rows = screen.getAllByRole('row');
// header row + 1 data row
expect(rows.length).toBeGreaterThanOrEqual(2);
});

test('displays log level badges', () => {
mockReturn.messages = [
makeMessage({ offset: 1, valuePayload: { message: 'err', level: 'ERROR', path: 'x' } }),
makeMessage({ offset: 2, valuePayload: { message: 'warn', level: 'WARN', path: 'x' } }),
];
renderExplorer();
expect(screen.getByText('ERROR')).toBeInTheDocument();
expect(screen.getByText('WARN')).toBeInTheDocument();
});

test('shows error alert when error is set', () => {
mockReturn.error = 'Connection refused';
renderExplorer();
expect(screen.getByText('Failed to load logs')).toBeInTheDocument();
expect(screen.getByText('Connection refused')).toBeInTheDocument();
});

test('refresh button calls refresh', async () => {
const user = userEvent.setup();
renderExplorer();
const refreshBtn = screen.getByTestId('log-refresh-button');
await user.click(refreshBtn);
expect(mockRefresh).toHaveBeenCalledTimes(1);
});

test('clicking a row opens detail sheet', async () => {
const user = userEvent.setup();
mockReturn.messages = [
makeMessage({ offset: 42, valuePayload: { message: 'Detail test', level: 'INFO', path: 'root.input' } }),
];
renderExplorer();
const dataRow = screen.getAllByRole('row')[1];
await user.click(dataRow);
await waitFor(() => {
expect(screen.getByText('Partition')).toBeInTheDocument();
});
});

test('paginates with default 10 per page', () => {
mockReturn.messages = Array.from({ length: 15 }, (_, i) =>
makeMessage({ offset: i, valuePayload: { message: `Log ${i}`, level: 'INFO', path: 'x' } }),
);
renderExplorer();
// Should show 10 data rows on first page (+ 1 header row)
const rows = screen.getAllByRole('row');
expect(rows.length).toBe(11); // 1 header + 10 data
});

test('table has expected column headers', () => {
renderExplorer();
expect(screen.getByText('Timestamp')).toBeInTheDocument();
expect(screen.getByText('Level')).toBeInTheDocument();
expect(screen.getByText('Component')).toBeInTheDocument();
expect(screen.getByText('Message')).toBeInTheDocument();
});
});
Loading
Loading