Skip to content

Commit 9ff4e6d

Browse files
authored
Fix flaky tests: port in for loki source api tests and logs integration test (#4875)
* Fix port in use flakyness for loki source api tests * Pin loki container version for integration tests
1 parent 5bf2d4f commit 9ff4e6d

File tree

3 files changed

+82
-101
lines changed

3 files changed

+82
-101
lines changed

internal/cmd/integration-tests/docker-compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ services:
6464
- integration-tests
6565

6666
loki:
67-
image: grafana/loki:latest
67+
image: grafana/loki:3.5.8
6868
command: -config.file=/etc/loki/local-config.yaml
6969
ports:
7070
- "3100:3100"

internal/component/loki/source/api/api_test.go

Lines changed: 73 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func TestLokiSourceAPI_Simple(t *testing.T) {
9999
receiver := fake.NewClient(func() {})
100100
defer receiver.Stop()
101101

102-
args := testArgsWith(t, func(a *Arguments) {
102+
args := testArgsWith(func(a *Arguments) {
103103
a.Server.HTTP.ListenPort = 8532
104104
a.ForwardTo = []loki.LogsReceiver{receiver.LogsReceiver()}
105105
a.UseIncomingTimestamp = true
@@ -144,7 +144,7 @@ func TestLokiSourceAPI_Update(t *testing.T) {
144144
receiver := fake.NewClient(func() {})
145145
defer receiver.Stop()
146146

147-
args := testArgsWith(t, func(a *Arguments) {
147+
args := testArgsWith(func(a *Arguments) {
148148
a.Server.HTTP.ListenPort = 8583
149149
a.ForwardTo = []loki.LogsReceiver{receiver.LogsReceiver()}
150150
a.UseIncomingTimestamp = true
@@ -221,7 +221,7 @@ func TestLokiSourceAPI_FanOut(t *testing.T) {
221221
receivers[i] = fake.NewClient(func() {})
222222
}
223223

224-
args := testArgsWith(t, func(a *Arguments) {
224+
args := testArgsWith(func(a *Arguments) {
225225
a.Server.HTTP.ListenPort = 8537
226226
a.ForwardTo = mapToChannels(receivers)
227227
})
@@ -267,73 +267,63 @@ func TestLokiSourceAPI_FanOut(t *testing.T) {
267267
}
268268

269269
func TestComponent_detectsWhenUpdateRequiresARestart(t *testing.T) {
270-
httpPort := getFreePort(t)
271-
grpcPort := getFreePort(t, httpPort)
272270
tests := []struct {
273271
name string
274272
args Arguments
275273
newArgs Arguments
274+
changeHttpPort bool
276275
restartRequired bool
277276
}{
278277
{
279278
name: "identical args don't require server restart",
280-
args: testArgsWithPorts(httpPort, grpcPort),
281-
newArgs: testArgsWithPorts(httpPort, grpcPort),
279+
args: testArgs(),
280+
newArgs: testArgs(),
282281
restartRequired: false,
283282
},
284283
{
285284
name: "change in address requires server restart",
286-
args: testArgsWithPorts(httpPort, grpcPort),
287-
newArgs: testArgsWith(t, func(args *Arguments) {
285+
args: testArgs(),
286+
newArgs: testArgsWith(func(args *Arguments) {
288287
args.Server.HTTP.ListenAddress = "localhost"
289-
args.Server.HTTP.ListenPort = httpPort
290-
args.Server.GRPC.ListenPort = grpcPort
291288
}),
292289
restartRequired: true,
293290
},
294291
{
295292
name: "change in port requires server restart",
296-
args: testArgsWithPorts(httpPort, grpcPort),
297-
newArgs: testArgsWithPorts(getFreePort(t, httpPort, grpcPort), grpcPort),
293+
args: testArgs(),
294+
changeHttpPort: true,
295+
newArgs: testArgs(),
298296
restartRequired: true,
299297
},
300298
{
301299
name: "change in forwardTo does not require server restart",
302-
args: testArgsWithPorts(httpPort, grpcPort),
303-
newArgs: testArgsWith(t, func(args *Arguments) {
300+
args: testArgs(),
301+
newArgs: testArgsWith(func(args *Arguments) {
304302
args.ForwardTo = []loki.LogsReceiver{}
305-
args.Server.HTTP.ListenPort = httpPort
306-
args.Server.GRPC.ListenPort = grpcPort
307303
}),
308304
restartRequired: false,
309305
},
310306
{
311307
name: "change in labels does not require server restart",
312-
args: testArgsWithPorts(httpPort, grpcPort),
313-
newArgs: testArgsWith(t, func(args *Arguments) {
308+
args: testArgs(),
309+
newArgs: testArgsWith(func(args *Arguments) {
314310
args.Labels = map[string]string{"some": "label"}
315-
args.Server.HTTP.ListenPort = httpPort
316-
args.Server.GRPC.ListenPort = grpcPort
317311
}),
318312
restartRequired: false,
319313
},
320314
{
321315
name: "change in relabel rules does not require server restart",
322-
args: testArgsWithPorts(httpPort, grpcPort),
323-
newArgs: testArgsWith(t, func(args *Arguments) {
316+
args: testArgs(),
317+
newArgs: testArgsWith(func(args *Arguments) {
324318
args.RelabelRules = relabel.Rules{}
325-
args.Server.HTTP.ListenPort = httpPort
326-
args.Server.GRPC.ListenPort = grpcPort
327319
}),
328320
restartRequired: false,
329321
},
330322
{
331323
name: "change in use incoming timestamp does not require server restart",
332-
args: testArgsWithPorts(httpPort, grpcPort),
333-
newArgs: testArgsWith(t, func(args *Arguments) {
324+
args: testArgs(),
325+
newArgs: testArgsWith(func(args *Arguments) {
334326
args.UseIncomingTimestamp = !args.UseIncomingTimestamp
335-
args.Server.HTTP.ListenPort = httpPort
336-
args.Server.GRPC.ListenPort = grpcPort
337327
}),
338328
restartRequired: false,
339329
},
@@ -346,6 +336,13 @@ func TestComponent_detectsWhenUpdateRequiresARestart(t *testing.T) {
346336
comp := startTestComponent(t, defaultOptions(), tc.args, ctx)
347337

348338
serverBefore := comp.server
339+
340+
if tc.changeHttpPort {
341+
httpPort, err := freeport.GetFreePort()
342+
require.NoError(t, err)
343+
tc.newArgs.Server.HTTP.ListenPort = httpPort
344+
}
345+
349346
require.NoError(t, comp.Update(tc.newArgs))
350347

351348
restarted := serverBefore != comp.server
@@ -368,8 +365,7 @@ func TestLokiSourceAPI_TLS(t *testing.T) {
368365
receiver := fake.NewClient(func() {})
369366
defer receiver.Stop()
370367

371-
args := testArgsWith(t, func(a *Arguments) {
372-
a.Server.HTTP.ListenPort = getFreePort(t)
368+
args := testArgsWith(func(a *Arguments) {
373369
a.Server.HTTP.TLSConfig = &fnet.TLSConfig{
374370
Cert: testCert,
375371
Key: alloytypes.Secret(testKey),
@@ -378,10 +374,10 @@ func TestLokiSourceAPI_TLS(t *testing.T) {
378374
a.UseIncomingTimestamp = true
379375
})
380376
opts := defaultOptions()
381-
_ = startTestComponent(t, opts, args, ctx)
377+
c := startTestComponent(t, opts, args, ctx)
382378

383379
// Create TLS-enabled Loki client
384-
lokiClient := newTestLokiClientTLS(t, args, opts)
380+
lokiClient := newTestLokiClientTLS(t, c.server.HTTPListenAddress(), opts)
385381
defer lokiClient.Stop()
386382

387383
now := time.Now()
@@ -396,7 +392,9 @@ func TestLokiSourceAPI_TLS(t *testing.T) {
396392

397393
require.Eventually(
398394
t,
399-
func() bool { return len(receiver.Received()) == 1 },
395+
func() bool {
396+
return len(receiver.Received()) == 1
397+
},
400398
10*time.Second,
401399
10*time.Millisecond,
402400
"did not receive the forwarded message within the timeout",
@@ -412,12 +410,11 @@ func TestLokiSourceAPI_TLS(t *testing.T) {
412410
}
413411

414412
// newTestLokiClientTLS creates a Loki client configured for TLS connections
415-
func newTestLokiClientTLS(t *testing.T, args Arguments, opts component.Options) client.Client {
413+
func newTestLokiClientTLS(t *testing.T, httpListenAddress string, opts component.Options) client.Client {
416414
url := flagext.URLValue{}
417415
err := url.Set(fmt.Sprintf(
418-
"https://%s:%d/api/v1/push",
419-
args.Server.HTTP.ListenAddress,
420-
args.Server.HTTP.ListenPort,
416+
"https://%s/api/v1/push",
417+
httpListenAddress,
421418
))
422419
require.NoError(t, err)
423420

@@ -439,7 +436,7 @@ func newTestLokiClientTLS(t *testing.T, args Arguments, opts component.Options)
439436
}
440437

441438
func TestDefaultServerConfig(t *testing.T) {
442-
args := testArgs(t)
439+
args := testArgs()
443440
args.Server = nil // user did not define server options
444441

445442
comp, err := New(
@@ -484,7 +481,7 @@ func startTestComponent(
484481
}
485482

486483
func TestShutdown(t *testing.T) {
487-
args := testArgsWith(t, func(a *Arguments) {
484+
args := testArgsWith(func(a *Arguments) {
488485
a.Server.GracefulShutdownTimeout = 5 * time.Second
489486
a.ForwardTo = []loki.LogsReceiver{loki.NewLogsReceiver()}
490487
})
@@ -503,13 +500,13 @@ func TestShutdown(t *testing.T) {
503500
waitForServerToBeReady(t, comp)
504501

505502
// First request should be forwarded on channel
506-
_, err = http.DefaultClient.Do(newRequest(t, context.Background(), args))
503+
_, err = http.DefaultClient.Do(newRequest(t, context.Background(), comp.server.HTTPListenAddress()))
507504
require.NoError(t, err)
508505

509506
codes := make(chan int)
510507
for range 5 {
511508
go func() {
512-
res, err := http.DefaultClient.Do(newRequest(t, context.Background(), args))
509+
res, err := http.DefaultClient.Do(newRequest(t, context.Background(), comp.server.HTTPListenAddress()))
513510
if err != nil || res == nil {
514511
// This should not happen but if it does we return -1 here so test will fail.
515512
codes <- -1
@@ -537,7 +534,7 @@ func TestShutdown(t *testing.T) {
537534
}
538535

539536
func TestCancelRequest(t *testing.T) {
540-
args := testArgsWith(t, func(a *Arguments) {
537+
args := testArgsWith(func(a *Arguments) {
541538
a.Server.GracefulShutdownTimeout = 5 * time.Second
542539
a.ForwardTo = []loki.LogsReceiver{loki.NewLogsReceiver()}
543540
})
@@ -556,15 +553,15 @@ func TestCancelRequest(t *testing.T) {
556553
waitForServerToBeReady(t, comp)
557554

558555
// First request should be forwarded on channel
559-
_, err = http.DefaultClient.Do(newRequest(t, context.Background(), args))
556+
_, err = http.DefaultClient.Do(newRequest(t, context.Background(), comp.server.HTTPListenAddress()))
560557
require.NoError(t, err)
561558

562559
var wg sync.WaitGroup
563560
for range 5 {
564561
wg.Go(func() {
565562
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
566563
defer cancel()
567-
res, err := http.DefaultClient.Do(newRequest(t, ctx, args))
564+
res, err := http.DefaultClient.Do(newRequest(t, ctx, comp.server.HTTPListenAddress()))
568565
require.ErrorIs(t, err, context.DeadlineExceeded)
569566
require.Nil(t, res)
570567
})
@@ -573,52 +570,44 @@ func TestCancelRequest(t *testing.T) {
573570
wg.Wait()
574571
}
575572

576-
func newRequest(t *testing.T, ctx context.Context, args Arguments) *http.Request {
573+
func newRequest(t *testing.T, ctx context.Context, httpListendAddress string) *http.Request {
577574
body := bytes.Buffer{}
578575
err := util.SerializeProto(&body, &push.PushRequest{Streams: []push.Stream{{Labels: `{foo="foo"}`, Entries: []push.Entry{{Line: "line"}}}}}, util.RawSnappy)
579576
require.NoError(t, err)
580-
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://%s:%d/loki/api/v1/push", args.Server.HTTP.ListenAddress, args.Server.HTTP.ListenPort), &body)
577+
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://%s/loki/api/v1/push", httpListendAddress), &body)
581578
require.NoError(t, err)
582579
req.Header.Set("Content-Type", "application/x-protobuf")
583580
return req
584581
}
585582

586583
func waitForServerToBeReady(t *testing.T, comp *Component) {
587-
require.Eventuallyf(t, func() bool {
588-
// Determine if TLS is enabled to choose the right protocol
589-
protocol := "http"
590-
var tlsConfig *tls.Config
591-
592-
serverConfig := comp.server.ServerConfig()
593-
if serverConfig.HTTP.TLSConfig != nil {
594-
protocol = "https"
595-
tlsConfig = &tls.Config{
596-
InsecureSkipVerify: true,
597-
}
584+
// Determine if TLS is enabled to choose the right protocol
585+
protocol := "http"
586+
var tlsConfig *tls.Config
587+
588+
serverConfig := comp.server.ServerConfig()
589+
if serverConfig.HTTP.TLSConfig != nil {
590+
protocol = "https"
591+
tlsConfig = &tls.Config{
592+
InsecureSkipVerify: true,
598593
}
594+
}
599595

600-
url := fmt.Sprintf(
601-
"%s://%v:%d/wrong/url",
602-
protocol,
603-
serverConfig.HTTP.ListenAddress,
604-
serverConfig.HTTP.ListenPort,
605-
)
606-
607-
var resp *http.Response
608-
var err error
596+
url := fmt.Sprintf(
597+
"%s://%s/wrong/url",
598+
protocol,
599+
comp.server.HTTPListenAddress(),
600+
)
609601

610-
if protocol == "https" {
611-
client := &http.Client{
612-
Transport: &http.Transport{
613-
TLSClientConfig: tlsConfig,
614-
},
615-
Timeout: 1 * time.Second,
616-
}
617-
resp, err = client.Get(url)
618-
} else {
619-
client := &http.Client{Timeout: 1 * time.Second}
620-
resp, err = client.Get(url)
602+
client := &http.Client{Timeout: 1 * time.Second}
603+
if protocol == "https" {
604+
client.Transport = &http.Transport{
605+
TLSClientConfig: tlsConfig,
621606
}
607+
}
608+
609+
require.Eventuallyf(t, func() bool {
610+
resp, err := client.Get(url)
622611

623612
return err == nil && resp != nil && resp.StatusCode == 404
624613
}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")
@@ -661,16 +650,14 @@ func defaultOptions() component.Options {
661650
}
662651
}
663652

664-
func testArgsWith(t *testing.T, mutator func(arguments *Arguments)) Arguments {
665-
a := testArgs(t)
666-
mutator(&a)
667-
return a
653+
func testArgs() Arguments {
654+
return testArgsWithPorts(0, 0)
668655
}
669656

670-
func testArgs(t *testing.T) Arguments {
671-
httpPort := getFreePort(t)
672-
grpPort := getFreePort(t, httpPort)
673-
return testArgsWithPorts(httpPort, grpPort)
657+
func testArgsWith(mutator func(arguments *Arguments)) Arguments {
658+
a := testArgsWithPorts(0, 0)
659+
mutator(&a)
660+
return a
674661
}
675662

676663
func testArgsWithPorts(httpPort int, grpcPort int) Arguments {
@@ -698,17 +685,3 @@ func testArgsWithPorts(httpPort int, grpcPort int) Arguments {
698685
MaxSendMessageSize: 100 * units.MiB,
699686
}
700687
}
701-
702-
func getFreePort(t *testing.T, exclude ...int) int {
703-
const maxRetries = 10
704-
for range maxRetries {
705-
port, err := freeport.GetFreePort()
706-
require.NoError(t, err)
707-
if !slices.Contains(exclude, port) {
708-
return port
709-
}
710-
}
711-
712-
t.Fatal("fail to get free port")
713-
return 0
714-
}

internal/component/loki/source/api/internal/lokipush/push_api_server.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,14 @@ func (s *PushAPIServer) ServerConfig() fnet.ServerConfig {
119119
return *s.serverConfig
120120
}
121121

122+
func (s *PushAPIServer) HTTPListenAddress() string {
123+
return s.server.HTTPListenAddr()
124+
}
125+
126+
func (s *PushAPIServer) GRPCListenAddress() string {
127+
return s.server.GRPCListenAddr()
128+
}
129+
122130
func (s *PushAPIServer) Shutdown() {
123131
level.Info(s.logger).Log("msg", "stopping push API server")
124132
// StopAndShutdown tries to gracefully shutdown.

0 commit comments

Comments
 (0)