Skip to content

Commit b2a701d

Browse files
authored
Rewrite detached actor test with go (#2722)
1 parent c807790 commit b2a701d

File tree

3 files changed

+180
-0
lines changed

3 files changed

+180
-0
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package e2e
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
. "github.com/onsi/gomega"
8+
corev1 "k8s.io/api/core/v1"
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
11+
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
12+
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
13+
rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1"
14+
. "github.com/ray-project/kuberay/ray-operator/test/support"
15+
)
16+
17+
// redisPassword is the password handed to deployRedis and to the RayCluster's
// GCS fault tolerance options in this test file.
const redisPassword = "5241590000000000"
20+
21+
func TestRayClusterGCSFaultTolerence(t *testing.T) {
22+
test := With(t)
23+
g := NewWithT(t)
24+
25+
// Create a namespace
26+
namespace := test.NewTestNamespace()
27+
testScriptAC := newConfigMap(namespace.Name, files(test, "test_detached_actor_1.py", "test_detached_actor_2.py"))
28+
testScript, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), testScriptAC, TestApplyOptions)
29+
g.Expect(err).NotTo(HaveOccurred())
30+
31+
test.T().Run("Test Detached Actor", func(_ *testing.T) {
32+
checkRedisDBSize := deployRedis(test, namespace.Name, redisPassword)
33+
defer g.Eventually(checkRedisDBSize, time.Second*30, time.Second).Should(BeEquivalentTo("0"))
34+
35+
rayClusterSpecAC := rayv1ac.RayClusterSpec().
36+
WithGcsFaultToleranceOptions(
37+
rayv1ac.GcsFaultToleranceOptions().
38+
WithRedisAddress("redis:6379").
39+
WithRedisPassword(rayv1ac.RedisCredential().WithValue(redisPassword)),
40+
).
41+
WithRayVersion(GetRayVersion()).
42+
WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
43+
WithRayStartParams(map[string]string{
44+
"num-cpus": "0",
45+
}).
46+
WithTemplate(headPodTemplateApplyConfiguration()),
47+
).
48+
WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec().
49+
WithRayStartParams(map[string]string{
50+
"num-cpus": "1",
51+
}).
52+
WithGroupName("small-group").
53+
WithReplicas(1).
54+
WithMinReplicas(1).
55+
WithMaxReplicas(2).
56+
WithTemplate(workerPodTemplateApplyConfiguration()),
57+
)
58+
rayClusterAC := rayv1ac.RayCluster("raycluster-gcsft", namespace.Name).
59+
WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](testScript, "/home/ray/samples")))
60+
61+
rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions)
62+
63+
g.Expect(err).NotTo(HaveOccurred())
64+
test.T().Logf("Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name)
65+
66+
test.T().Logf("Waiting for RayCluster %s/%s to become ready", rayCluster.Namespace, rayCluster.Name)
67+
g.Eventually(RayCluster(test, namespace.Name, rayCluster.Name), TestTimeoutLong).
68+
Should(WithTransform(StatusCondition(rayv1.RayClusterProvisioned), MatchCondition(metav1.ConditionTrue, rayv1.AllPodRunningAndReadyFirstTime)))
69+
70+
headPod, err := GetHeadPod(test, rayCluster)
71+
g.Expect(err).NotTo(HaveOccurred())
72+
73+
test.T().Logf("HeadPod Name: %s", headPod.Name)
74+
75+
rayNamespace := "testing-ray-namespace"
76+
test.T().Logf("Ray namespace: %s", rayNamespace)
77+
78+
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "samples/test_detached_actor_1.py", rayNamespace})
79+
80+
// [Test 1: Kill GCS process to "restart" the head Pod]
81+
// Assertion is implement in python, so no furthur handling needed here, and so are other ExecPodCmd
82+
stdout, stderr := ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"pkill", "gcs_server"})
83+
t.Logf("pkill gcs_server output - stdout: %s, stderr: %s", stdout.String(), stderr.String())
84+
85+
// Restart count should eventually become 1, not creating a new pod
86+
HeadPodRestartCount := func(p *corev1.Pod) int32 { return p.Status.ContainerStatuses[0].RestartCount }
87+
HeadPodContainerReady := func(p *corev1.Pod) bool { return p.Status.ContainerStatuses[0].Ready }
88+
89+
g.Eventually(HeadPod(test, rayCluster), TestTimeoutMedium).
90+
Should(WithTransform(HeadPodRestartCount, Equal(int32(1))))
91+
g.Eventually(HeadPod(test, rayCluster), TestTimeoutMedium).
92+
Should(WithTransform(HeadPodContainerReady, Equal(true)))
93+
94+
// Pod Status should eventually become Running
95+
PodState := func(p *corev1.Pod) string { return string(p.Status.Phase) }
96+
g.Eventually(HeadPod(test, rayCluster)).
97+
Should(WithTransform(PodState, Equal("Running")))
98+
99+
headPod, err = GetHeadPod(test, rayCluster)
100+
g.Expect(err).NotTo(HaveOccurred())
101+
102+
expectedOutput := "3"
103+
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "samples/test_detached_actor_2.py", rayNamespace, expectedOutput})
104+
105+
// Test 2: Delete the head Pod
106+
err = test.Client().Core().CoreV1().Pods(namespace.Name).Delete(test.Ctx(), headPod.Name, metav1.DeleteOptions{})
107+
g.Expect(err).NotTo(HaveOccurred())
108+
109+
testPodNameChanged := func(p *corev1.Pod) bool { return p.Name != headPod.Name }
110+
g.Eventually(HeadPod(test, rayCluster), TestTimeoutMedium).
111+
Should(WithTransform(testPodNameChanged, Equal(true)))
112+
113+
g.Eventually(HeadPod(test, rayCluster), TestTimeoutMedium).
114+
Should(WithTransform(PodState, Equal("Running")))
115+
116+
headPod, _ = GetHeadPod(test, rayCluster)
117+
expectedOutput = "4"
118+
119+
ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "samples/test_detached_actor_2.py", rayNamespace, expectedOutput})
120+
121+
err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Delete(test.Ctx(), rayCluster.Name, metav1.DeleteOptions{})
122+
g.Expect(err).NotTo(HaveOccurred())
123+
})
124+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import ray
2+
import sys
3+
4+
# Initialise Ray in the Ray namespace given as the first command-line argument.
ray.init(namespace=sys.argv[1])
5+
6+
@ray.remote
class TestCounter:
    """Ray actor holding an integer counter that starts at zero."""

    def __init__(self):
        # The counter state lives on the actor instance.
        self.value = 0

    def increment(self):
        """Add one to the counter and return the updated value."""
        updated = self.value + 1
        self.value = updated
        return updated
14+
15+
# Create the named, detached actor and increment its counter twice.
actor_options = {"name": "testCounter", "lifetime": "detached", "max_restarts": -1}
tc = TestCounter.options(**actor_options).remote()
val1 = ray.get(tc.increment.remote())
val2 = ray.get(tc.increment.remote())
print(f"val1: {val1}, val2: {val2}")

# A freshly created counter must report 1 and then 2.
assert val1 == 1
assert val2 == 2
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import ray
2+
import time
3+
import sys
4+
5+
def retry_with_timeout(func, timeout=90, interval=1):
    """Call ``func`` repeatedly until it succeeds or ``timeout`` seconds elapse.

    Args:
        func: Zero-argument callable to invoke.
        timeout: Number of seconds during which new attempts may be started.
        interval: Seconds to wait after a failed attempt before the next one.

    Returns:
        Whatever ``func`` returns on its first successful call.

    Raises:
        Exception: The exception raised by the last failed attempt, once the
            timeout window has closed.
        TimeoutError: If the window closed before any attempt was made
            (for example with a negative ``timeout``).
    """
    err = None
    # Monotonic clock: elapsed time is unaffected by wall-clock adjustments.
    start = time.monotonic()
    i = 0
    while time.monotonic() - start <= timeout:
        print(f"retry iter: {i}", flush=True)
        i += 1
        try:
            return func()
        except Exception as e:
            # Only ordinary errors are retried; KeyboardInterrupt and
            # SystemExit propagate immediately.
            err = e
        # Back off only after a failure; a successful call returns at once.
        time.sleep(interval)
    if err is None:
        raise TimeoutError(f"no attempt was made within {timeout} seconds")
    raise err
19+
20+
def get_detached_actor():
    """Return the handle that ray.get_actor yields for the actor named "testCounter"."""
    handle = ray.get_actor("testCounter")
    return handle
22+
23+
# Step 1: connect to the Ray cluster at ray://127.0.0.1:10001, retrying for up to 180 seconds.
print("Try to connect to Ray cluster.", flush=True)


def _connect():
    # sys.argv[1] is read on every attempt, inside the retried call.
    return ray.init(address='ray://127.0.0.1:10001', namespace=sys.argv[1])


retry_with_timeout(_connect, timeout=180)

# Step 2: look up the TestCounter actor, retrying until it can be found.
print("Get TestCounter actor.", flush=True)
tc = retry_with_timeout(get_detached_actor)

# Step 3: call increment on the actor and wait for its result.
print("Try to call remote function \'increment\'.", flush=True)


def _increment():
    return ray.get(tc.increment.remote())


val = retry_with_timeout(_increment)
print(f"val: {val}", flush=True)

# The expected counter value is supplied as the second command-line argument.
assert val == int(sys.argv[2])

0 commit comments

Comments
 (0)