diff --git a/README.md b/README.md index 797cc0f..2e3e3bc 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,10 @@ # StreamNative MCP Server -A Model Context Protocol (MCP) server for integrating AI agents with StreamNative Cloud resources and Apache Pulsar/Kafka messaging systems. +A Model Context Protocol (MCP) server for integrating AI agents with StreamNative Cloud resources and Apache Kafka/Pulsar messaging systems. ## Overview -StreamNative MCP Server provides a standard interface for LLMs (Large Language Models) and AI agents to interact with StreamNative Cloud services, Apache Pulsar, and Apache Kafka. This implementation follows the [Model Context Protocol](https://modelcontextprotocol.io/introduction) specification, enabling AI applications to access messaging services through a standardized interface. +StreamNative MCP Server provides a standard interface for LLMs (Large Language Models) and AI agents to interact with StreamNative Cloud services, Apache Kafka, and Apache Pulsar. This implementation follows the [Model Context Protocol](https://modelcontextprotocol.io/introduction) specification, enabling AI applications to access messaging services through a standardized interface. ## Features @@ -12,19 +12,21 @@ StreamNative MCP Server provides a standard interface for LLMs (Large Language M - Connect to StreamNative Cloud resources with authentication - Switch to clusters available in your organization - Describe the status of clusters resources -- **Apache Pulsar Support**: Interact with Pulsar resources including: - - Pulsar Admin operations (topics, namespaces, tenants, schemas, etc.) - - Pulsar Client operations (producers, consumers) - - Functions, Sources, and Sinks management -- **Apache Kafka Support**: Interact with Kafka resources including: +- **Apache Kafka Support**: Interact with Apache Kafka resources including: - Kafka Admin operations (topics, partitions, consumer groups) - Schema Registry operations - - Kafka Connect operations + - Kafka Connect operations (*) - Kafka Client operations (producers, consumers) +- **Apache Pulsar Support**: Interact with Apache Pulsar resources including: + - Pulsar Admin operations (topics, namespaces, tenants, schemas, etc.) + - Pulsar Client operations (producers, consumers) + - Functions, Sources, and Sinks management - **Multiple Connection Options**: - Connect to StreamNative Cloud with service account authentication - - Connect directly to external Pulsar clusters - - Connect directly to external Kafka clusters + - Connect directly to external Apache Kafka clusters + - Connect directly to external Apache Pulsar clusters + +> *: The Kafka Connect operations are only tested and verfied on StreamNative Cloud. ## Installation @@ -124,45 +126,65 @@ The StreamNative MCP Server supports enabling or disabling specific groups of fu ### Available Features -The following sets of tools are available (all available by default on StreamNative Cloud) - -| Features | Description | -| ------|-------| -| `all` | All tools, including StreamNative Cloud tools, Pulsar tools and Kafka tools | -| `all-pulsar` | All Pulsar admin and Pulsar client tools | -| `all-kafka` | All Kafka admin and Kafka client tools | -| `pulsar-admin` | Pulsar administrative operations (including all `pulsar-admin-*`) | -| `pulsar-client` | Pulsar client operations (produce and consume messages) | -| `pulsar-admin-brokers` | Manage Pulsar brokers | -| `pulsar-admin-broker-stats` | Access Pulsar broker statistics | -| `pulsar-admin-clusters` | Manage Pulsar clusters | -| `pulsar-admin-functions-worker` | Manage Pulsar Function workers | -| `pulsar-admin-namespaces` | Manage Pulsar namespaces | -| `pulsar-admin-namespace-policy` | Configure Pulsar namespace policies | -| `pulsar-admin-isolation-policy` | Manage namespace isolation policies | -| `pulsar-admin-packages` | Manage Pulsar packages | -| `pulsar-admin-resource-quotas` | Configure resource quotas | -| `pulsar-admin-schemas` | Manage Pulsar schemas | -| `pulsar-admin-subscriptions` | Manage Pulsar subscriptions | -| `pulsar-admin-tenants` | Manage Pulsar tenants | -| `pulsar-admin-topics` | Manage Pulsar topics | -| `pulsar-admin-sinks` | Manage Pulsar IO sinks | -| `pulsar-admin-functions` | Manage Pulsar Functions | -| `pulsar-admin-sources` | Manage Pulsar IO sources | -| `pulsar-admin-topic-policy` | Configure Pulsar topic policies | -| `kafka-admin` | Kafka administrative operations (including all `kafka-admin-*`) | -| `kafka-client` | Kafka client operations (produce and consume messages) | -| `kafka-admin-topics` | Manage Kafka partitions | -| `kafka-admin-partitions` | Manage Kafka partitions | -| `kafka-admin-groups` | Manage Kafka consumer groups | -| `kafka-admin-schema-registry` | Interact with Kafka Schema Registry | -| `kafka-admin-connect` | Manage Kafka Connect connectors | -| `streamnative-cloud` | Manage the context, check resources logs of StreamNative Cloud | - -### Usage Examples - -To enable only specific feature sets: - +The StreamNative MCP Server allows you to enable or disable specific groups of features using the `--features` flag. This helps you control which tools are available to your AI agents and can reduce context size for LLMs. + +#### Combination Feature Sets + +| Feature | Description | +|---------------|-----------------------------------------------------------------------------| +| `all` | Enables all features: StreamNative Cloud, Pulsar, and Kafka tools | + +--- + +#### Kafka Features + +| Feature | Description | Docs | +|--------------------------|--------------------------------------------------|------| +| `all-kafka` | Enables all Kafka admin and client tools, without Apache Pulsar and StreamNative Cloud tools | +| `kafka-admin` | Kafka administrative operations (all admin tools) | | +| `kafka-client` | Kafka client operations (produce/consume) |[kafka_client_consume.md](docs/tools/kafka_client_consume.md), [kafka_client_produce.md](docs/tools/kafka_client_produce.md) | +| `kafka-admin-topics` | Manage Kafka topics | [kafka_admin_topics.md](docs/tools/kafka_admin_topics.md) | +| `kafka-admin-partitions` | Manage Kafka partitions | [kafka_admin_partitions.md](docs/tools/kafka_admin_partitions.md) | +| `kafka-admin-groups` | Manage Kafka consumer groups | [kafka_admin_groups.md](docs/tools/kafka_admin_groups.md) | +| `kafka-admin-schema-registry` | Interact with Kafka Schema Registry | [kafka_admin_schema_registry.md](docs/tools/kafka_admin_schema_registry.md) | +| `kafka-admin-connect` | Manage Kafka Connect connectors | [kafka_admin_connect.md](docs/tools/kafka_admin_connect.md) | + +--- + +#### Pulsar Features + +| Feature | Description | Docs | +|--------------------------|--------------------------------------------------|------| +| `all-pulsar` | Enables all Pulsar admin and client tools, without Apache Kafka and StreamNative Cloud tools | | +| `pulsar-admin` | Pulsar administrative operations (all admin tools)| | +| `pulsar-client` | Pulsar client operations (produce/consume) | [pulsar_client_consume.md](docs/tools/pulsar_client_consume.md), [pulsar_client_produce.md](docs/tools/pulsar_client_produce.md) | +| `pulsar-admin-brokers` | Manage Pulsar brokers | [pulsar_admin_brokers.md](docs/tools/pulsar_admin_brokers.md) | +| `pulsar-admin-broker-stats` | Access Pulsar broker statistics | [pulsar_admin_broker_stats.md](docs/tools/pulsar_admin_broker_stats.md) | +| `pulsar-admin-clusters` | Manage Pulsar clusters | [pulsar_admin_clusters.md](docs/tools/pulsar_admin_clusters.md) | +| `pulsar-admin-functions-worker`| Manage Pulsar Function workers | [pulsar_admin_functions_worker.md](docs/tools/pulsar_admin_functions_worker.md) | +| `pulsar-admin-namespaces` | Manage Pulsar namespaces | [pulsar_admin_namespaces.md](docs/tools/pulsar_admin_namespaces.md) | +| `pulsar-admin-namespace-policy`| Configure Pulsar namespace policies | [pulsar_admin_namespace_policy.md](docs/tools/pulsar_admin_namespace_policy.md) | +| `pulsar-admin-isolation-policy`| Manage namespace isolation policies | [pulsar_admin_isolation_policy.md](docs/tools/pulsar_admin_isolation_policy.md) | +| `pulsar-admin-packages` | Manage Pulsar packages | [pulsar_admin_packages.md](docs/tools/pulsar_admin_packages.md) | +| `pulsar-admin-resource-quotas` | Configure resource quotas | [pulsar_admin_resource_quotas.md](docs/tools/pulsar_admin_resource_quotas.md) | +| `pulsar-admin-schemas` | Manage Pulsar schemas | [pulsar_admin_schemas.md](docs/tools/pulsar_admin_schemas.md) | +| `pulsar-admin-subscriptions` | Manage Pulsar subscriptions | [pulsar_admin_subscriptions.md](docs/tools/pulsar_admin_subscriptions.md) | +| `pulsar-admin-tenants` | Manage Pulsar tenants | [pulsar_admin_tenants.md](docs/tools/pulsar_admin_tenants.md) | +| `pulsar-admin-topics` | Manage Pulsar topics | [pulsar_admin_topics.md](docs/tools/pulsar_admin_topics.md) | +| `pulsar-admin-sinks` | Manage Pulsar IO sinks | [pulsar_admin_sinks.md](docs/tools/pulsar_admin_sinks.md) | +| `pulsar-admin-functions` | Manage Pulsar Functions | [pulsar_admin_functions.md](docs/tools/pulsar_admin_functions.md) | +| `pulsar-admin-sources` | Manage Pulsar Sources | [pulsar_admin_sources.md](docs/tools/pulsar_admin_sources.md) | +| `pulsar-admin-topic-policy` | Configure Pulsar topic policies | [pulsar_admin_topic_policy.md](docs/tools/pulsar_admin_topic_policy.md) | + +--- + +#### StreamNative Cloud Features + +| Feature | Description | Docs | +|---------------------|------------------------------------------------------------------|------| +| `streamnative-cloud`| Manage StreamNative Cloud context and check resource logs | [streamnative_cloud.md](docs/tools/streamnative_cloud.md) | + +You can combine these features as needed using the `--features` flag. For example, to enable only Pulsar client features: ```bash # Enable only Pulsar client features bin/snmcp stdio --organization my-org --key-file /path/to/key-file.json --features pulsar-client @@ -244,3 +266,5 @@ This project uses [semver](https://semver.org/) semantics. ## License Licensed under the Apache License Version 2.0: http://www.apache.org/licenses/LICENSE-2.0 + + diff --git a/docs/tools/kafka_admin_connect.md b/docs/tools/kafka_admin_connect.md new file mode 100644 index 0000000..2edcb39 --- /dev/null +++ b/docs/tools/kafka_admin_connect.md @@ -0,0 +1,34 @@ +#### kafka-admin-connect + +Kafka Connect is a framework for integrating Kafka with external systems. The following resources and operations are supported: + +- **kafka-connect-cluster** + - **get**: Get information about the Kafka Connect cluster + - _Parameters_: None + +- **connectors** + - **list**: List all connectors in the cluster + - _Parameters_: None + +- **connector** + - **get**: Get details of a specific connector + - `name` (string, required): The connector name + - **create**: Create a new connector + - `name` (string, required): The connector name + - `config` (object, required): Connector configuration + - Must include at least `connector.class` and other required fields for the connector type + - **update**: Update an existing connector + - `name` (string, required): The connector name + - `config` (object, required): Updated configuration + - **delete**: Delete a connector + - `name` (string, required): The connector name + - **restart**: Restart a connector + - `name` (string, required): The connector name + - **pause**: Pause a connector + - `name` (string, required): The connector name + - **resume**: Resume a paused connector + - `name` (string, required): The connector name + +- **connector-plugins** + - **list**: List all available connector plugins + - _Parameters_: None \ No newline at end of file diff --git a/docs/tools/kafka_admin_groups.md b/docs/tools/kafka_admin_groups.md new file mode 100644 index 0000000..fd78e0b --- /dev/null +++ b/docs/tools/kafka_admin_groups.md @@ -0,0 +1,24 @@ +#### kafka-admin-groups + +This tool provides access to Kafka consumer group operations including listing, describing, and managing group membership. + +- **groups** + - **list**: List all Kafka Consumer Groups in the cluster + - _Parameters_: None + +- **group** + - **describe**: Get detailed information about a specific Consumer Group + - `group` (string, required): The name of the Kafka Consumer Group + - **remove-members**: Remove specific members from a Consumer Group + - `group` (string, required): The name of the Kafka Consumer Group + - `members` (string, required): Comma-separated list of member instance IDs (e.g. "consumer-instance-1,consumer-instance-2") + - **offsets**: Get offsets for a specific consumer group + - `group` (string, required): The name of the Kafka Consumer Group + - **delete-offset**: Delete a specific offset for a consumer group of a topic + - `group` (string, required): The name of the Kafka Consumer Group + - `topic` (string, required): The name of the Kafka topic + - **set-offset**: Set a specific offset for a consumer group's topic-partition + - `group` (string, required): The name of the Kafka Consumer Group + - `topic` (string, required): The name of the Kafka topic + - `partition` (number, required): The partition number + - `offset` (number, required): The offset value to set (use -1 for earliest, -2 for latest, or a specific value) \ No newline at end of file diff --git a/docs/tools/kafka_admin_partitions.md b/docs/tools/kafka_admin_partitions.md new file mode 100644 index 0000000..11689b2 --- /dev/null +++ b/docs/tools/kafka_admin_partitions.md @@ -0,0 +1,8 @@ +#### kafka-admin-partitions + +This tool provides access to Kafka partition operations, particularly adding partitions to existing topics. + +- **partition** + - **update**: Update the number of partitions for an existing Kafka topic (can only increase, not decrease) + - `topic` (string, required): The name of the Kafka topic + - `new-total` (number, required): The new total number of partitions (must be greater than current) \ No newline at end of file diff --git a/docs/tools/kafka_admin_schema_registry.md b/docs/tools/kafka_admin_schema_registry.md new file mode 100644 index 0000000..033d6c6 --- /dev/null +++ b/docs/tools/kafka_admin_schema_registry.md @@ -0,0 +1,38 @@ +#### kafka-admin-schema-registry + +This tool provides access to Kafka Schema Registry operations, including managing subjects, versions, and compatibility settings. + +- **subjects** + - **list**: List all schema subjects in the Schema Registry + - _Parameters_: None + +- **subject** + - **get**: Get the latest schema for a subject + - `subject` (string, required): The subject name + - **create**: Register a new schema for a subject + - `subject` (string, required): The subject name + - `schema` (string, required): The schema definition (in AVRO/JSON/PROTOBUF, etc.) + - `type` (string, optional): The schema type (e.g. AVRO, JSON, PROTOBUF) + - **delete**: Delete a schema subject + - `subject` (string, required): The subject name + +- **versions** + - **list**: List all versions for a specific subject + - `subject` (string, required): The subject name + - **get**: Get a specific version of a subject's schema + - `subject` (string, required): The subject name + - `version` (number, required): The version number + - **delete**: Delete a specific version of a subject's schema + - `subject` (string, required): The subject name + - `version` (number, required): The version number + +- **compatibility** + - **get**: Get compatibility setting for a subject + - `subject` (string, required): The subject name + - **set**: Set compatibility level for a subject + - `subject` (string, required): The subject name + - `level` (string, required): The compatibility level (e.g. BACKWARD, FORWARD, FULL, NONE) + +- **types** + - **list**: List supported schema types (e.g. AVRO, JSON, PROTOBUF) + - _Parameters_: None \ No newline at end of file diff --git a/docs/tools/kafka_admin_topics.md b/docs/tools/kafka_admin_topics.md new file mode 100644 index 0000000..a06187e --- /dev/null +++ b/docs/tools/kafka_admin_topics.md @@ -0,0 +1,18 @@ +#### kafka-admin-topics + +This tool provides access to various Kafka topic operations, including creation, deletion, listing, and configuration retrieval. + +- **topics** + - **list**: List all topics in the Kafka cluster + - `include-internal` (boolean, optional): Whether to include internal Kafka topics (those starting with an underscore). Default: false + +- **topic** + - **get**: Get detailed configuration for a specific topic + - `name` (string, required): The name of the Kafka topic + - **create**: Create a new topic + - `name` (string, required): The name of the Kafka topic + - `partitions` (number, optional): Number of partitions. Default: 1 + - `replication-factor` (number, optional): Replication factor. Default: 1 + - `configs` (array of string, optional): Topic configuration overrides as key-value strings, e.g. ["cleanup.policy=compact", "retention.ms=604800000"] + - **delete**: Delete an existing topic + - `name` (string, required): The name of the Kafka topic \ No newline at end of file diff --git a/docs/tools/kafka_client_consume.md b/docs/tools/kafka_client_consume.md new file mode 100644 index 0000000..f5e4c55 --- /dev/null +++ b/docs/tools/kafka_client_consume.md @@ -0,0 +1,15 @@ +#### kafka-client-consume + +Consume messages from a Kafka topic. This tool allows you to read messages from Kafka topics with various consumption options. + +- **kafka_client_consume** + - **Description**: Read messages from a Kafka topic, with support for consumer groups, offset control, and timeouts. If schema registry integration enabled, and the topic have schema with `topicName-value`, the consume tool will try to use the schema to decode the messages. + - **Parameters**: + - `topic` (string, required): The name of the Kafka topic to consume messages from. + - `group` (string, optional): The consumer group ID to use. If provided, offsets are tracked and committed; otherwise, a random group is used and offsets are not committed. + - `offset` (string, optional): The offset position to start consuming from. One of: + - 'atstart': Begin from the earliest available message (default) + - 'atend': Begin from the next message after the consumer starts + - 'atcommitted': Begin from the last committed offset (only works with specified 'group') + - `max-messages` (number, optional): Maximum number of messages to consume in this request. Default: 10 + - `timeout` (number, optional): Maximum time in seconds to wait for messages. Default: 10 \ No newline at end of file diff --git a/docs/tools/kafka_client_produce.md b/docs/tools/kafka_client_produce.md new file mode 100644 index 0000000..768b66f --- /dev/null +++ b/docs/tools/kafka_client_produce.md @@ -0,0 +1,12 @@ +#### kafka-client-produce + +Produce messages to a Kafka topic. This tool allows you to send single or multiple messages with various options. + +- **kafka_client_produce** + - **Description**: Send messages to a Kafka topic, supporting keys, headers, partitions, batching, and file-based payloads. + - **Parameters**: + - `topic` (string, required): The name of the Kafka topic to produce messages to. + - `key` (string, optional): The key for the message. Used for partition assignment and ordering. + - `value` (string, required if 'messages' is not provided): The value/content of the message to send. + - `headers` (array, optional): Message headers in the format of [{"key": "header-key", "value": "header-value"}]. + - `sync` (boolean, optional): Whether to wait for server acknowledgment before returning. Default: true. \ No newline at end of file diff --git a/docs/tools/pulsar_admin_broker_stats.md b/docs/tools/pulsar_admin_broker_stats.md new file mode 100644 index 0000000..8837cf8 --- /dev/null +++ b/docs/tools/pulsar_admin_broker_stats.md @@ -0,0 +1,19 @@ +#### pulsar_admin_broker_stats + +Unified tool for retrieving Apache Pulsar broker statistics. + +- **monitoring_metrics** + - **get**: Get broker monitoring metrics + +- **mbeans** + - **get**: Get JVM MBeans statistics from broker + +- **topics** + - **get**: Get statistics for all topics managed by the broker + +- **allocator_stats** + - **get**: Get memory allocator statistics + - `allocator_name` (string, required): Name of the allocator + +- **load_report** + - **get**: Get broker load report \ No newline at end of file diff --git a/docs/tools/pulsar_admin_brokers.md b/docs/tools/pulsar_admin_brokers.md new file mode 100644 index 0000000..e475d3c --- /dev/null +++ b/docs/tools/pulsar_admin_brokers.md @@ -0,0 +1,27 @@ +#### pulsar_admin_brokers + +Unified tool for managing Apache Pulsar broker resources. + +- **brokers** + - **list**: List all active brokers in a cluster + - `clusterName` (string, required): The cluster name + +- **health** + - **get**: Check the health status of a broker + +- **config** + - **get**: Get broker configuration + - `configType` (string, required): Configuration type, available options: + - `dynamic`: Get list of dynamically modifiable configuration names + - `runtime`: Get all runtime configurations (including static and dynamic configs) + - `internal`: Get internal configuration information + - `all_dynamic`: Get all dynamic configurations and their current values + - **update**: Update broker configuration + - `configName` (string, required): Configuration parameter name + - `configValue` (string, required): Configuration parameter value + - **delete**: Delete broker configuration + - `configName` (string, required): Configuration parameter name + +- **namespaces** + - **get**: Get namespaces managed by a broker + - `brokerUrl` (string, required): Broker URL, e.g., '127.0.0.1:8080' \ No newline at end of file diff --git a/docs/tools/pulsar_admin_cluster.md b/docs/tools/pulsar_admin_cluster.md new file mode 100644 index 0000000..fe90279 --- /dev/null +++ b/docs/tools/pulsar_admin_cluster.md @@ -0,0 +1,45 @@ +#### pulsar_admin_cluster + +Unified tool for managing Apache Pulsar clusters. + +- **cluster** + - **list**: List all clusters + - **get**: Get configuration for a specific cluster + - `cluster_name` (string, required): The cluster name + - **create**: Create a new cluster + - `cluster_name` (string, required): The cluster name + - `service_url` (string, optional): Cluster web service URL + - `service_url_tls` (string, optional): Cluster TLS web service URL + - `broker_service_url` (string, optional): Cluster broker service URL + - `broker_service_url_tls` (string, optional): Cluster TLS broker service URL + - `peer_cluster_names` (array, optional): List of peer clusters + - **update**: Update an existing cluster + - `cluster_name` (string, required): The cluster name + - Same optional parameters as create + - **delete**: Delete a cluster + - `cluster_name` (string, required): The cluster name + +- **peer_clusters** + - **get**: Get list of peer clusters + - `cluster_name` (string, required): The cluster name + - **update**: Update peer clusters list + - `cluster_name` (string, required): The cluster name + - `peer_cluster_names` (array, required): List of peer cluster names + +- **failure_domain** + - **list**: List all failure domains in a cluster + - `cluster_name` (string, required): The cluster name + - **get**: Get configuration for a specific failure domain + - `cluster_name` (string, required): The cluster name + - `domain_name` (string, required): The failure domain name + - **create**: Create a new failure domain + - `cluster_name` (string, required): The cluster name + - `domain_name` (string, required): The failure domain name + - `brokers` (array, required): List of brokers in the domain + - **update**: Update an existing failure domain + - `cluster_name` (string, required): The cluster name + - `domain_name` (string, required): The failure domain name + - `brokers` (array, required): List of brokers in the domain + - **delete**: Delete a failure domain + - `cluster_name` (string, required): The cluster name + - `domain_name` (string, required): The failure domain name \ No newline at end of file diff --git a/docs/tools/pulsar_admin_functions.md b/docs/tools/pulsar_admin_functions.md new file mode 100644 index 0000000..e6d0963 --- /dev/null +++ b/docs/tools/pulsar_admin_functions.md @@ -0,0 +1,83 @@ +#### pulsar_admin_functions + +Manage Apache Pulsar Functions for stream processing. Pulsar Functions are lightweight compute processes that can consume messages from one or more Pulsar topics, apply user-defined processing logic, and produce results to another topic. Functions support Java, Python, and Go runtimes, enabling complex event processing, data transformations, filtering, and integration with external systems. + +This tool provides a comprehensive set of operations to manage the entire function lifecycle: + +- **list**: List all functions in a namespace + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + +- **get**: Get function configuration + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The function name + +- **status**: Get runtime status of a function (instances, metrics) + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The function name + +- **stats**: Get detailed statistics of a function (throughput, processing latency) + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The function name + +- **create**: Deploy a new function with specified parameters + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The function name + - `classname` (string, required): The fully qualified class name implementing the function + - `inputs` (array, required): The input topics for the function + - `output` (string, optional): The output topic for function results + - `jar` (string, optional): Path to the JAR file for Java functions + - `py` (string, optional): Path to the Python file for Python functions + - `go` (string, optional): Path to the Go binary for Go functions + - `parallelism` (number, optional): The parallelism factor of the function (default: 1) + - `userConfig` (object, optional): User-defined config key/values + +- **update**: Update an existing function + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The function name + - Parameters similar to `create` operation + +- **delete**: Delete a function + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The function name + +- **start**: Start a stopped function + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The function name + +- **stop**: Stop a running function + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The function name + +- **restart**: Restart a function + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The function name + +- **querystate**: Query state stored by a stateful function for a specific key + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The function name + - `key` (string, required): The state key to query + +- **putstate**: Store state in a function's state store + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The function name + - `key` (string, required): The state key + - `value` (string, required): The state value + +- **trigger**: Manually trigger a function with a specific value + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The function name + - `topic` (string, optional): The specific topic to trigger on + - `triggerValue` (string, optional): The value to trigger the function with \ No newline at end of file diff --git a/docs/tools/pulsar_admin_namespace.md b/docs/tools/pulsar_admin_namespace.md new file mode 100644 index 0000000..c1c28bd --- /dev/null +++ b/docs/tools/pulsar_admin_namespace.md @@ -0,0 +1,37 @@ +#### pulsar_admin_namespace + +Manage Pulsar namespaces with various operations. + +- **list**: List all namespaces for a tenant + - `tenant` (string, required): The tenant name + +- **get_topics**: Get all topics within a namespace + - `namespace` (string, required): The namespace name (format: tenant/namespace) + +- **create**: Create a new namespace + - `namespace` (string, required): The namespace name (format: tenant/namespace) + - `bundles` (string, optional): Number of bundles to activate + - `clusters` (array, optional): List of clusters to assign + +- **delete**: Delete a namespace + - `namespace` (string, required): The namespace name (format: tenant/namespace) + +- **clear_backlog**: Clear backlog for all topics in a namespace + - `namespace` (string, required): The namespace name (format: tenant/namespace) + - `subscription` (string, optional): Subscription name + - `bundle` (string, optional): Bundle name or range + - `force` (string, optional): Force clear backlog (true/false) + +- **unsubscribe**: Unsubscribe from a subscription for all topics in a namespace + - `namespace` (string, required): The namespace name (format: tenant/namespace) + - `subscription` (string, required): Subscription name + - `bundle` (string, optional): Bundle name or range + +- **unload**: Unload a namespace from the current serving broker + - `namespace` (string, required): The namespace name (format: tenant/namespace) + - `bundle` (string, optional): Bundle name or range + +- **split_bundle**: Split a namespace bundle + - `namespace` (string, required): The namespace name (format: tenant/namespace) + - `bundle` (string, required): Bundle name or range + - `unload` (string, optional): Unload newly split bundles (true/false) \ No newline at end of file diff --git a/docs/tools/pulsar_admin_namespace_policy.md b/docs/tools/pulsar_admin_namespace_policy.md new file mode 100644 index 0000000..999218a --- /dev/null +++ b/docs/tools/pulsar_admin_namespace_policy.md @@ -0,0 +1,41 @@ +#### pulsar_admin_namespace_policy + +Tools for managing Pulsar namespace policies. + +- **pulsar_admin_namespace_policy_get**: Get configuration policies of a namespace + - `namespace` (string, required): The namespace name (format: tenant/namespace) + +- **pulsar_admin_namespace_policy_set**: Set a policy for a namespace + - `namespace` (string, required): The namespace name (format: tenant/namespace) + - `policy` (string, required): Type of policy to set, options include: + - `message-ttl`: Sets time to live for messages + - `retention`: Sets retention policy for messages + - `permission`: Grants permissions to a role + - `replication-clusters`: Sets clusters to replicate to + - `backlog-quota`: Sets backlog quota policy + - `topic-auto-creation`: Configures automatic topic creation + - `schema-validation`: Sets schema validation enforcement + - `schema-auto-update`: Sets schema auto-update strategy + - `auto-update-schema`: Controls if schemas can be automatically updated + - `offload-threshold`: Sets threshold for data offloading + - `offload-deletion-lag`: Sets time to wait before deleting offloaded data + - `compaction-threshold`: Sets threshold for topic compaction + - `max-producers-per-topic`: Limits producers per topic + - `max-consumers-per-topic`: Limits consumers per topic + - `max-consumers-per-subscription`: Limits consumers per subscription + - `anti-affinity-group`: Sets anti-affinity group for isolation + - `persistence`: Sets persistence configuration + - `deduplication`: Controls message deduplication + - `encryption-required`: Controls message encryption + - `subscription-auth-mode`: Sets subscription auth mode + - `subscription-permission`: Grants permissions to access a subscription + - `dispatch-rate`: Sets message dispatch rate + - `replicator-dispatch-rate`: Sets replicator dispatch rate + - `subscribe-rate`: Sets subscribe rate per consumer + - `subscription-dispatch-rate`: Sets subscription dispatch rate + - `publish-rate`: Sets maximum message publish rate + - Additional parameters vary based on the policy type + +- **pulsar_admin_namespace_policy_remove**: Remove a policy from a namespace + - `namespace` (string, required): The namespace name (format: tenant/namespace) + - `policy` (string, required): Type of policy to remove \ No newline at end of file diff --git a/docs/tools/pulsar_admin_nsisolationpolicy.md b/docs/tools/pulsar_admin_nsisolationpolicy.md new file mode 100644 index 0000000..41a0e6a --- /dev/null +++ b/docs/tools/pulsar_admin_nsisolationpolicy.md @@ -0,0 +1,32 @@ +#### pulsar_admin_nsisolationpolicy + +Manage namespace isolation policies in a Pulsar cluster. Namespace isolation policies enable physical isolation of namespaces by controlling which brokers specific namespaces can use. This helps provide predictable performance and resource isolation, especially in multi-tenant environments. + +This tool provides operations across three resource types: + +- **policy** (Namespace isolation policy): + - **get**: Get details of a specific isolation policy + - `cluster` (string, required): The cluster name + - `name` (string, required): Name of the isolation policy + - **list**: List all isolation policies in a cluster + - `cluster` (string, required): The cluster name + - **set**: Create or update an isolation policy + - `cluster` (string, required): The cluster name + - `name` (string, required): Name of the isolation policy + - `namespaces` (array, required): List of namespaces to apply the isolation policy + - `primary` (array, required): List of primary brokers for the namespaces + - `secondary` (array, optional): List of secondary brokers for the namespaces + - `autoFailoverPolicyType` (string, optional): Auto failover policy type (e.g., min_available) + - `autoFailoverPolicyParams` (object, optional): Auto failover policy parameters (e.g., {'min_limit': '1', 'usage_threshold': '100'}) + - **delete**: Delete an isolation policy + - `cluster` (string, required): The cluster name + - `name` (string, required): Name of the isolation policy + +- **broker** (Broker with isolation policies): + - **get**: Get details of a specific broker with its isolation policies + - `cluster` (string, required): The cluster name + - `name` (string, required): Name of the broker + +- **brokers** (All brokers with isolation policies): + - **list**: List all brokers with their isolation policies + - `cluster` (string, required): The cluster name \ No newline at end of file diff --git a/docs/tools/pulsar_admin_package.md b/docs/tools/pulsar_admin_package.md new file mode 100644 index 0000000..5c0fe86 --- /dev/null +++ b/docs/tools/pulsar_admin_package.md @@ -0,0 +1,32 @@ +#### pulsar_admin_package + +Manage packages in Apache Pulsar. Packages are reusable components that can be shared across functions, sources, and sinks. The system supports package schemes including `function://`, `source://`, and `sink://` for different component types. + +This tool provides operations across two resource types: + +- **package** (A specific package): + - **list**: List all versions of a specific package + - `packageName` (string, required): Name of the package + - **get**: Get metadata of a specific package + - `packageName` (string, required): Name of the package + - **update**: Update metadata of a specific package + - `packageName` (string, required): Name of the package + - `description` (string, required): Description of the package + - `contact` (string, optional): Contact information for the package + - `properties` (object, optional): Additional properties as key-value pairs + - **delete**: Delete a specific package + - `packageName` (string, required): Name of the package + - **download**: Download a package to local storage + - `packageName` (string, required): Name of the package + - `path` (string, required): Path to download the package to + - **upload**: Upload a package from local storage + - `packageName` (string, required): Name of the package + - `path` (string, required): Path to upload the package from + - `description` (string, required): Description of the package + - `contact` (string, optional): Contact information for the package + - `properties` (object, optional): Additional properties as key-value pairs + +- **packages** (Packages of a specific type): + - **list**: List all packages of a specific type in a namespace + - `type` (string, required): Package type (function, source, sink) + - `namespace` (string, required): The namespace name \ No newline at end of file diff --git a/docs/tools/pulsar_admin_resourcequota.md b/docs/tools/pulsar_admin_resourcequota.md new file mode 100644 index 0000000..e5b349b --- /dev/null +++ b/docs/tools/pulsar_admin_resourcequota.md @@ -0,0 +1,24 @@ +#### pulsar_admin_resourcequota + +Manage Apache Pulsar resource quotas for brokers, namespaces and bundles. Resource quotas define limits for resource usage such as message rates, bandwidth, and memory. These quotas help prevent resource abuse and ensure fair resource allocation across the Pulsar cluster. + +This tool provides operations on the following resource: + +- **quota** (Resource quota configuration): + - **get**: Get resource quota for a namespace bundle or the default quota + - `namespace` (string, optional): The namespace name in format 'tenant/namespace' + - `bundle` (string, optional): The bundle range in format '{start-boundary}_{end-boundary}' + - Note: If namespace and bundle are both omitted, returns the default quota + - Note: If namespace is specified, bundle must also be specified and vice versa + - **set**: Set resource quota for a namespace bundle or the default quota + - `namespace` (string, optional): The namespace name in format 'tenant/namespace' + - `bundle` (string, optional): The bundle range in format '{start-boundary}_{end-boundary}' + - `msgRateIn` (number, required): Maximum incoming messages per second + - `msgRateOut` (number, required): Maximum outgoing messages per second + - `bandwidthIn` (number, required): Maximum inbound bandwidth in bytes per second + - `bandwidthOut` (number, required): Maximum outbound bandwidth in bytes per second + - `memory` (number, required): Maximum memory usage in Mbytes + - `dynamic` (boolean, optional): Whether to allow quota to be dynamically re-calculated + - **reset**: Reset a namespace bundle's resource quota to default value + - `namespace` (string, required): The namespace name in format 'tenant/namespace' + - `bundle` (string, required): The bundle range in format '{start-boundary}_{end-boundary}' \ No newline at end of file diff --git a/docs/tools/pulsar_admin_schema.md b/docs/tools/pulsar_admin_schema.md new file mode 100644 index 0000000..f6ec821 --- /dev/null +++ b/docs/tools/pulsar_admin_schema.md @@ -0,0 +1,13 @@ +#### pulsar_admin_schema + +Manage Apache Pulsar schemas for topics. + +- **schema** + - **get**: Get the schema for a topic + - `topic` (string, required): The fully qualified topic name + - `version` (number, optional): Schema version number + - **upload**: Upload a new schema for a topic + - `topic` (string, required): The fully qualified topic name + - `filename` (string, required): Path to the schema definition file + - **delete**: Delete the schema for a topic + - `topic` (string, required): The fully qualified topic name \ No newline at end of file diff --git a/docs/tools/pulsar_admin_sinks.md b/docs/tools/pulsar_admin_sinks.md new file mode 100644 index 0000000..f957087 --- /dev/null +++ b/docs/tools/pulsar_admin_sinks.md @@ -0,0 +1,64 @@ +#### pulsar_admin_sinks + +Manage Apache Pulsar Sinks for data movement and integration. Pulsar Sinks are connectors that export data from Pulsar topics to external systems such as databases, storage services, messaging systems, and third-party applications. Sinks consume messages from one or more Pulsar topics, transform the data if needed, and write it to external systems in a format compatible with the target destination. + +This tool provides complete lifecycle management for sink connectors: + +- **list**: List all sinks in a namespace + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + +- **get**: Get sink configuration + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The sink name + +- **status**: Get runtime status of a sink (instances, metrics) + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The sink name + +- **create**: Deploy a new sink connector + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The sink name + - Either `archive` or `sink-type` must be specified (but not both): + - `archive` (string): Path to the archive file containing sink code + - `sink-type` (string): Built-in connector type to use (e.g., 'jdbc', 'elastic-search', 'kafka') + - Either `inputs` or `topics-pattern` must be specified: + - `inputs` (array): The sink's input topics (array of strings) + - `topics-pattern` (string): TopicsPattern to consume from topics matching the pattern (regex) + - `subs-name` (string, optional): Pulsar subscription name for input topic consumer + - `parallelism` (number, optional): Number of instances to run concurrently (default: 1) + - `sink-config` (object, optional): Connector-specific configuration parameters + +- **update**: Update an existing sink connector + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The sink name + - Parameters similar to `create` operation (all optional during update) + +- **delete**: Delete a sink + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The sink name + +- **start**: Start a stopped sink + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The sink name + +- **stop**: Stop a running sink + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The sink name + +- **restart**: Restart a sink + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The sink name + +- **list-built-in**: List all built-in sink connectors available in the system + - No parameters required + +Built-in sink connectors are available for common systems like Kafka, JDBC, Elasticsearch, and cloud storage. Sinks follow the tenant/namespace/name hierarchy for organization and access control, can scale through parallelism configuration, and support configurable subscription types. Sinks require proper permissions to access their input topics. \ No newline at end of file diff --git a/docs/tools/pulsar_admin_sources.md b/docs/tools/pulsar_admin_sources.md new file mode 100644 index 0000000..cc2e738 --- /dev/null +++ b/docs/tools/pulsar_admin_sources.md @@ -0,0 +1,65 @@ +#### pulsar_admin_sources + +Manage Apache Pulsar Sources for data ingestion and integration. Pulsar Sources are connectors that import data from external systems into Pulsar topics. Sources connect to external systems such as databases, messaging platforms, storage services, and real-time data streams to pull data and publish it to Pulsar topics. + +This tool provides complete lifecycle management for source connectors: + +- **list**: List all sources in a namespace + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + +- **get**: Get source configuration + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The source name + +- **status**: Get runtime status of a source (instances, metrics) + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The source name + +- **create**: Deploy a new source connector + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The source name + - `destination-topic-name` (string, required): Topic where data will be written + - Either `archive` or `source-type` must be specified (but not both): + - `archive` (string): Path to the archive file containing source code + - `source-type` (string): Built-in connector type to use (e.g., 'kafka', 'jdbc') + - `deserialization-classname` (string, optional): SerDe class for the source + - `schema-type` (string, optional): Schema type for encoding messages (e.g., 'avro', 'json') + - `classname` (string, optional): Source class name if using custom implementation + - `processing-guarantees` (string, optional): Delivery semantics ('atleast_once', 'atmost_once', 'effectively_once') + - `parallelism` (number, optional): Number of instances to run concurrently (default: 1) + - `source-config` (object, optional): Connector-specific configuration parameters + +- **update**: Update an existing source connector + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The source name + - Parameters similar to `create` operation (all optional during update) + +- **delete**: Delete a source + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The source name + +- **start**: Start a stopped source + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The source name + +- **stop**: Stop a running source + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The source name + +- **restart**: Restart a source + - `tenant` (string, required): The tenant name + - `namespace` (string, required): The namespace name + - `name` (string, required): The source name + +- **list-built-in**: List all built-in source connectors available in the system + - No parameters required + +Built-in source connectors are available for common systems like Kafka, JDBC, AWS services, and more. Sources follow the tenant/namespace/name hierarchy for organization and access control, can scale through parallelism configuration, and support various processing guarantees. \ No newline at end of file diff --git a/docs/tools/pulsar_admin_subscription.md b/docs/tools/pulsar_admin_subscription.md new file mode 100644 index 0000000..0769ccd --- /dev/null +++ b/docs/tools/pulsar_admin_subscription.md @@ -0,0 +1,25 @@ +#### pulsar_admin_subscription + +Manage Pulsar topic subscriptions, which represent consumer groups reading from topics. + +- **list**: List all subscriptions for a topic + - `topic` (string, required): The fully qualified topic name +- **create**: Create a new subscription + - `topic` (string, required): The fully qualified topic name + - `subscription` (string, required): The subscription name + - `message_id` (string, optional): Initial position, default is latest +- **delete**: Delete a subscription + - `topic` (string, required): The fully qualified topic name + - `subscription` (string, required): The subscription name +- **skip**: Skip messages for a subscription + - `topic` (string, required): The fully qualified topic name + - `subscription` (string, required): The subscription name + - `count` (number, required): Number of messages to skip +- **expire**: Expire messages for a subscription + - `topic` (string, required): The fully qualified topic name + - `subscription` (string, required): The subscription name + - `expiry_time` (string, required): Expiry time in seconds +- **reset-cursor**: Reset subscription position + - `topic` (string, required): The fully qualified topic name + - `subscription` (string, required): The subscription name + - `message_id` (string, required): Message ID to reset to \ No newline at end of file diff --git a/docs/tools/pulsar_admin_tenant.md b/docs/tools/pulsar_admin_tenant.md new file mode 100644 index 0000000..9a9f5e7 --- /dev/null +++ b/docs/tools/pulsar_admin_tenant.md @@ -0,0 +1,17 @@ +#### pulsar_admin_tenant + +Manage Pulsar tenants, which are the highest level administrative units. + +- **list**: List all tenants in the Pulsar instance +- **get**: Get configuration details for a specific tenant + - `tenant` (string, required): The tenant name +- **create**: Create a new tenant + - `tenant` (string, required): The tenant name + - `admin_roles` (array, optional): List of roles with admin permissions + - `allowed_clusters` (array, required): List of clusters tenant can access +- **update**: Update configuration for an existing tenant + - `tenant` (string, required): The tenant name + - `admin_roles` (array, optional): List of roles with admin permissions + - `allowed_clusters` (array, required): List of clusters tenant can access +- **delete**: Delete a tenant + - `tenant` (string, required): The tenant name \ No newline at end of file diff --git a/docs/tools/pulsar_admin_topic.md b/docs/tools/pulsar_admin_topic.md new file mode 100644 index 0000000..1f88d8c --- /dev/null +++ b/docs/tools/pulsar_admin_topic.md @@ -0,0 +1,48 @@ +#### pulsar_admin_topic + +Manage Apache Pulsar topics. Topics are the core messaging entities in Pulsar that store and transmit messages. Pulsar supports two types of topics: persistent (durable storage with guaranteed delivery) and non-persistent (in-memory with at-most-once delivery). Topics can be partitioned for parallel processing and higher throughput. + +- **topic** + - **get**: Get metadata for a topic + - `topic` (string, required): The fully qualified topic name + - **create**: Create a new topic with optional partitions + - `topic` (string, required): The fully qualified topic name + - `partitions` (number, required): The number of partitions (0 for non-partitioned) + - **delete**: Delete a topic + - `topic` (string, required): The fully qualified topic name + - `force` (boolean, optional): Force operation even if it disrupts producers/consumers + - `non-partitioned` (boolean, optional): Delete only the non-partitioned topic with the same name + - **stats**: Get statistics for a topic + - `topic` (string, required): The fully qualified topic name + - `partitioned` (boolean, optional): Get stats for a partitioned topic + - `per-partition` (boolean, optional): Include per-partition stats + - **lookup**: Look up the broker serving a topic + - `topic` (string, required): The fully qualified topic name + - **internal-stats**: Get internal stats for a topic + - `topic` (string, required): The fully qualified topic name + - **internal-info**: Get internal info for a topic + - `topic` (string, required): The fully qualified topic name + - **bundle-range**: Get the bundle range of a topic + - `topic` (string, required): The fully qualified topic name + - **last-message-id**: Get the last message ID of a topic + - `topic` (string, required): The fully qualified topic name + - **status**: Get the status of a topic + - `topic` (string, required): The fully qualified topic name + - **unload**: Unload a topic from broker memory + - `topic` (string, required): The fully qualified topic name + - **terminate**: Terminate a topic (close all producers and mark as inactive) + - `topic` (string, required): The fully qualified topic name + - **compact**: Trigger compaction on a topic + - `topic` (string, required): The fully qualified topic name + - **update**: Update the number of partitions for a topic + - `topic` (string, required): The fully qualified topic name + - `partitions` (number, required): The new number of partitions + - **offload**: Offload data from a topic to long-term storage + - `topic` (string, required): The fully qualified topic name + - `messageId` (string, required): Message ID up to which to offload (format: ledgerId:entryId) + - **offload-status**: Check the status of data offloading for a topic + - `topic` (string, required): The fully qualified topic name + +- **topics** + - **list**: List all topics in a namespace + - `namespace` (string, required): The namespace name (format: tenant/namespace) \ No newline at end of file diff --git a/docs/tools/pulsar_admin_topic_policy.md b/docs/tools/pulsar_admin_topic_policy.md new file mode 100644 index 0000000..da28715 --- /dev/null +++ b/docs/tools/pulsar_admin_topic_policy.md @@ -0,0 +1,14 @@ +#### pulsar_admin_topic_policy + +Manage Pulsar topic-level policies, which override namespace-level policies. + +- **get**: Get policies for a topic + - `topic` (string, required): The fully qualified topic name + - `policy_type` (string, required): Type of policy to retrieve +- **set**: Set a policy for a topic + - `topic` (string, required): The fully qualified topic name + - `policy_type` (string, required): Type of policy to set + - `policy_value` (string/object, required): Policy value +- **delete**: Remove a policy from a topic + - `topic` (string, required): The fully qualified topic name + - `policy_type` (string, required): Type of policy to remove \ No newline at end of file diff --git a/docs/tools/pulsar_client_consume.md b/docs/tools/pulsar_client_consume.md new file mode 100644 index 0000000..1cc9f88 --- /dev/null +++ b/docs/tools/pulsar_client_consume.md @@ -0,0 +1,17 @@ +#### pulsar_client_consume + +Consume messages from a Pulsar topic. This tool allows you to consume messages from a specified Pulsar topic with various options to control the subscription behavior, message processing, and display format. + +- **pulsar_client_consume** + - `topic` (string, required): Topic to consume from + - `subscription-name` (string, required): Subscription name + - `subscription-type` (string, optional): Subscription type (default: exclusive) + - Options: exclusive, shared, failover, key_shared + - `subscription-mode` (string, optional): Subscription mode (default: durable) + - Options: durable, non-durable + - `initial-position` (string, optional): Initial position (default: latest) + - Options: latest (consume from the latest message), earliest (consume from the earliest message) + - `num-messages` (number, optional): Number of messages to consume (0 for unlimited, default: 0) + - `timeout` (number, optional): Timeout for consuming messages in seconds (default: 30) + - `show-properties` (boolean, optional): Show message properties (default: false) + - `hide-payload` (boolean, optional): Hide message payload (default: false) \ No newline at end of file diff --git a/docs/tools/pulsar_client_produce.md b/docs/tools/pulsar_client_produce.md new file mode 100644 index 0000000..5601d00 --- /dev/null +++ b/docs/tools/pulsar_client_produce.md @@ -0,0 +1,14 @@ +#### pulsar_client_produce + +Produce messages to a Pulsar topic. This tool allows you to send messages to a specified Pulsar topic with various options to control message format, batching, and properties. + +- **pulsar_client_produce** + - `topic` (string, required): Topic to produce to + - `messages` (array, required): Messages to send. Specify multiple times for multiple messages + - `num-produce` (number, optional): Number of times to send message(s) (default: 1) + - `rate` (number, optional): Rate (in msg/sec) at which to produce, 0 means produce as fast as possible (default: 0) + - `disable-batching` (boolean, optional): Disable batch sending of messages (default: false) + - `chunking` (boolean, optional): Should split the message and publish in chunks if message size is larger than allowed max size (default: false) + - `separator` (string, optional): Character to split messages string on (default: none) + - `properties` (array, optional): Properties to add, key=value format. Specify multiple times for multiple properties + - `key` (string, optional): Partitioning key to add to each message \ No newline at end of file diff --git a/docs/tools/streamnative_cloud.md b/docs/tools/streamnative_cloud.md new file mode 100644 index 0000000..8307707 --- /dev/null +++ b/docs/tools/streamnative_cloud.md @@ -0,0 +1,80 @@ +#### sncloud_context_available_clusters + +Display the available Pulsar clusters for the current organization on StreamNative Cloud. This information helps you select the appropriate cluster when setting your context. + +- **sncloud_context_available_clusters** + - No parameters required + +You can use `sncloud_context_use_cluster` to change the context to a specific cluster. You will need to ask for user confirmation of the target context cluster if there are multiple clusters available. + +--- + +#### sncloud_context_use_cluster + +Set the current context to a specific StreamNative Cloud cluster. Once you set the context, you can use Pulsar and Kafka tools to interact with the cluster. + +- **sncloud_context_use_cluster** + - `instanceName` (string, required): The name of the Pulsar instance to use + - `clusterName` (string, required): The name of the Pulsar cluster to use + +If you encounter `ContextNotSetErr`, please use `sncloud_context_available_clusters` to list the available clusters and set the context to a specific cluster. + +--- + +#### sncloud_context_whoami + +Display the currently logged-in service account. Returns the name of the authenticated service account and the organization. + +- **sncloud_context_whoami** + - No parameters required + +This tool returns a JSON object containing the service account name and organization. + +--- + +#### sncloud_logs + +Display logs of resources in StreamNative Cloud, including Pulsar functions, source connectors, sink connectors, and Kafka Connect connectors. This tool helps debug issues with resources in StreamNative Cloud and works with the current context cluster. + +- **sncloud_logs** + - `component` (string, required): The component type to get logs from + - Options: sink, source, function, kafka-connect + - `name` (string, required): The name of the resource to get logs from + - `tenant` (string, required): The Pulsar tenant of the resource (default: "public") + - Required for Pulsar functions, sources, and sinks + - Optional for Kafka Connect connectors + - `namespace` (string, required): The Pulsar namespace of the resource (default: "default") + - Required for Pulsar functions, sources, and sinks + - Optional for Kafka Connect connectors + - `size` (string, optional): Number of log lines to retrieve (default: "20") + - `replica_id` (number, optional): The replica index for resources with multiple replicas (default: -1, which means all replicas) + - `timestamp` (string, optional): Start timestamp of logs in milliseconds (e.g., "1662430984225") + - `since` (string, optional): Retrieve logs from a relative time in the past (e.g., "1h" for one hour ago) + - `previous_container` (boolean, optional): Return logs from previously terminated container (default: false) + +--- + +#### sncloud_resources_apply + +Apply (create or update) StreamNative Cloud resources from JSON definitions. This tool can be used to manage infrastructure resources such as PulsarInstances and PulsarClusters in your StreamNative Cloud organization. + +- **sncloud_resources_apply** + - `json_content` (string, required): The JSON content to apply, defining the resource according to the StreamNative Cloud API schema + - `dry_run` (boolean, optional): If true, only validate the resource without applying it to the server (default: false) + +Supported resource types: +- PulsarInstance (apiVersion: cloud.streamnative.io/v1alpha1) +- PulsarCluster (apiVersion: cloud.streamnative.io/v1alpha1) + +--- + +#### sncloud_resources_delete + +Delete StreamNative Cloud resources. This tool removes resources from your StreamNative Cloud organization. + +- **sncloud_resources_delete** + - `name` (string, required): The name of the resource to delete + - `type` (string, required): The type of the resource to delete + - Options: PulsarInstance, PulsarCluster + +This is a destructive operation that cannot be undone. Use with caution. \ No newline at end of file diff --git a/pkg/mcp/pulsar_client_consume_tools.go b/pkg/mcp/pulsar_client_consume_tools.go index 361911f..ec71b23 100644 --- a/pkg/mcp/pulsar_client_consume_tools.go +++ b/pkg/mcp/pulsar_client_consume_tools.go @@ -59,35 +59,17 @@ func PulsarClientAddConsumerTools(s *server.MCPServer, _ bool, features []string ", earliest (consume from the earliest message) (default: latest)"), ), mcp.WithNumber("num-messages", - mcp.Description("Number of messages to consume (0 for unlimited, default: 0)"), + mcp.Description("Number of messages to consume (default: 10)"), ), mcp.WithNumber("timeout", mcp.Description("Timeout for consuming messages in seconds (default: 30)"), ), - mcp.WithBoolean("regex", - mcp.Description("Treat the topic as a regex pattern (default: false)"), - ), - mcp.WithString("schema", - mcp.Description("Schema type: string, json, avro, protobuf"), - ), - mcp.WithNumber("max-redeliver-count", - mcp.Description("Maximum redelivery count for dead letter queue (0 to disable, default: 0)"), - ), - mcp.WithString("dlq-topic", - mcp.Description("Dead letter queue topic"), - ), mcp.WithBoolean("show-properties", mcp.Description("Show message properties (default: false)"), ), mcp.WithBoolean("hide-payload", mcp.Description("Hide message payload (default: false)"), ), - mcp.WithBoolean("read-compacted", - mcp.Description("Read compacted topic (default: false)"), - ), - mcp.WithNumber("receiver-queue-size", - mcp.Description("Size of the consumer receive queue (default: 1000)"), - ), ) s.AddTool(consumeTool, handleClientConsume) } @@ -120,7 +102,7 @@ func handleClientConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp initialPosition = val } - numMessages := 0 + numMessages := 10 if val, exists := optionalParam[float64](request.Params.Arguments, "num-messages"); exists { numMessages = int(val) } @@ -130,21 +112,6 @@ func handleClientConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp timeout = int(val) } - regex := false - if val, exists := optionalParam[bool](request.Params.Arguments, "regex"); exists { - regex = val - } - - var maxRedeliverCount int32 - if val, exists := optionalParam[float64](request.Params.Arguments, "max-redeliver-count"); exists { - maxRedeliverCount = int32(val) - } - - dlqTopic := "" - if val, exists := optionalParam[string](request.Params.Arguments, "dlq-topic"); exists { - dlqTopic = val - } - showProperties := false if val, exists := optionalParam[bool](request.Params.Arguments, "show-properties"); exists { showProperties = val @@ -155,16 +122,6 @@ func handleClientConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp hidePayload = val } - readCompacted := false - if val, exists := optionalParam[bool](request.Params.Arguments, "read-compacted"); exists { - readCompacted = val - } - - receiverQueueSize := 1000 - if val, exists := optionalParam[float64](request.Params.Arguments, "receiver-queue-size"); exists { - receiverQueueSize = int(val) - } - // Setup client client, err := mcppulsar.GetPulsarClient() if err != nil { @@ -174,18 +131,11 @@ func handleClientConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp // Prepare consumer options consumerOpts := pulsar.ConsumerOptions{ - Name: "snmcp-consumer", - ReceiverQueueSize: receiverQueueSize, - ReadCompacted: readCompacted, - SubscriptionName: subscriptionName, + Name: "snmcp-consumer", + SubscriptionName: subscriptionName, } - // Set topic or topics pattern - if regex { - consumerOpts.TopicsPattern = topic - } else { - consumerOpts.Topic = topic - } + consumerOpts.Topic = topic // Set subscription type switch strings.ToLower(subscriptionType) { @@ -221,19 +171,6 @@ func handleClientConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp return mcp.NewToolResultError(fmt.Sprintf("Invalid initial position: %s", initialPosition)), nil } - // Set DLQ policy if specified - if maxRedeliverCount > 0 { - if dlqTopic == "" { - return mcp.NewToolResultError("DLQ topic is required when max-redeliver-count is specified"), nil - } - consumerOpts.DLQ = &pulsar.DLQPolicy{ - //nolint:gosec - MaxDeliveries: uint32(maxRedeliverCount), - DeadLetterTopic: dlqTopic, - RetryLetterTopic: fmt.Sprintf("%s-retry", dlqTopic), - } - } - // Create consumer consumer, err := client.Subscribe(consumerOpts) if err != nil { diff --git a/pkg/mcp/pulsar_client_produce_tools.go b/pkg/mcp/pulsar_client_produce_tools.go index a1d7a0d..16d33d4 100644 --- a/pkg/mcp/pulsar_client_produce_tools.go +++ b/pkg/mcp/pulsar_client_produce_tools.go @@ -21,7 +21,6 @@ import ( "context" "encoding/json" "fmt" - "os" "slices" "strings" "time" @@ -49,9 +48,6 @@ func PulsarClientAddProducerTools(s *server.MCPServer, _ bool, features []string mcp.WithArray("messages", mcp.Description("Messages to send. Specify multiple times for multiple messages. IMPORTANT: Use this parameter to provide message content."), ), - mcp.WithArray("files", - mcp.Description("Files to send as message content. Specify multiple times for multiple files."), - ), mcp.WithNumber("num-produce", mcp.Description("Number of times to send message(s) (default: 1)"), ), @@ -94,16 +90,7 @@ func handleClientProduce(ctx context.Context, request mcp.CallToolRequest) (*mcp } } - files := []string{} - if val, exists := optionalParam[[]interface{}](request.Params.Arguments, "files"); exists && len(val) > 0 { - for _, f := range val { - if strFile, ok := f.(string); ok { - files = append(files, strFile) - } - } - } - - if len(messages) == 0 && len(files) == 0 { + if len(messages) == 0 { return mcp.NewToolResultError("Please supply message content with 'messages' parameter."), nil } @@ -187,7 +174,7 @@ func handleClientProduce(ctx context.Context, request mcp.CallToolRequest) (*mcp defer producer.Close() // Generate message bodies from messages and files - messagePayloads, err := generateMessagePayloads(messages, files) + messagePayloads, err := generateMessagePayloads(messages) if err != nil { return mcp.NewToolResultError(fmt.Sprintf("Failed to generate message payloads: %v", err)), nil } @@ -262,8 +249,8 @@ func handleClientProduce(ctx context.Context, request mcp.CallToolRequest) (*mcp return mcp.NewToolResultText(string(jsonBytes)), nil } -// generateMessagePayloads generates message payloads from message strings and files -func generateMessagePayloads(messages []string, files []string) ([][]byte, error) { +// generateMessagePayloads generates message payloads from message strings +func generateMessagePayloads(messages []string) ([][]byte, error) { var payloads [][]byte // Add message strings @@ -271,14 +258,5 @@ func generateMessagePayloads(messages []string, files []string) ([][]byte, error payloads = append(payloads, []byte(msg)) } - // Add file contents - for _, file := range files { - data, err := os.ReadFile(file) - if err != nil { - return nil, fmt.Errorf("failed to read file %s: %w", file, err) - } - payloads = append(payloads, data) - } - return payloads, nil }