Skip to content

Commit 8716d3c

Browse files
Merge pull request #10 from redpanda-data/proto-updates
proto: sync protos with connect repo
2 parents 319d9d7 + b96f2d1 commit 8716d3c

File tree

16 files changed

+850
-514
lines changed

16 files changed

+850
-514
lines changed

Taskfile.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,4 @@ tasks:
8181
--grpc_python_out={{.OUT_DIR}} \
8282
--mypy_grpc_out={{.OUT_DIR}} \
8383
-I proto \
84-
proto/redpanda/runtime/proto/runtime.proto
84+
proto/redpanda/runtime/v1alpha1/*.proto

proto/redpanda/runtime/proto/runtime.proto renamed to proto/redpanda/runtime/v1alpha1/agent.proto

Lines changed: 5 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,59 +16,18 @@ syntax = "proto3";
1616

1717
package redpanda.runtime.v1alpha1;
1818

19-
option go_package = "internal/agent/runtimepb";
19+
option go_package = "github.com/redpanda-data/connect/v4/internal/agent/runtimepb";
2020

2121
import "google/protobuf/timestamp.proto";
22-
23-
// `NullValue` is a representation of a null value.
24-
enum NullValue { NULL_VALUE = 0; }
25-
26-
// `StructValue` represents a struct value which can be used to represent a
27-
// structured data value.
28-
message StructValue {
29-
map<string, Value> fields = 1;
30-
}
31-
32-
// `ListValue` represents a list value which can be used to represent a list of
33-
// values.
34-
message ListValue {
35-
repeated Value values = 1;
36-
}
37-
38-
// `Value` represents a dynamically typed value which can be used to represent
39-
// a value passed to an agent.
40-
message Value {
41-
oneof kind {
42-
NullValue null_value = 1;
43-
string string_value = 2;
44-
int64 integer_value = 3;
45-
double double_value = 4;
46-
bool bool_value = 5;
47-
google.protobuf.Timestamp timestamp_value = 6;
48-
bytes bytes_value = 7;
49-
StructValue struct_value = 8;
50-
ListValue list_value = 9;
51-
}
52-
}
53-
54-
// Message represents a piece of structured data that flows through the runtime.
55-
message Message {
56-
oneof payload {
57-
bytes serialized = 1;
58-
Value structured = 2;
59-
}
60-
StructValue metadata = 3;
61-
}
22+
import "redpanda/runtime/v1alpha1/message.proto";
6223

6324
message TraceContext {
6425
string trace_id = 1;
6526
string span_id = 2;
6627
string trace_flags = 4;
6728
}
6829

69-
message Trace {
70-
repeated Span spans = 1;
71-
}
30+
message Trace { repeated Span spans = 1; }
7231

7332
message Span {
7433
string span_id = 1;
@@ -93,7 +52,7 @@ message InvokeAgentResponse {
9352
Trace trace = 2;
9453
}
9554

96-
// `Runtime` is the service that provides the ability to invoke an agent.
97-
service Runtime {
55+
// `AgentRuntime` is the service that provides the ability to invoke an agent.
56+
service AgentRuntime {
9857
rpc InvokeAgent(InvokeAgentRequest) returns (InvokeAgentResponse);
9958
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright 2025 Redpanda Data, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
syntax = "proto3";
16+
17+
package redpanda.runtime.v1alpha1;
18+
19+
option go_package = "github.com/redpanda-data/connect/v4/internal/rpcplugin/runtimepb";
20+
21+
import "google/protobuf/timestamp.proto";
22+
import "google/protobuf/duration.proto";
23+
24+
// `NullValue` is a representation of a null value.
25+
enum NullValue {
26+
NULL_VALUE = 0;
27+
}
28+
29+
// `StructValue` represents a struct value which can be used to represent a
30+
// structured data value.
31+
message StructValue { map<string, Value> fields = 1; }
32+
33+
// `ListValue` represents a list value which can be used to represent a list of
34+
// values.
35+
message ListValue { repeated Value values = 1; }
36+
37+
// `Value` represents a dynamically typed value which can be used to represent
38+
// a value within a Redpanda Connect pipeline.
39+
message Value {
40+
oneof kind {
41+
NullValue null_value = 1;
42+
string string_value = 2;
43+
int64 integer_value = 3;
44+
double double_value = 4;
45+
bool bool_value = 5;
46+
google.protobuf.Timestamp timestamp_value = 6;
47+
bytes bytes_value = 7;
48+
StructValue struct_value = 8;
49+
ListValue list_value = 9;
50+
}
51+
}
52+
53+
// An error in the context of a data pipeline.
54+
message Error {
55+
// The error message. If non empty, then the error to be "valid" and
56+
// if empty the error is ignored as if a success (due to proto3 empty
57+
// semantics).
58+
string message = 1;
59+
// NotConnected is returned by inputs and outputs when their Read or
60+
// Write methods are called and the connection that they maintain is lost.
61+
// This error prompts the upstream component to call Connect until the
62+
// connection is re-established.
63+
message NotConnected {}
64+
// EndOfInput is returned by inputs that have exhausted their source of
65+
// data to the point where subsequent Read calls will be ineffective. This
66+
// error prompts the upstream component to gracefully terminate the
67+
// pipeline.
68+
message EndOfInput {}
69+
// Additional error details for specific Redpanda Connect behavior.
70+
// If one of these fields is set, then message must be non-empty.
71+
oneof detail {
72+
// BackOff is an error that plugins can optionally wrap another error with
73+
// which instructs upstream components to wait for a specified period of
74+
// time before retrying the errored call.
75+
//
76+
// Only suppported by Connect methods in the Input and Output services.
77+
google.protobuf.Duration backoff = 2;
78+
NotConnected not_connected = 3;
79+
EndOfInput end_of_input = 4;
80+
}
81+
}
82+
83+
// Message represents a piece of data or an event that flows through the
84+
// runtime.
85+
message Message {
86+
oneof payload {
87+
bytes bytes = 1;
88+
Value structured = 2;
89+
}
90+
StructValue metadata = 3;
91+
Error error = 4;
92+
}
93+
94+
message MessageBatch { repeated Message messages = 1; }

pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ dependencies = [
1111
"aiohttp>=3.11.16",
1212
"grpcio>=1.71.0",
1313
"grpcio-health-checking>=1.71.0",
14-
"litellm>=1.63.14",
14+
"litellm>=1.70.0",
1515
"mcp>=1.5.0",
1616
"opentelemetry-api>=1.32.1",
1717
"opentelemetry-sdk>=1.32.1",
@@ -45,7 +45,7 @@ redpanda = { workspace = true }
4545
[tool.ruff]
4646
line-length = 100
4747
target-version = "py39"
48-
exclude = ["proto"]
48+
exclude = ["v1alpha1"]
4949

5050
[tool.ruff.lint]
5151
select = [
@@ -74,7 +74,7 @@ packages = ["src/redpanda"]
7474

7575
[tool.pyright]
7676
exclude = [
77-
"src/redpanda/runtime/proto/**",
77+
"src/redpanda/runtime/v1alpha1/**",
7878
"**/__pycache__",
7979
"**/.*",
8080
]

src/redpanda/runtime/_grpc.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,16 @@
2727
from pydantic import BaseModel
2828

2929
from redpanda.agents import Agent
30-
from redpanda.runtime.proto import runtime_pb2 as pb, runtime_pb2_grpc as grpcpb
30+
from redpanda.runtime.v1alpha1 import (
31+
agent_pb2 as pb,
32+
agent_pb2_grpc as grpcpb,
33+
message_pb2 as msg_pb,
34+
)
3135

3236
from ._otel import convert_spans, current_spans_context_var
3337

3438

35-
def _serialize_payload(payload: pb.Value) -> str:
39+
def _serialize_payload(payload: msg_pb.Value) -> str:
3640
kind = payload.WhichOneof("kind")
3741
if kind == "bool_value":
3842
return "true" if payload.bool_value else "false"
@@ -67,7 +71,7 @@ def _serialize_payload(payload: pb.Value) -> str:
6771
raise ValueError(f"Unknown payload kind: {kind}")
6872

6973

70-
class RuntimeServer(grpcpb.RuntimeServicer):
74+
class RuntimeServer(grpcpb.AgentRuntimeServicer):
7175
agent: Agent
7276
tracer: trace.Tracer
7377

@@ -97,16 +101,16 @@ async def InvokeAgent(
97101
if request.message.WhichOneof("payload") == "structured":
98102
payload = _serialize_payload(request.message.structured)
99103
else:
100-
payload = request.message.serialized.decode("utf-8")
104+
payload = request.message.bytes.decode("utf-8")
101105
with self.tracer.start_as_current_span("agent_invoke", context=trace_ctx):
102106
output = await self.agent.run(input=payload)
103107
if isinstance(output, BaseModel):
104108
output = output.model_dump_json()
105109
elif not isinstance(output, str):
106110
output = json.dumps(output)
107111
return pb.InvokeAgentResponse(
108-
message=pb.Message(
109-
serialized=output.encode("utf-8"),
112+
message=msg_pb.Message(
113+
bytes=output.encode("utf-8"),
110114
metadata=request.message.metadata,
111115
),
112116
trace=pb.Trace(spans=convert_spans(spans)) if trace_ctx else None,
@@ -126,7 +130,7 @@ async def serve_main(runtime_server: RuntimeServer):
126130
health_pb2.HealthCheckResponse.ServingStatus.Value("SERVING"),
127131
)
128132
server = grpc.aio.server()
129-
grpcpb.add_RuntimeServicer_to_server(
133+
grpcpb.add_AgentRuntimeServicer_to_server(
130134
runtime_server,
131135
server,
132136
)

src/redpanda/runtime/_otel.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from opentelemetry.sdk.trace import export as traceexport
2424
from opentelemetry.util import types as oteltypes
2525

26-
from redpanda.runtime.proto import runtime_pb2 as pb
26+
from redpanda.runtime.v1alpha1 import agent_pb2 as pb, message_pb2 as msg_pb
2727

2828

2929
def _proto_timestamp_from_time_ns(time_ns: int | None) -> PbTimestamp:
@@ -33,12 +33,12 @@ def _proto_timestamp_from_time_ns(time_ns: int | None) -> PbTimestamp:
3333
return ts
3434

3535

36-
def _convert_span_attributes(attrs: oteltypes.Attributes) -> dict[str, pb.Value]:
36+
def _convert_span_attributes(attrs: oteltypes.Attributes) -> dict[str, msg_pb.Value]:
3737
if attrs is None:
3838
return {}
39-
pb_attrs: dict[str, pb.Value] = {}
39+
pb_attrs: dict[str, msg_pb.Value] = {}
4040
for k, v in attrs.items():
41-
pb_v = pb.Value()
41+
pb_v = msg_pb.Value()
4242
if isinstance(v, str):
4343
pb_v.string_value = v
4444
elif isinstance(v, bool):

src/redpanda/runtime/proto/runtime_pb2.py

Lines changed: 0 additions & 66 deletions
This file was deleted.

0 commit comments

Comments
 (0)