Skip to content

Commit a554298

Browse files
feat(serverless-spark): add list/get sessions commands (googleapis#2576)
## Description Adds commands for Serverless Spark sessions analogous to existing list/get commands for batches. Unlike batches, we don't plan to add create session commands, as these are generally only created via the Jupyter protocol. Agents will need to have other ways to create sessions, for example by interacting directly with an IDE like Jupyter Lab or VS Code, but those won't be part of MCP Toolbox. ## PR Checklist > Thank you for opening a Pull Request! Before submitting your PR, there are a > few things you can do to make sure it goes smoothly: - [ ] Make sure you reviewed [CONTRIBUTING.md](https://github.com/googleapis/genai-toolbox/blob/main/CONTRIBUTING.md) - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/genai-toolbox/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) - [ ] Make sure to add `!` if this involves a breaking change 🛠️ Part of googleapis#2405 Co-authored-by: Wenxin Du <117315983+duwenxin99@users.noreply.github.com>
1 parent a8edb79 commit a554298

File tree

16 files changed

+1176
-13
lines changed

16 files changed

+1176
-13
lines changed

cmd/internal/imports.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,9 @@ import (
204204
_ "github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/serverlesssparkcreatepysparkbatch"
205205
_ "github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/serverlesssparkcreatesparkbatch"
206206
_ "github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/serverlesssparkgetbatch"
207+
_ "github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/serverlesssparkgetsession"
207208
_ "github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/serverlesssparklistbatches"
209+
_ "github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/serverlesssparklistsessions"
208210
_ "github.com/googleapis/genai-toolbox/internal/tools/singlestore/singlestoreexecutesql"
209211
_ "github.com/googleapis/genai-toolbox/internal/tools/singlestore/singlestoresql"
210212
_ "github.com/googleapis/genai-toolbox/internal/tools/snowflake/snowflakeexecutesql"

cmd/internal/tools_file_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1753,7 +1753,7 @@ func TestPrebuiltTools(t *testing.T) {
17531753
wantToolset: server.ToolsetConfigs{
17541754
"serverless_spark_tools": tools.ToolsetConfig{
17551755
Name: "serverless_spark_tools",
1756-
ToolNames: []string{"list_batches", "get_batch", "cancel_batch", "create_pyspark_batch", "create_spark_batch"},
1756+
ToolNames: []string{"list_batches", "get_batch", "cancel_batch", "create_pyspark_batch", "create_spark_batch", "list_sessions", "get_session"},
17571757
},
17581758
},
17591759
},

docs/en/reference/prebuilt-tools.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,8 @@ See [Usage Examples](../reference/cli.md#examples).
746746
* `cancel_batch`: Cancels a Spark batch.
747747
* `create_pyspark_batch`: Creates a PySpark batch.
748748
* `create_spark_batch`: Creates a Spark batch.
749+
* `list_sessions`: Lists Spark sessions.
750+
* `get_session`: Gets a Spark session.
749751

750752
## SingleStore
751753

docs/en/resources/sources/serverless-spark.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ Apache Spark.
2525
Create a Serverless Spark PySpark batch operation.
2626
- [`serverless-spark-create-spark-batch`](../tools/serverless-spark/serverless-spark-create-spark-batch.md)
2727
Create a Serverless Spark Java batch operation.
28+
- [`serverless-spark-list-sessions`](../tools/serverless-spark/serverless-spark-list-sessions.md)
29+
List and filter Serverless Spark sessions.
30+
- [`serverless-spark-get-session`](../tools/serverless-spark/serverless-spark-get-session.md)
31+
Get a Serverless Spark session.
2832

2933
## Requirements
3034

docs/en/resources/tools/serverless-spark/_index.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,5 @@ description: >
1111
- [serverless-spark-cancel-batch](./serverless-spark-cancel-batch.md)
1212
- [serverless-spark-create-pyspark-batch](./serverless-spark-create-pyspark-batch.md)
1313
- [serverless-spark-create-spark-batch](./serverless-spark-create-spark-batch.md)
14+
- [serverless-spark-get-session](./serverless-spark-get-session.md)
15+
- [serverless-spark-list-sessions](./serverless-spark-list-sessions.md)
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
---
2+
title: "serverless-spark-get-session"
3+
type: docs
4+
weight: 1
5+
description: >
6+
A "serverless-spark-get-session" tool retrieves a specific Spark session from the source.
7+
aliases:
8+
- /resources/tools/serverless-spark-get-session
9+
---
10+
11+
## About
12+
13+
A `serverless-spark-get-session` tool retrieves a specific Spark session from a
14+
Google Cloud Serverless for Apache Spark source. It's compatible with the
15+
following sources:
16+
17+
- [serverless-spark](../../sources/serverless-spark.md)
18+
19+
`serverless-spark-get-session` accepts the following parameters:
20+
21+
- **`name`** (required): The short name of the session, e.g. for `projects/my-project/locations/us-central1/sessions/my-session`, pass `my-session`.
22+
23+
The tool gets the `project` and `location` from the source configuration.
24+
25+
## Example
26+
27+
```yaml
28+
kind: tools
29+
name: get_spark_session
30+
type: serverless-spark-get-session
31+
source: my-serverless-spark-source
32+
description: Use this tool to get details of a serverless spark session.
33+
```
34+
35+
## Response Format
36+
37+
```json
38+
{
39+
"consoleUrl": "https://console.cloud.google.com/dataproc/interactive/us-central1/my-session/details?project=my-project",
40+
"logsUrl": "https://console.cloud.google.com/logs/viewer?...",
41+
"session": {
42+
"name": "projects/my-project/locations/us-central1/sessions/my-session",
43+
"uuid": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
44+
"state": "ACTIVE",
45+
// ... complete session resource definition
46+
}
47+
}
48+
```
49+
50+
## Reference
51+
52+
| **field** | **type** | **required** | **description** |
53+
| ------------ | :------: | :----------: | -------------------------------------------------- |
54+
| type | string | true | Must be "serverless-spark-get-session". |
55+
| source | string | true | Name of the source the tool should use. |
56+
| description | string | true | Description of the tool that is passed to the LLM. |
57+
| authRequired | string[] | false | List of auth services required to invoke this tool |
58+
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
---
2+
title: "serverless-spark-list-sessions"
3+
type: docs
4+
weight: 1
5+
description: >
6+
A "serverless-spark-list-sessions" tool returns a list of Spark sessions from the source.
7+
aliases:
8+
- /resources/tools/serverless-spark-list-sessions
9+
---
10+
11+
## About
12+
13+
A `serverless-spark-list-sessions` tool returns a list of Spark sessions from a
14+
Google Cloud Serverless for Apache Spark source. It's compatible with the
15+
following sources:
16+
17+
- [serverless-spark](../../sources/serverless-spark.md)
18+
19+
`serverless-spark-list-sessions` accepts the following parameters:
20+
21+
- **`filter`** (optional): A filter for the sessions to return in the
22+
response. A filter is a logical expression constraining the values of various
23+
fields in each session resource. Filters are case sensitive, and may contain
24+
multiple clauses combined with logical operators (AND, OR). Supported fields
25+
are session_id, session_uuid, state, create_time, and labels. Example: `state
26+
= ACTIVE and create_time < "2023-01-01T00:00:00Z"` is a filter for sessions in
27+
an ACTIVE state that were created before 2023-01-01. `state = ACTIVE and
28+
labels.environment=production` is a filter for sessions in an ACTIVE state
29+
that have a production environment label.
30+
- **`pageSize`** (optional): The maximum number of sessions to return in a single
31+
page. Defaults to `20`.
32+
- **`pageToken`** (optional): A page token, received from a previous call, to
33+
retrieve the next page of results.
34+
35+
The tool gets the `project` and `location` from the source configuration.
36+
37+
## Example
38+
39+
```yaml
40+
kind: tools
41+
name: list_spark_sessions
42+
type: serverless-spark-list-sessions
43+
source: my-serverless-spark-source
44+
description: Use this tool to list and filter serverless spark sessions.
45+
```
46+
47+
## Response Format
48+
49+
```json
50+
{
51+
"sessions": [
52+
{
53+
"name": "projects/my-project/locations/us-central1/sessions/session-abc-123",
54+
"uuid": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
55+
"state": "ACTIVE",
56+
"creator": "alice@example.com",
57+
"createTime": "2023-10-27T10:00:00Z",
58+
"consoleUrl": "https://console.cloud.google.com/dataproc/interactive/us-central1/session-abc-123/details?project=my-project",
59+
"logsUrl": "https://console.cloud.google.com/logs/viewer?..."
60+
},
61+
{
62+
"name": "projects/my-project/locations/us-central1/sessions/session-def-456",
63+
"uuid": "b2c3d4e5-f6a7-8901-2345-678901bcdefa",
64+
"state": "TERMINATED",
65+
"creator": "alice@example.com",
66+
"createTime": "2023-10-27T11:30:00Z",
67+
"consoleUrl": "https://console.cloud.google.com/dataproc/interactive/us-central1/session-def-456/details?project=my-project",
68+
"logsUrl": "https://console.cloud.google.com/logs/viewer?..."
69+
}
70+
],
71+
"nextPageToken": "abcd1234"
72+
}
73+
```
74+
75+
## Reference
76+
77+
| **field** | **type** | **required** | **description** |
78+
| ------------ | :------: | :----------: | -------------------------------------------------- |
79+
| type | string | true | Must be "serverless-spark-list-sessions". |
80+
| source | string | true | Name of the source the tool should use. |
81+
| description | string | true | Description of the tool that is passed to the LLM. |
82+
| authRequired | string[] | false | List of auth services required to invoke this tool |

internal/prebuiltconfigs/tools/serverless-spark.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ tools:
3434
create_spark_batch:
3535
kind: serverless-spark-create-spark-batch
3636
source: serverless-spark-source
37+
list_sessions:
38+
kind: serverless-spark-list-sessions
39+
source: serverless-spark-source
40+
get_session:
41+
kind: serverless-spark-get-session
42+
source: serverless-spark-source
3743

3844
toolsets:
3945
serverless_spark_tools:
@@ -42,3 +48,5 @@ toolsets:
4248
- cancel_batch
4349
- create_pyspark_batch
4450
- create_spark_batch
51+
- list_sessions
52+
- get_session

internal/sources/serverlessspark/serverlessspark.go

Lines changed: 133 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package serverlessspark
1717
import (
1818
"context"
1919
"encoding/json"
20+
"errors"
2021
"fmt"
2122
"time"
2223

@@ -77,11 +78,16 @@ func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.So
7778
if err != nil {
7879
return nil, fmt.Errorf("failed to create longrunning client: %w", err)
7980
}
81+
sessionClient, err := dataproc.NewSessionControllerClient(ctx, option.WithEndpoint(endpoint), option.WithUserAgent(ua))
82+
if err != nil {
83+
return nil, fmt.Errorf("failed to create dataproc session client: %w", err)
84+
}
8085

8186
s := &Source{
82-
Config: r,
83-
Client: client,
84-
OpsClient: opsClient,
87+
Config: r,
88+
Client: client,
89+
OpsClient: opsClient,
90+
SessionClient: sessionClient,
8591
}
8692
return s, nil
8793
}
@@ -90,8 +96,9 @@ var _ sources.Source = &Source{}
9096

9197
type Source struct {
9298
Config
93-
Client *dataproc.BatchControllerClient
94-
OpsClient *longrunning.OperationsClient
99+
Client *dataproc.BatchControllerClient
100+
OpsClient *longrunning.OperationsClient
101+
SessionClient *dataproc.SessionControllerClient
95102
}
96103

97104
func (s *Source) SourceType() string {
@@ -114,18 +121,16 @@ func (s *Source) GetBatchControllerClient() *dataproc.BatchControllerClient {
114121
return s.Client
115122
}
116123

124+
func (s *Source) GetSessionControllerClient() *dataproc.SessionControllerClient {
125+
return s.SessionClient
126+
}
127+
117128
func (s *Source) GetOperationsClient(ctx context.Context) (*longrunning.OperationsClient, error) {
118129
return s.OpsClient, nil
119130
}
120131

121132
func (s *Source) Close() error {
122-
if err := s.Client.Close(); err != nil {
123-
return err
124-
}
125-
if err := s.OpsClient.Close(); err != nil {
126-
return err
127-
}
128-
return nil
133+
return errors.Join(s.Client.Close(), s.SessionClient.Close(), s.OpsClient.Close())
129134
}
130135

131136
func (s *Source) CancelOperation(ctx context.Context, operation string) (any, error) {
@@ -292,3 +297,119 @@ func (s *Source) GetBatch(ctx context.Context, name string) (map[string]any, err
292297

293298
return wrappedResult, nil
294299
}
300+
301+
// ListSessionsResponse is the response from the list sessions API.
302+
type ListSessionsResponse struct {
303+
Sessions []Session `json:"sessions"`
304+
NextPageToken string `json:"nextPageToken"`
305+
}
306+
307+
// Session represents a single session job.
308+
type Session struct {
309+
Name string `json:"name"`
310+
UUID string `json:"uuid"`
311+
State string `json:"state"`
312+
Creator string `json:"creator"`
313+
CreateTime string `json:"createTime"`
314+
ConsoleURL string `json:"consoleUrl"`
315+
LogsURL string `json:"logsUrl"`
316+
}
317+
318+
func (s *Source) ListSessions(ctx context.Context, ps *int, pt, filter string) (any, error) {
319+
client := s.GetSessionControllerClient()
320+
parent := fmt.Sprintf("projects/%s/locations/%s", s.GetProject(), s.GetLocation())
321+
req := &dataprocpb.ListSessionsRequest{
322+
Parent: parent,
323+
}
324+
325+
if ps != nil {
326+
req.PageSize = int32(*ps)
327+
}
328+
if pt != "" {
329+
req.PageToken = pt
330+
}
331+
if filter != "" {
332+
req.Filter = filter
333+
}
334+
335+
it := client.ListSessions(ctx, req)
336+
pager := iterator.NewPager(it, int(req.PageSize), req.PageToken)
337+
338+
var sessionPbs []*dataprocpb.Session
339+
nextPageToken, err := pager.NextPage(&sessionPbs)
340+
if err != nil {
341+
return nil, fmt.Errorf("failed to list sessions: %w", err)
342+
}
343+
344+
sessions, err := ToSessions(sessionPbs)
345+
if err != nil {
346+
return nil, err
347+
}
348+
349+
return ListSessionsResponse{Sessions: sessions, NextPageToken: nextPageToken}, nil
350+
}
351+
352+
func (s *Source) GetSession(ctx context.Context, name string) (map[string]any, error) {
353+
client := s.GetSessionControllerClient()
354+
req := &dataprocpb.GetSessionRequest{
355+
Name: fmt.Sprintf("projects/%s/locations/%s/sessions/%s", s.GetProject(), s.GetLocation(), name),
356+
}
357+
358+
sessionPb, err := client.GetSession(ctx, req)
359+
if err != nil {
360+
return nil, fmt.Errorf("failed to get session: %w", err)
361+
}
362+
363+
jsonBytes, err := protojson.Marshal(sessionPb)
364+
if err != nil {
365+
return nil, fmt.Errorf("failed to marshal session to JSON: %w", err)
366+
}
367+
368+
var result map[string]any
369+
if err := json.Unmarshal(jsonBytes, &result); err != nil {
370+
return nil, fmt.Errorf("failed to unmarshal session JSON: %w", err)
371+
}
372+
373+
consoleUrl, err := SessionConsoleURLFromProto(sessionPb)
374+
if err != nil {
375+
return nil, fmt.Errorf("error generating console url: %v", err)
376+
}
377+
logsUrl, err := SessionLogsURLFromProto(sessionPb)
378+
if err != nil {
379+
return nil, fmt.Errorf("error generating logs url: %v", err)
380+
}
381+
382+
wrappedResult := map[string]any{
383+
"consoleUrl": consoleUrl,
384+
"logsUrl": logsUrl,
385+
"session": result,
386+
}
387+
388+
return wrappedResult, nil
389+
}
390+
391+
// ToSessions converts a slice of protobuf Session messages to a slice of Session structs.
392+
func ToSessions(sessionPbs []*dataprocpb.Session) ([]Session, error) {
393+
sessions := make([]Session, 0, len(sessionPbs))
394+
for _, sessionPb := range sessionPbs {
395+
consoleUrl, err := SessionConsoleURLFromProto(sessionPb)
396+
if err != nil {
397+
return nil, fmt.Errorf("error generating console url: %v", err)
398+
}
399+
logsUrl, err := SessionLogsURLFromProto(sessionPb)
400+
if err != nil {
401+
return nil, fmt.Errorf("error generating logs url: %v", err)
402+
}
403+
session := Session{
404+
Name: sessionPb.Name,
405+
UUID: sessionPb.Uuid,
406+
State: sessionPb.State.Enum().String(),
407+
Creator: sessionPb.Creator,
408+
CreateTime: sessionPb.CreateTime.AsTime().Format(time.RFC3339),
409+
ConsoleURL: consoleUrl,
410+
LogsURL: logsUrl,
411+
}
412+
sessions = append(sessions, session)
413+
}
414+
return sessions, nil
415+
}

0 commit comments

Comments
 (0)