Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 141 additions & 17 deletions src/loaders/utils/flinkAiModelsQuery.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,165 @@
/** CCLoudResourceLoader Flink statement utils for AI Models */
import { Logger } from "../../logging";
import { FlinkAIModel } from "../../models/flinkAiModel";
import type { CCloudFlinkDbKafkaCluster } from "../../models/kafkaCluster";

const logger = new Logger("flinkAiModelsQuery");

/**
* Generate the query to list all available Flink AI models for a given Flink catalog+database.
* @see https://docs.confluent.io/cloud/current/flink/reference/statements/show.html#flink-sql-show-models
* Uses INFORMATION_SCHEMA to get detailed model information including default version, version count, and comment.
* @see https://docs.confluent.io/cloud/current/flink/reference/flink-sql-information-schema.html#models
*/
export function getFlinkAIModelsQuery(database: CCloudFlinkDbKafkaCluster): string {
return `SHOW MODELS FROM \`${database.environmentId}\`.\`${database.id}\``;
return `
-- Model toplevel definitions
select
'model' as \`rowType\`,
\`MODEL_NAME\` as \`modelName\`,
\`DEFAULT_VERSION\` as \`defaultVersion\`,
\`VERSION_COUNT\` as \`versionCount\`,
\`COMMENT\` as \`comment\`,
CAST(NULL AS STRING) as \`optionKey\`,
CAST(NULL AS STRING) as \`optionValue\`,
CAST(NULL AS STRING) as \`version\`
from \`INFORMATION_SCHEMA\`.\`MODELS\`
where \`MODEL_SCHEMA_ID\` = '${database.id}'

union all

-- Model options (WITH clause configuration)
select
'modelOption' as \`rowType\`,
\`MODEL_NAME\` as \`modelName\`,
CAST(NULL AS STRING) as \`defaultVersion\`,
CAST(NULL AS INT) as \`versionCount\`,
CAST(NULL AS STRING) as \`comment\`,
\`OPTION_KEY\` as \`optionKey\`,
\`OPTION_VALUE\` as \`optionValue\`,
\`VERSION\` as \`version\`
from \`INFORMATION_SCHEMA\`.\`MODEL_OPTIONS\`
where \`MODEL_SCHEMA_ID\` = '${database.id}'
`;
}

/** Describes rows from the models query describing the model as a whole */
export interface RawModelRow {
rowType: "model";
modelName: string;
defaultVersion: string;
versionCount: number;
comment: string | null;
optionKey: null;
optionValue: null;
version: null;
}

/** Describes rows from the models query describing a single option for a model */
export interface RawModelOptionRow {
rowType: "modelOption";
modelName: string;
defaultVersion: null;
versionCount: null;
comment: null;
optionKey: string;
optionValue: string;
version: string;
}

/** Raw results type corresponding to `SHOW MODELS` query */
export type RawFlinkAIModelRow = {
"Model Name": string;
};
/** Raw results type corresponding to the models INFORMATION_SCHEMA query */
export type RawFlinkAIModelRow = RawModelRow | RawModelOptionRow;

/**
* Transform raw model rows from the `SHOW MODELS` query into basic {@link FlinkAIModel} objects.
* Transform raw model rows from the INFORMATION_SCHEMA models query into {@link FlinkAIModel} objects.
* Processes mixed model definition rows and model option rows to build complete model objects with their options.
*
* @param database What cluster these models belong to
* @param rawResults The raw rows from the `SHOW MODELS` query
* @param rawResults The raw rows from the INFORMATION_SCHEMA models query (both model and modelOption rows)
* @returns Array of {@link FlinkAIModel} objects, sorted by name.
*/
export function transformRawFlinkAIModelRows(
database: CCloudFlinkDbKafkaCluster,
rawResults: RawFlinkAIModelRow[],
): FlinkAIModel[] {
const models: FlinkAIModel[] = rawResults.map((row) => {
return new FlinkAIModel({
environmentId: database.environmentId,
provider: database.provider,
region: database.region,
databaseId: database.id,
name: row["Model Name"],
});
});
logger.debug(
`Transforming ${rawResults.length} raw model rows for cluster ${database.name} (${database.id})`,
);

// Sort rows to ensure model definition comes before its options
sortRawModelRows(rawResults);

const models: FlinkAIModel[] = [];
let currentModel: FlinkAIModel | null = null;
const seenModelNames = new Set<string>();

for (const row of rawResults) {
if (row.rowType === "model") {
// Create new model
if (seenModelNames.has(row.modelName)) {
throw new Error(`Duplicate model name ${row.modelName} in INFORMATION_SCHEMA results`);
}
seenModelNames.add(row.modelName);

currentModel = new FlinkAIModel({
environmentId: database.environmentId,
provider: database.provider,
region: database.region,
databaseId: database.id,
name: row.modelName,
defaultVersion: row.defaultVersion,
versionCount: row.versionCount,
comment: row.comment,
options: new Map(),
});

models.push(currentModel);
} else {
// Model option row
if (currentModel === null || currentModel.name !== row.modelName) {
throw new Error(
`Unexpected model option row for model ${row.modelName} when current model is ${currentModel?.name}`,
);
}

// Add option to current model's options map, keyed by version
const versionKey = row.version || "default";
if (!currentModel.options.has(versionKey)) {
currentModel.options.set(versionKey, new Map());
}
const optionsMap = currentModel.options.get(versionKey)!;
optionsMap.set(row.optionKey, row.optionValue);
}
}

logger.debug(`Transformed to ${models.length} FlinkAIModel objects`);

// Sort models by name
models.sort((a, b) => a.name.localeCompare(b.name));
return models;
}

/**
* Sorts RawFlinkAIModelRow[] by modelName, then by rowType (model rows first).
* This ensures that each model's definition row comes before its option rows.
*/
function sortRawModelRows(rows: RawFlinkAIModelRow[]): void {
rows.sort((a, b) => {
// First sort by model name
if (a.modelName !== b.modelName) {
return a.modelName.localeCompare(b.modelName);
}

// Then sort by row type (model rows first)
return rowRank(a) - rowRank(b);
});
}

/** Assist in sorting the row types: model rows come before option rows */
function rowRank(row: RawFlinkAIModelRow): number {
switch (row.rowType) {
case "model":
return 0;
case "modelOption":
return 1;
}
}
46 changes: 44 additions & 2 deletions src/models/flinkAiModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,40 @@ export class FlinkAIModel implements IResourceBase, IdItem, ISearchable {
databaseId: string;

name: string;
defaultVersion: string;
versionCount: number;
comment: string | null;
/** Map of option keys to their values, keyed by version (if version-specific) or 'default' for unversioned options */
options: Map<string, Map<string, string>>;

// https://github.com/confluentinc/vscode/issues/2989
iconName: IconNames = IconNames.PLACEHOLDER;

constructor(
props: Pick<FlinkAIModel, "environmentId" | "provider" | "region" | "databaseId" | "name">,
props: Pick<
FlinkAIModel,
| "environmentId"
| "provider"
| "region"
| "databaseId"
| "name"
| "defaultVersion"
| "versionCount"
| "comment"
| "options"
>,
) {
this.environmentId = props.environmentId;
this.provider = props.provider;
this.region = props.region;
this.databaseId = props.databaseId;

this.name = props.name;
this.defaultVersion = props.defaultVersion;
this.versionCount = props.versionCount;
this.comment = props.comment;
// Handle rehydration from cache where Map may have been serialized to a plain object
this.options = props.options instanceof Map ? props.options : new Map();
}

get id(): string {
Expand Down Expand Up @@ -58,7 +79,28 @@ export class FlinkAIModelTreeItem extends TreeItem {
export function createFlinkModelToolTip(resource: FlinkAIModel): CustomMarkdownString {
const tooltip = new CustomMarkdownString()
.addHeader("Flink AI Model", resource.iconName)
.addField("Name", resource.name);
.addField("Name", resource.name)
.addField("Default Version", resource.defaultVersion)
.addField("Version Count", resource.versionCount.toString());

if (resource.comment) {
tooltip.addField("Comment", resource.comment);
}

// Add model options if present
if (resource.options.size > 0) {
tooltip.addDivider();
tooltip.appendMarkdown("\n\n**Options:**");
for (const [version, optionsMap] of resource.options) {
if (optionsMap.size > 0) {
const versionLabel = version === "default" ? "Default" : `Version ${version}`;
tooltip.appendMarkdown(`\n\n_${versionLabel}_:`);
for (const [key, value] of optionsMap) {
tooltip.addField(` ${key}`, value);
}
}
}
}

return tooltip;
}
8 changes: 8 additions & 0 deletions tests/unit/testResources/flinkAIModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,20 @@ import { TEST_CCLOUD_FLINK_DB_KAFKA_CLUSTER } from "./kafkaCluster";
export function createFlinkAIModel(
name: string,
database: CCloudFlinkDbKafkaCluster = TEST_CCLOUD_FLINK_DB_KAFKA_CLUSTER,
defaultVersion: string = "1",
versionCount: number = 1,
comment: string | null = null,
options: Map<string, Map<string, string>> = new Map(),
): FlinkAIModel {
return new FlinkAIModel({
environmentId: database.environmentId,
provider: database.provider,
region: database.region,
databaseId: database.id,
name: name,
defaultVersion: defaultVersion,
versionCount: versionCount,
comment: comment,
options: options,
});
}