163 changes: 161 additions & 2 deletions historyserver/test/e2e/historyserver_test.go
@@ -19,8 +19,9 @@ import (
)

const (
-	LiveSessionName = "live"
-	EndpointLogFile = "/api/v0/logs/file"
+	LiveSessionName       = "live"
+	EndpointLogFile       = "/api/v0/logs/file"
+	EndpointLogicalActors = "/logical/actors"
)

func TestHistoryServer(t *testing.T) {
@@ -43,6 +44,10 @@ func TestHistoryServer(t *testing.T) {
name: "/v0/logs/file endpoint (dead cluster)",
testFunc: testLogFileEndpointDeadCluster,
},
{
name: "/logical/actors endpoint (dead cluster)",
testFunc: testLogicalActorsEndpointDeadCluster,
},
}

for _, tt := range tests {
@@ -282,3 +287,157 @@ func testLogFileEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) {
DeleteS3Bucket(test, g, s3Client)
LogWithTimestamp(test.T(), "Dead cluster log file endpoint tests completed")
}

// testLogicalActorsEndpointDeadCluster verifies that the history server can return actors from the
// in-memory ClusterActorMap after a cluster is deleted.
//
// Data flow explanation:
// The history server does not fetch actors directly from S3. Instead:
// 1. Collector pushes events to S3 on cluster deletion
// 2. Storage Reader reads event files from S3
// 3. Event Handler processes events and populates ClusterActorMap
// 4. The /logical/actors endpoint returns actors from the in-memory ClusterActorMap
//
// The test case follows these steps:
// 1. Prepare test environment by applying a Ray cluster
// 2. Submit a Ray job to the existing cluster (generates actor events)
// 3. Delete RayCluster to trigger log upload to S3 (and event processing)
// 4. Apply History Server and get its URL
// 5. Verify that the history server returns actors via /logical/actors endpoint
// 6. Verify that the history server returns a single actor via /logical/actors/{actor_id} endpoint
Copilot AI Feb 14, 2026

The test documentation lists 7 steps but misses the third sub-test in the step list. Step 6 should be expanded to include all three actor endpoint tests: (a) fetching all actors, (b) fetching a single actor by ID, and (c) handling non-existent actor queries.

Suggested change
// 6. Verify that the history server returns a single actor via /logical/actors/{actor_id} endpoint
// 6. Verify that the history server returns actors via /logical/actors endpoint, returns a single actor via /logical/actors/{actor_id} endpoint, and handles non-existent actor queries appropriately
// 7. Delete S3 bucket to ensure test isolation
func testLogicalActorsEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) {
rayCluster := PrepareTestEnv(test, g, namespace, s3Client)
ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)

// Delete RayCluster to trigger log upload to S3
err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Delete(test.Ctx(), rayCluster.Name, metav1.DeleteOptions{})
g.Expect(err).NotTo(HaveOccurred())
LogWithTimestamp(test.T(), "Deleted RayCluster %s/%s", namespace.Name, rayCluster.Name)

// Wait for cluster to be fully deleted (ensures logs are uploaded to S3 and events are processed)
g.Eventually(func() error {
_, err := GetRayCluster(test, namespace.Name, rayCluster.Name)
return err
}, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
Inlined deletion duplicates existing DeleteRayClusterAndWait helper

Low Severity

The new testLogicalActorsEndpointDeadCluster function manually inlines the RayCluster deletion and wait logic (delete, expect no error, log, Eventually wait for IsNotFound), which is exactly what the existing DeleteRayClusterAndWait helper in historyserver.go already does. The adjacent testLogFileEndpointDeadCluster test uses DeleteRayClusterAndWait for the same purpose, making this inconsistency more noticeable. Duplicating this logic increases maintenance burden and risks divergence if the deletion flow is updated.
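For context, a minimal sketch of what the DeleteRayClusterAndWait helper presumably looks like, reconstructed from the inlined logic flagged above. The parameter types (notably *rayv1.RayCluster) are assumptions; the actual implementation in historyserver.go is authoritative and may differ:

// Hypothetical reconstruction of DeleteRayClusterAndWait, inferred from
// the inlined test logic; the real helper in historyserver.go may differ.
func DeleteRayClusterAndWait(test Test, g *WithT, namespace *corev1.Namespace, rayCluster *rayv1.RayCluster) {
	// Delete the RayCluster, which triggers the log upload to S3.
	err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Delete(test.Ctx(), rayCluster.Name, metav1.DeleteOptions{})
	g.Expect(err).NotTo(HaveOccurred())
	LogWithTimestamp(test.T(), "Deleted RayCluster %s/%s", namespace.Name, rayCluster.Name)

	// Wait until the cluster object is fully gone, which ensures logs are
	// uploaded to S3 and events are processed before the caller proceeds.
	g.Eventually(func() error {
		_, err := GetRayCluster(test, namespace.Name, rayCluster.Name)
		return err
	}, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
}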

Comment on lines +218 to +228
Copilot AI Feb 14, 2026

The cluster deletion logic is duplicated here instead of using the existing DeleteRayClusterAndWait helper function. This is inconsistent with testLogFileEndpointDeadCluster (line 171) and testDeadClusterTasks which use the helper. Using the helper function improves code maintainability and ensures consistent deletion behavior across tests.

Suggested change
// Delete RayCluster to trigger log upload to S3
err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Delete(test.Ctx(), rayCluster.Name, metav1.DeleteOptions{})
g.Expect(err).NotTo(HaveOccurred())
LogWithTimestamp(test.T(), "Deleted RayCluster %s/%s", namespace.Name, rayCluster.Name)
// Wait for cluster to be fully deleted (ensures logs are uploaded to S3 and events are processed)
g.Eventually(func() error {
_, err := GetRayCluster(test, namespace.Name, rayCluster.Name)
return err
}, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
// Delete RayCluster to trigger log upload to S3 and wait for full deletion
DeleteRayClusterAndWait(test, g, namespace, rayCluster)

ApplyHistoryServer(test, g, namespace)
historyServerURL := GetHistoryServerURL(test, g, namespace)

clusterInfo := getClusterFromList(test, g, historyServerURL, rayCluster.Name, namespace.Name)
g.Expect(clusterInfo.SessionName).NotTo(Equal(LiveSessionName))

client := CreateHTTPClientWithCookieJar(g)
setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)

test.T().Run("should return actors from history server", func(t *testing.T) {
g := NewWithT(t)
g.Eventually(func(gg Gomega) {
resp, err := client.Get(historyServerURL + EndpointLogicalActors)
gg.Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
gg.Expect(resp.StatusCode).To(Equal(http.StatusOK))

body, err := io.ReadAll(resp.Body)
gg.Expect(err).NotTo(HaveOccurred())

var result map[string]any
err = json.Unmarshal(body, &result)
gg.Expect(err).NotTo(HaveOccurred())
gg.Expect(result["result"]).To(Equal(true))
gg.Expect(result["msg"]).To(Equal("All actors fetched."))

// Verify data.actors exists and is a map
data, ok := result["data"].(map[string]any)
gg.Expect(ok).To(BeTrue())
actors, ok := data["actors"].(map[string]any)
gg.Expect(ok).To(BeTrue())
gg.Expect(len(actors)).To(BeNumerically(">", 0), "should have at least one actor")

// Verify actor schema matches formatActorForResponse format (for the first actor)
// Required fields from router.go:formatActorForResponse
for _, actorData := range actors {
actor, ok := actorData.(map[string]any)
gg.Expect(ok).To(BeTrue(), "actor should be a map")
gg.Expect(actor["actor_id"]).NotTo(BeNil(), "actor should have actor_id")
gg.Expect(actor["job_id"]).NotTo(BeNil(), "actor should have job_id")
gg.Expect(actor["state"]).NotTo(BeNil(), "actor should have state")
gg.Expect(actor["address"]).NotTo(BeNil(), "actor should have address")
address, ok := actor["address"].(map[string]any)
gg.Expect(ok).To(BeTrue(), "address should be a map")
gg.Expect(address["node_id"]).NotTo(BeNil(), "address should have node_id")
gg.Expect(address["ip_address"]).NotTo(BeNil(), "address should have ip_address")
break // Only verify the first actor
}

LogWithTimestamp(test.T(), "Found %d actors from history server", len(actors))
}, TestTimeoutShort).Should(Succeed())
})

test.T().Run("should return single actor from history server", func(t *testing.T) {
g := NewWithT(t)

actorID := GetOneOfActorID(g, client, historyServerURL)

// Now test the single actor endpoint
g.Eventually(func(gg Gomega) {
singleActorURL := fmt.Sprintf("%s/logical/actors/%s", historyServerURL, actorID)
resp, err := client.Get(singleActorURL)
gg.Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
gg.Expect(resp.StatusCode).To(Equal(http.StatusOK))

body, err := io.ReadAll(resp.Body)
gg.Expect(err).NotTo(HaveOccurred())

var result map[string]any
err = json.Unmarshal(body, &result)
gg.Expect(err).NotTo(HaveOccurred())
gg.Expect(result["result"]).To(Equal(true))
gg.Expect(result["msg"]).To(Equal("Actor fetched."))

// Verify data.detail exists and contains actor_id
data, ok := result["data"].(map[string]any)
gg.Expect(ok).To(BeTrue())
detail, ok := data["detail"].(map[string]any)
gg.Expect(ok).To(BeTrue())

// Verify actor schema matches formatActorForResponse format
// Required fields from router.go:formatActorForResponse
gg.Expect(detail["actor_id"]).To(Equal(actorID))
gg.Expect(detail["job_id"]).NotTo(BeNil())
gg.Expect(detail["state"]).NotTo(BeNil())
gg.Expect(detail["address"]).NotTo(BeNil())
address, ok := detail["address"].(map[string]any)
gg.Expect(ok).To(BeTrue(), "address should be a map")
gg.Expect(address["node_id"]).NotTo(BeNil())
gg.Expect(address["ip_address"]).NotTo(BeNil())

LogWithTimestamp(test.T(), "Successfully fetched actor %s from history server", actorID)
}, TestTimeoutShort).Should(Succeed())
})

test.T().Run("should handle non-existent actor", func(t *testing.T) {
g := NewWithT(t)
g.Eventually(func(gg Gomega) {
fakeActorID := "ffffffffffffffffffffffffffffffffffffffff"
singleActorURL := fmt.Sprintf("%s/logical/actors/%s", historyServerURL, fakeActorID)
resp, err := client.Get(singleActorURL)
gg.Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
gg.Expect(resp.StatusCode).To(Equal(http.StatusOK))

body, err := io.ReadAll(resp.Body)
gg.Expect(err).NotTo(HaveOccurred())

var result map[string]any
err = json.Unmarshal(body, &result)
gg.Expect(err).NotTo(HaveOccurred())
gg.Expect(result["result"]).To(Equal(false))
gg.Expect(result["msg"]).To(Equal("Actor not found."))
}, TestTimeoutShort).Should(Succeed())
})

DeleteS3Bucket(test, g, s3Client)
LogWithTimestamp(test.T(), "Dead cluster logical actors endpoint tests completed")
}
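To make the data flow documented in the test's doc comment concrete, here is a toy sketch of steps 3 and 4: an event handler populating an in-memory actor map that the /logical/actors endpoint then serves from memory rather than S3. Every name below (clusterActorMap, handleActorEvent, allActors) is illustrative; the real ClusterActorMap, event handler, and router in the history server certainly differ.

package sketch

import "sync"

// Toy stand-in for the ClusterActorMap described in the test doc comment;
// shapes and names are hypothetical, not the history server's real code.
type clusterActorMap struct {
	mu     sync.RWMutex
	actors map[string]map[string]any // actor_id -> formatted actor fields
}

// Step 3: the event handler records actors while processing event files
// that the storage reader pulled from S3.
func (m *clusterActorMap) handleActorEvent(actorID string, fields map[string]any) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.actors == nil {
		m.actors = make(map[string]map[string]any)
	}
	m.actors[actorID] = fields
}

// Step 4: the /logical/actors endpoint answers from memory, not from S3.
func (m *clusterActorMap) allActors() map[string]map[string]any {
	m.mu.RLock()
	defer m.mu.RUnlock()
	out := make(map[string]map[string]any, len(m.actors))
	for id, a := range m.actors {
		out[id] = a
	}
	return out
}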
33 changes: 33 additions & 0 deletions historyserver/test/support/historyserver.go
@@ -158,3 +158,36 @@ func GetOneOfNodeID(g *WithT, client *http.Client, historyServerURL string) string {
nodeInfo := summary[0].(map[string]any)
return nodeInfo["raylet"].(map[string]any)["nodeId"].(string)
}

// GetOneOfActorID retrieves an actor ID from the /logical/actors endpoint.
// The history server returns actors from the in-memory ClusterActorMap, which is populated
// by the Event Handler processing events from S3.
func GetOneOfActorID(g *WithT, client *http.Client, historyServerURL string) string {
resp, err := client.Get(historyServerURL + "/logical/actors")
g.Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
g.Expect(resp.StatusCode).To(Equal(http.StatusOK))

body, err := io.ReadAll(resp.Body)
g.Expect(err).NotTo(HaveOccurred())

var result map[string]any
err = json.Unmarshal(body, &result)
g.Expect(err).NotTo(HaveOccurred())

// Response format: {"result": true, "msg": "...", "data": {"actors": {actor_id: {...}, ...}}}
data, ok := result["data"].(map[string]any)
g.Expect(ok).To(BeTrue(), "response should have 'data' field")

actors, ok := data["actors"].(map[string]any)
g.Expect(ok).To(BeTrue(), "data should have 'actors' field")
g.Expect(len(actors)).To(BeNumerically(">", 0), "should have at least one actor")

// Get the first actor ID from the map
for actorID := range actors {
return actorID
}

// This should never happen due to the length check above
return ""
}
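As a closing note on the response shape GetOneOfActorID parses, the same payload could be decoded with typed structs instead of map[string]any assertions. A hedged sketch, where the type names are invented and only the JSON keys come from the assertions in the tests above:

// Illustrative typed decoding of the /logical/actors payload; the type
// names are hypothetical, only the JSON keys come from the tests above.
type logicalActorsResponse struct {
	Result bool   `json:"result"`
	Msg    string `json:"msg"`
	Data   struct {
		// Keyed by actor ID; each actor carries actor_id, job_id, state,
		// and an address object with node_id and ip_address.
		Actors map[string]map[string]any `json:"actors"`
	} `json:"data"`
}

// Example use in place of the map-based parsing:
//   var r logicalActorsResponse
//   err := json.Unmarshal(body, &r)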