Skip to content

Commit 34eb41f

Browse files
ffelipelimaoLuiz Felipe Limaohspedro
authored
Added maestro room resilience (#668)
* Added maestro room resilience * Fixed docker and log message * Fixed code review * fix errAllErrors * add context cancel * Fixed code review * Update err handle cha Co-authored-by: Pedro Soares <pedro.soares@wildlifestudios.com> * changed erro handle --------- Co-authored-by: Luiz Felipe Limao <luizfelipe.limao@MacBook-Pro-de-Luiz.local> Co-authored-by: Pedro Soares <pedro.soares@wildlifestudios.com>
1 parent 3d9f4d8 commit 34eb41f

File tree

5 files changed

+94
-32
lines changed

5 files changed

+94
-32
lines changed

e2e/framework/maestro/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,4 @@ COPY --from=builder /app/maestro .
4747
COPY ./config ./config
4848

4949
ENTRYPOINT ["/app/maestro"]
50-
CMD ["--help"]
50+
CMD ["--help"]

e2e/framework/maestro/dependencies.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func provideDependencies(ctx context.Context, maestroPath string, compose tc.Com
4848
return nil, fmt.Errorf("failed to tear down running containers: %w", err)
4949
}
5050

51-
services := []string{"postgres", "redis", "k3s_agent", "k3s_server"}
51+
services := []string{"postgres", "redis"}
5252
err = compose.Up(context.Background(), tc.RunServices(services...), tc.RemoveOrphans(true), tc.Wait(true))
5353
if err != nil {
5454
return nil, fmt.Errorf("failed to start dependencies: %w", err)

internal/core/operations/rooms/add/executor.go

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,14 @@ package add
2525
import (
2626
"context"
2727
"fmt"
28+
"sync"
29+
"time"
2830

2931
"github.com/topfreegames/maestro/internal/core/logs"
3032

3133
"github.com/topfreegames/maestro/internal/core/ports"
3234

3335
"github.com/topfreegames/maestro/internal/core/entities"
34-
"golang.org/x/sync/errgroup"
3536

3637
"go.uber.org/zap"
3738

@@ -88,24 +89,51 @@ func (ex *Executor) Execute(ctx context.Context, op *operation.Operation, defini
8889
amount = ex.config.AmountLimit
8990
}
9091

91-
errGroup, errContext := errgroup.WithContext(ctx)
92-
executionLogger.Info("start adding rooms", zap.Int32("amount", amount))
92+
var wg sync.WaitGroup
93+
errsCh := make(chan error, amount)
94+
9395
for i := int32(1); i <= amount; i++ {
94-
errGroup.Go(func() error {
95-
err := ex.createRoom(errContext, scheduler, executionLogger)
96-
return err
97-
})
96+
wg.Add(1)
97+
go func() {
98+
defer wg.Done()
99+
100+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
101+
defer cancel()
102+
103+
errsCh <- ex.createRoom(ctx, scheduler, executionLogger)
104+
}()
98105
}
99106

100-
if executionErr := errGroup.Wait(); executionErr != nil {
101-
executionLogger.Error("Error creating rooms", zap.Error(executionErr))
102-
ex.operationManager.AppendOperationEventToExecutionHistory(ctx, op, executionErr.Error())
103-
return executionErr
107+
wg.Wait()
108+
close(errsCh)
109+
110+
var collectedErrors []error
111+
for err := range errsCh {
112+
if err != nil {
113+
collectedErrors = append(collectedErrors, err)
114+
}
104115
}
105116

106-
ex.operationManager.AppendOperationEventToExecutionHistory(ctx, op, fmt.Sprintf("added %d rooms", amount))
107-
executionLogger.Info("finished adding rooms")
108-
return nil
117+
errCount := int32(len(collectedErrors))
118+
successCount := amount - errCount
119+
120+
switch {
121+
case errCount > successCount:
122+
ErrMajorityRooms := fmt.Errorf("more rooms failed than succeeded, errors: %d and successes: %d of amount: %d", errCount, successCount, amount)
123+
124+
executionLogger.Error(ErrMajorityRooms.Error(),
125+
zap.Error(ErrMajorityRooms),
126+
zap.Int32("failedRoomsCount", errCount),
127+
zap.Int32("successRoomsCount", successCount),
128+
zap.Any("allErrors", collectedErrors))
129+
130+
ex.operationManager.AppendOperationEventToExecutionHistory(ctx, op, ErrMajorityRooms.Error())
131+
return ErrMajorityRooms
132+
default:
133+
executionLogger.Sugar().Infof("added rooms successfully with errors: %d and success: %d of amount: %d", errCount, successCount, amount)
134+
ex.operationManager.AppendOperationEventToExecutionHistory(ctx, op, fmt.Sprintf("added %d rooms", amount))
135+
return nil
136+
}
109137
}
110138

111139
func (ex *Executor) Rollback(ctx context.Context, op *operation.Operation, definition operations.Definition, executeErr error) error {

internal/core/operations/rooms/add/executor_test.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,13 @@
2626
package add
2727

2828
import (
29+
"context"
2930
"fmt"
31+
"testing"
3032
"time"
3133

3234
clock_mock "github.com/topfreegames/maestro/internal/core/ports/clock_mock.go"
3335

34-
"context"
35-
"testing"
36-
3736
"github.com/golang/mock/gomock"
3837
"github.com/stretchr/testify/require"
3938
"github.com/topfreegames/maestro/internal/core/entities"
@@ -108,7 +107,7 @@ func TestExecutor_Execute(t *testing.T) {
108107
require.Nil(t, err)
109108
})
110109

111-
t.Run("should fail - some room creation fail, others succeed => returns unexpected error", func(t *testing.T) {
110+
t.Run("should succeed - some room creation fail, others succeed => returns success", func(t *testing.T) {
112111
_, roomsManager, schedulerStorage, operationsManager := testSetup(t, config)
113112

114113
schedulerStorage.EXPECT().GetScheduler(gomock.Any(), op.SchedulerName).Return(&scheduler, nil)
@@ -117,14 +116,15 @@ func TestExecutor_Execute(t *testing.T) {
117116
gameRoomReady.Status = game_room.GameStatusReady
118117

119118
roomsManager.EXPECT().CreateRoom(gomock.Any(), gomock.Any(), false).Return(&gameRoom, &gameRoomInstance, nil).Times(9)
119+
120120
roomsManager.EXPECT().CreateRoom(gomock.Any(), gomock.Any(), false).Return(nil, nil, porterrors.NewErrUnexpected("error"))
121-
operationsManager.EXPECT().AppendOperationEventToExecutionHistory(gomock.Any(), &op, "error while creating room: error")
121+
122+
operationsManager.EXPECT().AppendOperationEventToExecutionHistory(gomock.Any(), &op, fmt.Sprintf("added %d rooms", definition.Amount))
122123

123124
executor := NewExecutor(roomsManager, schedulerStorage, operationsManager, config)
124125
err := executor.Execute(context.Background(), &op, &definition)
125126

126-
require.NotNil(t, err)
127-
require.ErrorContains(t, err, "error while creating room: error")
127+
require.Nil(t, err)
128128
})
129129

130130
t.Run("should fail - no scheduler found => returns error", func(t *testing.T) {
@@ -165,10 +165,29 @@ func TestExecutor_Execute(t *testing.T) {
165165

166166
require.Nil(t, err)
167167
})
168+
169+
t.Run("should fail - majority of room creation fail, others succeed => returns error", func(t *testing.T) {
170+
_, roomsManager, schedulerStorage, operationsManager := testSetup(t, config)
171+
172+
schedulerStorage.EXPECT().GetScheduler(gomock.Any(), op.SchedulerName).Return(&scheduler, nil)
173+
174+
gameRoomReady := gameRoom
175+
gameRoomReady.Status = game_room.GameStatusReady
176+
177+
roomsManager.EXPECT().CreateRoom(gomock.Any(), gomock.Any(), false).Return(&gameRoom, &gameRoomInstance, nil).Times(4)
178+
roomsManager.EXPECT().CreateRoom(gomock.Any(), gomock.Any(), false).Return(nil, nil, porterrors.NewErrUnexpected("error")).Times(6)
179+
180+
operationsManager.EXPECT().AppendOperationEventToExecutionHistory(gomock.Any(), &op, "more rooms failed than succeeded, errors: 6 and successes: 4 of amount: 10")
181+
182+
executor := NewExecutor(roomsManager, schedulerStorage, operationsManager, config)
183+
err := executor.Execute(context.Background(), &op, &definition)
184+
185+
require.NotNil(t, err)
186+
require.ErrorContains(t, err, "more rooms failed than succeeded, errors: 6 and successes: 4 of amount: 10")
187+
})
168188
}
169189

170190
func TestExecutor_Rollback(t *testing.T) {
171-
172191
definition := Definition{Amount: 10}
173192

174193
op := operation.Operation{
@@ -188,7 +207,6 @@ func TestExecutor_Rollback(t *testing.T) {
188207
rollbackErr := NewExecutor(roomsManager, schedulerStorage, operationsManager, config).Rollback(context.Background(), &op, &definition, nil)
189208
require.NoError(t, rollbackErr)
190209
})
191-
192210
}
193211

194212
func testSetup(t *testing.T, cfg Config) (*Executor, *mockports.MockRoomManager, *mockports.MockSchedulerStorage, *mockports.MockOperationManager) {

internal/service/adapters.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,9 @@ func NewRuntimeKubernetes(c config.Config) (ports.Runtime, error) {
170170

171171
// NewOperationStorageRedis instantiates redis as operation storage.
172172
func NewOperationStorageRedis(clock ports.Clock, operationDefinitionProviders map[string]operations.DefinitionConstructor, c config.Config) (ports.OperationStorage, error) {
173-
client, err := createRedisClient(c, c.GetString(operationStorageRedisURLPath))
173+
client, err := createRedisClient(c, c.GetString(operationStorageRedisURLPath), func(opts *redis.Options) {
174+
opts.MaxRetries = 3
175+
})
174176
if err != nil {
175177
return nil, fmt.Errorf("failed to initialize Redis operation storage: %w", err)
176178
}
@@ -187,7 +189,9 @@ func NewOperationStorageRedis(clock ports.Clock, operationDefinitionProviders ma
187189

188190
// NewOperationLeaseStorageRedis instantiates redis as operation lease storage.
189191
func NewOperationLeaseStorageRedis(clock ports.Clock, c config.Config) (ports.OperationLeaseStorage, error) {
190-
client, err := createRedisClient(c, c.GetString(operationLeaseStorageRedisURLPath))
192+
client, err := createRedisClient(c, c.GetString(operationLeaseStorageRedisURLPath), func(opts *redis.Options) {
193+
opts.MaxRetries = 3
194+
})
191195
if err != nil {
192196
return nil, fmt.Errorf("failed to initialize Redis operation lease storage: %w", err)
193197
}
@@ -197,7 +201,9 @@ func NewOperationLeaseStorageRedis(clock ports.Clock, c config.Config) (ports.Op
197201

198202
// NewRoomStorageRedis instantiates redis as room storage.
199203
func NewRoomStorageRedis(c config.Config) (ports.RoomStorage, error) {
200-
client, err := createRedisClient(c, c.GetString(roomStorageRedisURLPath))
204+
client, err := createRedisClient(c, c.GetString(roomStorageRedisURLPath), func(opts *redis.Options) {
205+
opts.MaxRetries = -1
206+
})
201207
if err != nil {
202208
return nil, fmt.Errorf("failed to initialize Redis room storage: %w", err)
203209
}
@@ -207,7 +213,9 @@ func NewRoomStorageRedis(c config.Config) (ports.RoomStorage, error) {
207213

208214
// NewGameRoomInstanceStorageRedis instantiates redis as instance storage.
209215
func NewGameRoomInstanceStorageRedis(c config.Config) (ports.GameRoomInstanceStorage, error) {
210-
client, err := createRedisClient(c, c.GetString(instanceStorageRedisURLPath))
216+
client, err := createRedisClient(c, c.GetString(instanceStorageRedisURLPath), func(opts *redis.Options) {
217+
opts.MaxRetries = 3
218+
})
211219
if err != nil {
212220
return nil, fmt.Errorf("failed to initialize Redis instance storage: %w", err)
213221
}
@@ -217,7 +225,9 @@ func NewGameRoomInstanceStorageRedis(c config.Config) (ports.GameRoomInstanceSto
217225

218226
// NewSchedulerCacheRedis instantiates redis as scheduler cache.
219227
func NewSchedulerCacheRedis(c config.Config) (ports.SchedulerCache, error) {
220-
client, err := createRedisClient(c, c.GetString(schedulerCacheRedisURLPath))
228+
client, err := createRedisClient(c, c.GetString(schedulerCacheRedisURLPath), func(opts *redis.Options) {
229+
opts.MaxRetries = 3
230+
})
221231
if err != nil {
222232
return nil, fmt.Errorf("failed to initialize Redis scheduler cache: %w", err)
223233
}
@@ -261,7 +271,7 @@ func GetSchedulerStoragePostgresURL(c config.Config) string {
261271
return c.GetString(schedulerStoragePostgresURLPath)
262272
}
263273

264-
func createRedisClient(c config.Config, url string) (*redis.Client, error) {
274+
func createRedisClient(c config.Config, url string, customizers ...func(*redis.Options)) (*redis.Client, error) {
265275
opts, err := redis.ParseURL(url)
266276
if err != nil {
267277
return nil, fmt.Errorf("invalid redis URL: %w", err)
@@ -273,6 +283,10 @@ func createRedisClient(c config.Config, url string) (*redis.Client, error) {
273283

274284
hostPort := strings.Split(opts.Addr, ":")
275285

286+
for _, customizer := range customizers {
287+
customizer(opts)
288+
}
289+
276290
client := redis.NewClient(opts)
277291

278292
if tracing.IsTracingEnabled(c) {
@@ -299,7 +313,9 @@ func NewAutoscaler(policies autoscaler.PolicyMap) ports.Autoscaler {
299313

300314
// NewOperationFlowRedis instantiates a new operation flow using redis as backend.
301315
func NewOperationFlowRedis(c config.Config) (ports.OperationFlow, error) {
302-
client, err := createRedisClient(c, c.GetString(operationFlowRedisURLPath))
316+
client, err := createRedisClient(c, c.GetString(operationFlowRedisURLPath), func(opts *redis.Options) {
317+
opts.MaxRetries = 3
318+
})
303319
if err != nil {
304320
return nil, fmt.Errorf("failed to initialize Redis operation storage: %w", err)
305321
}

0 commit comments

Comments
 (0)