diff --git a/src/apis/metadata/index.ts b/src/apis/metadata/index.ts index f0a1bb7..ad363ec 100644 --- a/src/apis/metadata/index.ts +++ b/src/apis/metadata/index.ts @@ -4,3 +4,4 @@ export * as findCoordinatorV4 from './find-coordinator-v4.ts' export * as findCoordinatorV5 from './find-coordinator-v5.ts' export * as findCoordinatorV6 from './find-coordinator-v6.ts' export * as metadataV12 from './metadata-v12.ts' +export * as metadataV8 from './metadata-v8.ts' diff --git a/src/apis/metadata/metadata-v8.ts b/src/apis/metadata/metadata-v8.ts new file mode 100644 index 0000000..f68f9d3 --- /dev/null +++ b/src/apis/metadata/metadata-v8.ts @@ -0,0 +1,151 @@ +import { ResponseError } from '../../errors.ts' +import { type NullableString } from '../../protocol/definitions.ts' +import { type Reader } from '../../protocol/reader.ts' +import { Writer } from '../../protocol/writer.ts' +import { createAPI, type ResponseErrorWithLocation } from '../definitions.ts' + +export type MetadataRequest = Parameters + +export interface MetadataResponsePartition { + errorCode: number + partitionIndex: number + leaderId: number + leaderEpoch: number + replicaNodes: number[] + isrNodes: number[] + offlineReplicas: number[] +} + +export interface MetadataResponseTopic { + errorCode: number + name: NullableString + isInternal: boolean + partitions: MetadataResponsePartition[] + topicAuthorizedOperations: number +} + +export interface MetadataResponseBroker { + nodeId: number + host: string + port: number + rack: NullableString +} + +export interface MetadataResponse { + throttleTimeMs: number + brokers: MetadataResponseBroker[] + clusterId: NullableString + controllerId: number + topics: MetadataResponseTopic[] + clusterAuthorizedOperations: number +} + +/* + Metadata Request (Version: 8) => [topics] allow_auto_topic_creation include_cluster_authorized_operations include_topic_authorized_operations + topics => name + name => STRING + allow_auto_topic_creation => BOOLEAN + include_cluster_authorized_operations => BOOLEAN + include_topic_authorized_operations => BOOLEAN +*/ +export function createRequest ( + topics: string[] | null, + allowAutoTopicCreation = false, + includeTopicAuthorizedOperations = false, + includeClusterAuthorizedOperations = false +): Writer { + return Writer.create() + .appendArray(topics, (w, topic) => w.appendString(topic, false), false, false) + .appendBoolean(allowAutoTopicCreation) + .appendBoolean(includeClusterAuthorizedOperations) + .appendBoolean(includeTopicAuthorizedOperations) +} + +/* + Metadata Response (Version: 8) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations + throttle_time_ms => INT32 + brokers => node_id host port rack + node_id => INT32 + host => STRING + port => INT32 + rack => NULLABLE_STRING + cluster_id => NULLABLE_STRING + controller_id => INT32 + topics => error_code name is_internal [partitions] topic_authorized_operations + error_code => INT16 + name => STRING + is_internal => BOOLEAN + partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas] + error_code => INT16 + partition_index => INT32 + leader_id => INT32 + leader_epoch => INT32 + replica_nodes => INT32 + isr_nodes => INT32 + offline_replicas => INT32 + topic_authorized_operations => INT32 + cluster_authorized_operations => INT32 +*/ +export function parseResponse ( + _correlationId: number, + apiKey: number, + apiVersion: number, + reader: Reader +): MetadataResponse { + const errors: ResponseErrorWithLocation[] = [] + + const response: MetadataResponse = { + throttleTimeMs: reader.readInt32(), + brokers: reader.readArray( + (r: Reader) => ({ + nodeId: r.readInt32(), + host: r.readString(false), + port: r.readInt32(), + rack: r.readNullableString(false) + }), + false, + false + ), + clusterId: reader.readNullableString(false), + controllerId: reader.readInt32(), + topics: reader.readArray( + (r, i) => { + const ec = r.readInt16() + if (ec !== 0) errors.push([`/topics/${i}`, ec]) + return { + errorCode: ec, + name: r.readString(false), + isInternal: r.readBoolean(), + partitions: r.readArray( + (r2, j) => { + const pec = r2.readInt16() + if (pec !== 0) errors.push([`/topics/${i}/partitions/${j}`, pec]) + return { + errorCode: pec, + partitionIndex: r2.readInt32(), + leaderId: r2.readInt32(), + leaderEpoch: r2.readInt32(), + replicaNodes: r2.readArray(() => r2.readInt32(), false, false)!, + isrNodes: r2.readArray(() => r2.readInt32(), false, false)!, + offlineReplicas: r2.readArray(() => r2.readInt32(), false, false)! + } + }, + false, + false + ), + topicAuthorizedOperations: r.readInt32() + } + }, + false, + false + ), + clusterAuthorizedOperations: reader.readInt32() + } + + if (errors.length) { + throw new ResponseError(apiKey, apiVersion, Object.fromEntries(errors), response) + } + return response +} + +export const api = createAPI(3, 8, createRequest, parseResponse, false, false) diff --git a/test/apis/metadata/metadata-v8.test.ts b/test/apis/metadata/metadata-v8.test.ts new file mode 100644 index 0000000..8f24b30 --- /dev/null +++ b/test/apis/metadata/metadata-v8.test.ts @@ -0,0 +1,439 @@ +import { deepStrictEqual, ok, throws } from 'node:assert' +import test from 'node:test' +import { metadataV8, Reader, ResponseError, Writer } from '../../../src/index.ts' + +const { createRequest, parseResponse } = metadataV8 + +test('createRequest serializes request parameters correctly', () => { + // Values for the request + const topics = ['topic-1', 'topic-2'] + const allowAutoTopicCreation = true + const includeTopicAuthorizedOperations = true + const includeClusterAuthorizedOperations = false + const writer = createRequest( + topics, + allowAutoTopicCreation, + includeTopicAuthorizedOperations, + includeClusterAuthorizedOperations + ) + + // Verify it returns a Writer + ok(writer instanceof Writer) + + // Read the serialized data to verify correctness + const reader = Reader.from(writer) + + // Read the entire request structure and verify in one assertion + const serialized = { + topics: reader.readArray(r => r.readString(false), false, false), + allowAutoTopicCreation: reader.readBoolean(), + includeClusterAuthorizedOperations: reader.readBoolean(), + includeTopicAuthorizedOperations: reader.readBoolean() + } + + deepStrictEqual( + serialized, + { + topics: ['topic-1', 'topic-2'], + allowAutoTopicCreation, + includeClusterAuthorizedOperations, + includeTopicAuthorizedOperations + }, + 'Serialized request should match expected structure' + ) +}) + +test('createRequest handles null topics', () => { + const topics = null + const allowAutoTopicCreation = false + const includeTopicAuthorizedOperations = false + const includeClusterAuthorizedOperations = false + + const writer = createRequest( + topics, + allowAutoTopicCreation, + includeTopicAuthorizedOperations, + includeClusterAuthorizedOperations + ) + + // Verify it returns a Writer + ok(writer instanceof Writer) + + // Read the serialized data to verify correctness + const reader = Reader.from(writer) + + // Read the entire request structure and verify in one assertion + const serialized = { + topics: reader.readNullableArray( + () => { + throw new Error('This should not be called because topics is null') + }, + false, + false + ), + allowAutoTopicCreation: reader.readBoolean(), + includeClusterAuthorizedOperations: reader.readBoolean(), + includeTopicAuthorizedOperations: reader.readBoolean() + } + + deepStrictEqual( + serialized, + { + topics: null, + allowAutoTopicCreation, + includeClusterAuthorizedOperations, + includeTopicAuthorizedOperations + }, + 'Serialized request with null topics should match expected structure' + ) +}) + +test('parseResponse correctly processes a successful response', () => { + // Create a successful response + const includeClusterAuthorizedOperations = false + const writer = Writer.create() + .appendInt32(0) // throttleTimeMs + // Brokers array - compact array format + .appendArray( + [ + { + nodeId: 1, + host: 'broker1.example.com', + port: 9092, + rack: 'us-west' + }, + { + nodeId: 2, + host: 'broker2.example.com', + port: 9092, + rack: null + } + ], + (w, broker) => { + w.appendInt32(broker.nodeId) + .appendString(broker.host, false) + .appendInt32(broker.port) + .appendString(broker.rack, false) + }, + false, + false + ) + .appendString('test-cluster', false) // clusterId - compact string + .appendInt32(1) // controllerId + // Topics array + .appendArray( + [ + { + errorCode: 0, + name: 'test-topic', + isInternal: false, + partitions: [ + { + errorCode: 0, + partitionIndex: 0, + leaderId: 1, + leaderEpoch: 101, + replicaNodes: [1, 2], + isrNodes: [1, 2], + offlineReplicas: [] + } + ], + topicAuthorizedOperations: 0 + } + ], + (w, topic) => { + w.appendInt16(topic.errorCode) + .appendString(topic.name, false) + .appendBoolean(topic.isInternal) + // Partitions array + .appendArray( + topic.partitions, + (w, partition) => { + w.appendInt16(partition.errorCode) + .appendInt32(partition.partitionIndex) + .appendInt32(partition.leaderId) + .appendInt32(partition.leaderEpoch) + // ReplicaNodes, IsrNodes, and OfflineReplicas arrays + .appendArray(partition.replicaNodes, (w, r) => w.appendInt32(r), false, false) + .appendArray(partition.isrNodes, (w, r) => w.appendInt32(r), false, false) + .appendArray(partition.offlineReplicas, (w, r) => w.appendInt32(r), false, false) + }, + false, + false + ) + .appendInt32(topic.topicAuthorizedOperations) + }, + false, + false + ) + .appendInt32(0) + .appendBoolean(includeClusterAuthorizedOperations) + + const response = parseResponse(1, 3, 12, Reader.from(writer)) + + // Verify structure + deepStrictEqual(response, { + throttleTimeMs: 0, + brokers: [ + { + nodeId: 1, + host: 'broker1.example.com', + port: 9092, + rack: 'us-west' + }, + { + nodeId: 2, + host: 'broker2.example.com', + port: 9092, + rack: null + } + ], + clusterId: 'test-cluster', + controllerId: 1, + topics: [ + { + errorCode: 0, + name: 'test-topic', + isInternal: false, + partitions: [ + { + errorCode: 0, + partitionIndex: 0, + leaderId: 1, + leaderEpoch: 101, + replicaNodes: [1, 2], + isrNodes: [1, 2], + offlineReplicas: [] + } + ], + topicAuthorizedOperations: 0 + } + ], + clusterAuthorizedOperations: 0 + }) +}) + +test('parseResponse handles response with throttling', () => { + // Create a response with throttling + const writer = Writer.create() + .appendInt32(100) // throttleTimeMs (non-zero value for throttling) + // Brokers array + .appendArray( + [ + { + nodeId: 1, + host: 'broker1.example.com', + port: 9092, + rack: null + } + ], + (w, broker) => { + w.appendInt32(broker.nodeId) + .appendString(broker.host, false) + .appendInt32(broker.port) + .appendString(broker.rack, false) + }, + false, + false + ) + .appendString('test-cluster', false) // clusterId + .appendInt32(1) // controllerId + // Topics array (empty) + .appendArray([], () => {}, false, false) + .appendInt32(0) + + const response = parseResponse(1, 3, 12, Reader.from(writer)) + + // Verify response structure + deepStrictEqual(response, { + throttleTimeMs: 100, + brokers: [ + { + nodeId: 1, + host: 'broker1.example.com', + port: 9092, + rack: null + } + ], + clusterId: 'test-cluster', + controllerId: 1, + topics: [], + clusterAuthorizedOperations: 0 + }) +}) + +test('parseResponse throws error on topic error code', () => { + // Create a response with topic error + const writer = Writer.create() + .appendInt32(0) // throttleTimeMs + // Brokers array + .appendArray( + [ + { + nodeId: 1, + host: 'broker1.example.com', + port: 9092, + rack: null + } + ], + (w, broker) => { + w.appendInt32(broker.nodeId) + .appendString(broker.host, false) + .appendInt32(broker.port) + .appendString(broker.rack, false) + }, + false, + false + ) + .appendString('test-cluster', false) // clusterId + .appendInt32(1) // controllerId + // Topics array + .appendArray( + [ + { + errorCode: 3, // UNKNOWN_TOPIC_OR_PARTITION + name: 'nonexistent-topic', + isInternal: false, + partitions: [], + topicAuthorizedOperations: 0 + } + ], + (w, topic) => { + w.appendInt16(topic.errorCode) + .appendString(topic.name, false) + .appendBoolean(topic.isInternal) + // Empty partitions array + .appendArray(topic.partitions, () => {}, false, false) + .appendInt32(topic.topicAuthorizedOperations) + }, + false, + false + ) + .appendInt32(0) + + // Verify that parsing throws ResponseError + throws( + () => { + parseResponse(1, 3, 12, Reader.from(writer)) + }, + (err: any) => { + ok(err instanceof ResponseError) + ok(err.message.includes('Received response with error while executing API')) + + // Verify the error location and code + ok(typeof err.errors === 'object') + + // Verify the response is preserved + deepStrictEqual(err.response.topics[0], { + errorCode: 3, + name: 'nonexistent-topic', + isInternal: false, + partitions: [], + topicAuthorizedOperations: 0 + }) + + return true + } + ) +}) + +test('parseResponse throws error on partition error code', () => { + // Create a response with partition error + const writer = Writer.create() + .appendInt32(0) // throttleTimeMs + // Brokers array + .appendArray( + [ + { + nodeId: 1, + host: 'broker1.example.com', + port: 9092, + rack: null + } + ], + (w, broker) => { + w.appendInt32(broker.nodeId) + .appendString(broker.host, false) + .appendInt32(broker.port) + .appendString(broker.rack, false) + }, + false, + false + ) + .appendString('test-cluster', false) // clusterId + .appendInt32(1) // controllerId + // Topics array + .appendArray( + [ + { + errorCode: 0, // success + name: 'test-topic', + isInternal: false, + partitions: [ + { + errorCode: 9, // REPLICA_NOT_AVAILABLE + partitionIndex: 0, + leaderId: -1, + leaderEpoch: 0, + replicaNodes: [1], + isrNodes: [], + offlineReplicas: [2] + } + ], + topicAuthorizedOperations: 0 + } + ], + (w, topic) => { + w.appendInt16(topic.errorCode) + .appendString(topic.name, false) + .appendBoolean(topic.isInternal) + // Partitions array with error + .appendArray( + topic.partitions, + (w, partition) => { + w.appendInt16(partition.errorCode) + .appendInt32(partition.partitionIndex) + .appendInt32(partition.leaderId) + .appendInt32(partition.leaderEpoch) + // ReplicaNodes, IsrNodes, and OfflineReplicas arrays + .appendArray(partition.replicaNodes, (w, r) => w.appendInt32(r), false, false) + .appendArray(partition.isrNodes, (w, r) => w.appendInt32(r), false, false) + .appendArray(partition.offlineReplicas, (w, r) => w.appendInt32(r), false, false) + }, + false, + false + ) + .appendInt32(topic.topicAuthorizedOperations) + }, + false, + false + ) + .appendInt32(0) + + // Verify that parsing throws ResponseError + throws( + () => { + parseResponse(1, 3, 12, Reader.from(writer)) + }, + (err: any) => { + ok(err instanceof ResponseError) + ok(err.message.includes('Received response with error while executing API')) + + // Verify the error location and code + ok(typeof err.errors === 'object') + + // Verify the response is preserved + deepStrictEqual(err.response.topics[0].partitions[0], { + errorCode: 9, + partitionIndex: 0, + leaderId: -1, + leaderEpoch: 0, + replicaNodes: [1], + isrNodes: [], + offlineReplicas: [2] + }) + + return true + } + ) +})