Skip to content

Commit cd9ed64

Browse files
committed
use string as default schema if topic do not have one
1 parent 2f46d42 commit cd9ed64

File tree

2 files changed

+25
-15
lines changed

2 files changed

+25
-15
lines changed

docs/tools/functions_as_tools.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ To enable this functionality, you need to specific the default `--pulsar-instanc
2626

2727
Example:
2828
```bash
29-
snmcp stdio --organization my-org --key-file /path/to/key-file.json --features pulsar-admin,pulsar-client,functions-as-tools --pulsar-instance instance --pulsar-cluster cluster
29+
snmcp sse --organization my-org --key-file /path/to/key-file.json --features pulsar-admin,pulsar-client,functions-as-tools --pulsar-instance instance --pulsar-cluster cluster
3030
```
3131
If `functions-as-tools` is part of a broader feature set like `all` and `streamnative-cloud`, enabling `all` or `streamnative-cloud` would also activate this feature.
3232

pkg/pftools/manager.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ import (
2727
"time"
2828

2929
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
30+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
3031
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
32+
cliutils "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
3133
"github.com/google/go-cmp/cmp"
3234
"github.com/mark3labs/mcp-go/mcp"
3335
"github.com/mark3labs/mcp-go/server"
@@ -41,6 +43,16 @@ const (
4143
CustomRuntimeOptionsEnvMcpToolDescriptionKey = "MCP_TOOL_DESCRIPTION"
4244
)
4345

46+
var DefaultStringSchemaInfo = &SchemaInfo{
47+
Type: "STRING",
48+
Definition: map[string]interface{}{
49+
"type": "string",
50+
},
51+
PulsarSchemaInfo: &cliutils.SchemaInfo{
52+
Type: "STRING",
53+
},
54+
}
55+
4456
// NewPulsarFunctionManager creates a new PulsarFunctionManager
4557
func NewPulsarFunctionManager(mcpServer *server.MCPServer, readOnly bool, options *ManagerOptions) (*PulsarFunctionManager, error) {
4658
// Get Pulsar client and admin client
@@ -295,15 +307,14 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig)
295307
// Get schema for input topic
296308
inputSchema, err := GetSchemaFromTopic(m.v2adminClient, inputTopic)
297309
if err != nil {
298-
log.Printf("Failed to get schema for input topic %s: %v", inputTopic, err)
299310
// Continue with a default schema
300-
inputSchema = &SchemaInfo{
301-
Type: "STRING",
302-
Definition: map[string]interface{}{
303-
"type": "string",
304-
},
311+
inputSchema = DefaultStringSchemaInfo
312+
if restError, ok := err.(rest.Error); ok {
313+
if restError.Code != 404 {
314+
log.Printf("Failed to get schema for input topic %s: %v", inputTopic, err)
315+
schemaFetchSuccess = false
316+
}
305317
}
306-
schemaFetchSuccess = false
307318
}
308319

309320
// Get output topic and schema
@@ -312,15 +323,14 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig)
312323
if outputTopic != "" {
313324
outputSchema, err = GetSchemaFromTopic(m.v2adminClient, outputTopic)
314325
if err != nil {
315-
log.Printf("Failed to get schema for output topic %s: %v", outputTopic, err)
316326
// Continue with a default schema
317-
outputSchema = &SchemaInfo{
318-
Type: "STRING",
319-
Definition: map[string]interface{}{
320-
"type": "string",
321-
},
327+
outputSchema = DefaultStringSchemaInfo
328+
if restError, ok := err.(rest.Error); ok {
329+
if restError.Code != 404 {
330+
log.Printf("Failed to get schema for output topic %s: %v", outputTopic, err)
331+
schemaFetchSuccess = false
332+
}
322333
}
323-
schemaFetchSuccess = false
324334
}
325335
}
326336

0 commit comments

Comments
 (0)