Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions frontend/src/components/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const FEATURE_FLAGS = {
enableApiKeyConfigurationAgent: false,
enableDataplaneObservabilityServerless: false,
enableDataplaneObservability: false,
enableNewPipelineLogs: false,
};

// Cloud-managed tag keys for service account integration
Expand Down
12 changes: 10 additions & 2 deletions frontend/src/components/pages/rp-connect/pipeline/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import { Spinner } from 'components/redpanda-ui/components/spinner';
import { Tabs, TabsContent, TabsContents, TabsList, TabsTrigger } from 'components/redpanda-ui/components/tabs';
import { Heading } from 'components/redpanda-ui/components/typography';
import { cn } from 'components/redpanda-ui/lib/utils';
import { LogExplorer } from 'components/ui/connect/log-explorer';
import { LintHintList } from 'components/ui/lint-hint/lint-hint-list';
import { YamlEditorCard } from 'components/ui/yaml/yaml-editor-card';
import { isFeatureFlagEnabled, isServerless } from 'config';
import { useDebounce } from 'hooks/use-debounce';
import { useDebouncedValue } from 'hooks/use-debounced-value';
import type { editor } from 'monaco-editor';
Expand Down Expand Up @@ -256,7 +258,9 @@ export default function PipelinePage() {

// Derive lint hints from response (replaces useEffect + setState)
const responseLintHints = useMemo(() => {
if (!lintResponse) return {};
if (!lintResponse) {
return {};
}
const hints: Record<string, LintHint> = {};
for (const [idx, hint] of Object.entries(lintResponse.lintHints || [])) {
hints[`hint_${idx}`] = hint;
Expand Down Expand Up @@ -554,7 +558,11 @@ export default function PipelinePage() {
<TabsContents>
<TabsContent value="configuration">{content}</TabsContent>
<TabsContent value="logs">
<LogsTab pipeline={pipeline} />
{isFeatureFlagEnabled('enableNewPipelineLogs') ? (
<LogExplorer pipeline={pipeline} serverless={isServerless()} />
) : (
<LogsTab pipeline={pipeline} />
)}
</TabsContent>
</TabsContents>
</Tabs>
Expand Down
65 changes: 25 additions & 40 deletions frontend/src/components/pages/rp-connect/pipelines-details.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import { ConnectError } from '@connectrpc/connect';
import { Alert, AlertIcon, Box, Button, createStandaloneToast, DataTable, Flex, SearchField } from '@redpanda-data/ui';
import { Link } from '@tanstack/react-router';
import type { ColumnDef, SortingState } from '@tanstack/react-table';
import { Button as RegistryButton } from 'components/redpanda-ui/components/button';
import { isEmbedded, isFeatureFlagEnabled } from 'config';
import { RefreshCcw } from 'lucide-react';
import { useCallback, useEffect, useMemo, useRef, useState } from 'react';
import { toast as sonnerToast } from 'sonner';
import { formatToastErrorMessageGRPC } from 'utils/toast.utils';
Expand All @@ -40,6 +42,7 @@ import {
import type { TopicMessage } from '../../../state/rest-interfaces';
import { PartitionOffsetOrigin } from '../../../state/ui';
import { sanitizeString } from '../../../utils/filter-helper';
import { isFilterMatch } from '../../../utils/message-table-helpers';
import { DefaultSkeleton, QuickTable, TimestampDisplay } from '../../../utils/tsx-utils';
import { decodeURIComponentPercents, delay, encodeBase64 } from '../../../utils/utils';
import PageContent from '../../misc/page-content';
Expand Down Expand Up @@ -259,7 +262,7 @@ const PipelineEditor = (p: { pipeline: Pipeline }) => {
);
};

export const LogsTab = (p: { pipeline: Pipeline }) => {
export const LogsTab = ({ pipeline, variant = 'card' }: { pipeline: Pipeline; variant?: 'ghost' | 'card' }) => {
const topicName = '__redpanda.connect.logs';
const topic = api.topics?.first((x) => x.topicName === topicName);

Expand All @@ -273,18 +276,19 @@ export const LogsTab = (p: { pipeline: Pipeline }) => {
const searchRef = useRef<MessageSearch | null>(null);
const [refreshCount, setRefreshCount] = useState(0);

// biome-ignore lint/correctness/useExhaustiveDependencies: intentional to force message search to re-run when pipeline.id and refreshCount changes
useEffect(() => {
searchRef.current?.stopSearch();
const search = createMessageSearch();
searchRef.current = search;
queueMicrotask(() => setLogState({ messages: [], isComplete: false }));
executeMessageSearch(search, topicName, p.pipeline.id).finally(() => {
executeMessageSearch(search, topicName, pipeline.id).finally(() => {
setLogState({ messages: [...search.messages], isComplete: true });
});
return () => {
search.stopSearch();
};
}, [refreshCount]);
}, [refreshCount, pipeline.id]);

useEffect(() => {
const interval = setInterval(() => {
Expand Down Expand Up @@ -320,7 +324,9 @@ export const LogsTab = (p: { pipeline: Pipeline }) => {
if (loadedMessages && loadedMessages.length === 1) {
setLogState((prev) => {
const idx = prev.messages.findIndex((x) => x.partitionID === partitionID && x.offset === offset);
if (idx === -1) return prev;
if (idx === -1) {
return prev;
}
const updated = [...prev.messages];
updated[idx] = loadedMessages[0];
return { ...prev, messages: updated };
Expand Down Expand Up @@ -348,11 +354,7 @@ export const LogsTab = (p: { pipeline: Pipeline }) => {
header: 'Value',
accessorKey: 'value',
cell: ({ row: { original } }) => (
<MessagePreview
isCompactTopic={isCompactTopic}
msg={original}
previewFields={() => []}
/>
<MessagePreview isCompactTopic={isCompactTopic} msg={original} previewFields={() => []} />
),
size: Number.MAX_SAFE_INTEGER,
},
Expand All @@ -364,11 +366,7 @@ export const LogsTab = (p: { pipeline: Pipeline }) => {
({ row: { original } }: { row: { original: TopicMessage } }) => (
<ExpandedMessage
loadLargeMessage={() =>
loadLargeMessage(
searchRef.current?.searchRequest?.topicName ?? '',
original.partitionID,
original.offset
)
loadLargeMessage(searchRef.current?.searchRequest?.topicName ?? '', original.partitionID, original.offset)
}
msg={original}
/>
Expand All @@ -377,26 +375,27 @@ export const LogsTab = (p: { pipeline: Pipeline }) => {
);

const filteredMessages = useMemo(
() => messages.filter((x) => {
if (!logsQuickSearch) {
return true;
}
return isFilterMatch(logsQuickSearch, x);
}),
() =>
messages.filter((x) => {
if (!logsQuickSearch) {
return true;
}
return isFilterMatch(logsQuickSearch, x);
}),
[messages, logsQuickSearch]
);

return (
<>
<Box my="1rem">The logs below are for the last five hours.</Box>

<Section minWidth="800px">
<Flex mb="6">
<Section borderColor={variant === 'ghost' ? 'transparent' : undefined} minWidth="800px" overflowY="auto">
<div className="mb-6 flex items-center justify-between gap-2">
Comment on lines +392 to +393
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding variant so we can embed legacy version when feature flag for new log explorer is off

<SearchField searchText={logsQuickSearch} setSearchText={setLogsQuickSearch} width="230px" />
<Button ml="auto" onClick={() => setRefreshCount((c) => c + 1)} variant="outline">
Refresh logs
</Button>
</Flex>
<RegistryButton onClick={() => setRefreshCount((c) => c + 1)} size="icon" variant="ghost">
<RefreshCcw />
</RegistryButton>
</div>

<DataTable<TopicMessage>
columns={messageTableColumns}
Expand All @@ -415,20 +414,6 @@ export const LogsTab = (p: { pipeline: Pipeline }) => {
);
};

function isFilterMatch(str: string, m: TopicMessage) {
const lowerStr = str.toLowerCase();
if (m.offset.toString().toLowerCase().includes(lowerStr)) {
return true;
}
if (m.keyJson?.toLowerCase().includes(lowerStr)) {
return true;
}
if (m.valueJson?.toLowerCase().includes(lowerStr)) {
return true;
}
return false;
}
Comment on lines -418 to -430
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extracted to shared util


function executeMessageSearch(search: MessageSearch, topicName: string, pipelineId: string) {
const filterCode: string = `return key == "${pipelineId}";`;

Expand Down
47 changes: 1 addition & 46 deletions frontend/src/components/pages/topics/Tab.Messages/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ import { appGlobal } from '../../../../state/app-global';
import { useTopicSettingsStore } from '../../../../stores/topic-settings-store';
import { IsDev } from '../../../../utils/env';
import { sanitizeString, wrapFilterFragment } from '../../../../utils/filter-helper';
import { trimSlidingWindow } from '../../../../utils/message-table-helpers';
import { sortingParser } from '../../../../utils/sorting-parser';
import { getTopicFilters, setTopicFilters } from '../../../../utils/topic-filters-session';
import {
Expand Down Expand Up @@ -327,52 +328,6 @@ async function loadLargeMessage({
}
}

/**
* Pure function for sliding-window trimming of messages.
* Keeps at most maxResults + pageSize messages in the window,
* trimming only pages before the user's current view.
*/
function trimSlidingWindow({
messages,
maxResults,
pageSize,
currentGlobalPage,
windowStartPage,
virtualStartIndex,
}: {
messages: TopicMessage[];
maxResults: number;
pageSize: number;
currentGlobalPage: number;
windowStartPage: number;
virtualStartIndex: number;
}): { messages: TopicMessage[]; windowStartPage: number; virtualStartIndex: number; trimCount: number } {
const maxWindowSize = maxResults + pageSize;

if (maxResults < pageSize || messages.length <= maxWindowSize) {
return { messages, windowStartPage, virtualStartIndex, trimCount: 0 };
}

const excess = messages.length - maxWindowSize;
const currentLocalPage = Math.max(0, currentGlobalPage - windowStartPage);

// Never trim the page the user is currently viewing or the one before it
const maxPagesToTrim = Math.max(0, currentLocalPage - 1);
const pagesToTrim = Math.min(Math.floor(excess / pageSize), maxPagesToTrim);
const trimCount = pagesToTrim * pageSize;

if (trimCount === 0) {
return { messages, windowStartPage, virtualStartIndex, trimCount: 0 };
}

return {
messages: messages.slice(trimCount),
windowStartPage: windowStartPage + pagesToTrim,
virtualStartIndex: virtualStartIndex + trimCount,
trimCount,
};
}

Comment on lines -335 to -375
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extracted to shared util

// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: this is because of the refactoring effort, the scope will be minimised eventually
export const TopicMessageView: FC<TopicMessageViewProps> = (props) => {
'use no memo';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,7 @@ describe('topic message rendering', () => {
const msg = buildMessage();

render(
<ExpandedMessage
loadLargeMessage={() => Promise.resolve()}
msg={msg}
onDownloadRecord={onDownloadRecord}
/>
<ExpandedMessage loadLargeMessage={() => Promise.resolve()} msg={msg} onDownloadRecord={onDownloadRecord} />
);

const downloadButton = screen.getByRole('button', { name: /download record/i });
Expand Down
Loading
Loading