@@ -19,7 +19,9 @@ package gen
 
 import (
 	"context"
+	"crypto/rand"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -54,14 +56,64 @@ func New(url, apikey string, kbc *kibana.Client, logger *zap.Logger) *Generator
 	}
 }
 
-func (g *Generator) waitForAPMToBePublishReady(ctx context.Context, maxWaitDuration time.Duration) error {
+// RunBlockingWait runs the underlying generator in blocking mode and waits for all in-flight
+// data to be flushed before proceeding. This allows the caller to ensure that 1m aggregation
+// metrics are ingested immediately after raw data ingestion, without variable delays.
+// This may lead to data loss if the final flush takes more than 30s, which may happen if the
+// quantity of data ingested with runBlocking gets too big. The current quantity does not
+// trigger this behavior.
+func (g *Generator) RunBlockingWait(ctx context.Context, version ech.Version, integrations bool) error {
+	g.logger.Info("wait for apm server to be ready")
+	if err := g.waitForAPMToBePublishReady(ctx); err != nil {
+		// If the APM server is not ready, we likely ran into an issue.
+		// For example, see https://github.com/elastic/apm-server/issues/17605.
+		// We can try to temporarily resolve it by re-applying the Elastic APM policy,
+		// and wait again.
+		//
+		// NOTE: This retry only works if there is an integrations server;
+		// otherwise, no retry is attempted.
+		if !integrations {
+			return fmt.Errorf("failed to wait for apm server: %w", err)
+		}
+		if err = g.reapplyAPMPolicy(ctx, version); err != nil {
+			return fmt.Errorf("failed to re-apply apm policy: %w", err)
+		}
+		if err = g.waitForAPMToBePublishReady(ctx); err != nil {
+			return fmt.Errorf("failed to wait for apm server: %w", err)
+		}
+	}
+
+	g.logger.Info("ingest data")
+	if err := g.runBlocking(ctx, version); err != nil {
+		return fmt.Errorf("cannot run generator: %w", err)
+	}
+
+	// With a Fleet-managed APM server, we can trigger a metrics flush.
+	if integrations {
+		g.logger.Info("flush apm metrics")
+		if err := g.flushAPMMetrics(ctx, version); err != nil {
+			return fmt.Errorf("cannot flush apm metrics: %w", err)
+		}
+		return nil
+	}
+
+	// With standalone, we don't have Fleet, so simply wait for an arbitrary period.
+	time.Sleep(180 * time.Second)
+	return nil
+}
+
+// waitForAPMToBePublishReady waits for the APM server to become publish-ready by querying the server.
+func (g *Generator) waitForAPMToBePublishReady(ctx context.Context) error {
+	maxWaitDuration := 60 * time.Second
 	timer := time.NewTimer(maxWaitDuration)
 	defer timer.Stop()
 
 	for {
 		select {
+		case <-ctx.Done():
+			return errors.New("context done while waiting for apm server to be ready")
 		case <-timer.C:
-			return fmt.Errorf("apm server not yet ready after %s", maxWaitDuration)
+			return fmt.Errorf("apm server not ready after %s", maxWaitDuration)
 		default:
 			info, err := queryAPMInfo(ctx, g.apmServerURL, g.apmAPIKey)
 			if err != nil {
@@ -72,15 +124,14 @@ func (g *Generator) waitForAPMToBePublishReady(ctx context.Context, maxWaitDurat
 				return nil
 			}
 
-			time.Sleep(1 * time.Second)
+			time.Sleep(10 * time.Second)
 		}
 	}
 }
 
 // runBlocking runs the underlying generator in blocking mode.
 func (g *Generator) runBlocking(ctx context.Context, version ech.Version) error {
 	eventRate := "1000/s"
-
 	cfg := telemetrygen.DefaultConfig()
 	cfg.APIKey = g.apmAPIKey
 	cfg.TargetStackVersion = supportedstacks.TargetStackVersionLatest
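For context, a caller would drive the new entrypoint roughly as follows. This is a minimal sketch, not part of the change: New's signature comes from the hunk header above, while ctx, url, apikey, kbc, version, and logger are hypothetical placeholders.

	// Sketch of a hypothetical caller; assumes this package's New constructor
	// and pre-existing url, apikey, kbc (*kibana.Client), version (ech.Version),
	// and logger (*zap.Logger) values.
	g := gen.New(url, apikey, kbc, logger)

	// integrations=true takes the Fleet-managed path: a failed readiness check
	// is retried after re-applying the Elastic APM policy, and in-flight metrics
	// are flushed explicitly instead of waiting a fixed 180s.
	if err := g.RunBlockingWait(ctx, version, true); err != nil {
		logger.Fatal("cannot run generator", zap.Error(err))
	}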
@@ -104,53 +155,31 @@ func (g *Generator) runBlocking(ctx context.Context, version ech.Version) error
 		return fmt.Errorf("cannot create telemetrygen generator: %w", err)
 	}
 
-	g.logger.Info("wait for apm server to be ready")
-	if err = g.waitForAPMToBePublishReady(ctx, 30*time.Second); err != nil {
-		return err
-	}
-
-	g.logger.Info("ingest data")
 	gen.Logger = g.logger
 	return gen.RunBlocking(ctx)
 }
 
-// RunBlockingWait runs the underlying generator in blocking mode and waits for all in-flight
-// data to be flushed before proceeding. This allows the caller to ensure than 1m aggregation
-// metrics are ingested immediately after raw data ingestion, without variable delays.
-// This may lead to data loss if the final flush takes more than 30s, which may happen if the
-// quantity of data ingested with runBlocking gets too big. The current quantity does not
-// trigger this behavior.
-func (g *Generator) RunBlockingWait(ctx context.Context, version ech.Version, integrations bool) error {
-	if err := g.runBlocking(ctx, version); err != nil {
-		return fmt.Errorf("cannot run generator: %w", err)
-	}
+func (g *Generator) reapplyAPMPolicy(ctx context.Context, version ech.Version) error {
+	policyID := "elastic-cloud-apm"
+	// crypto/rand.Text (Go 1.24+) returns a random base32 string, so the
+	// description changes on every call and the policy update is never a no-op.
+	description := fmt.Sprintf("%s %s", version, rand.Text()[5:])
 
-	// With Fleet managed APM server, we can trigger metrics flush.
-	if integrations {
-		if err := flushAPMMetrics(ctx, g.kbc, version); err != nil {
-			return fmt.Errorf("cannot flush apm metrics: %w", err)
-		}
-		return nil
+	if err := g.kbc.UpdatePackagePolicyDescriptionByID(ctx, policyID, version, description); err != nil {
+		return fmt.Errorf(
+			"cannot update %s package policy description: %w",
+			policyID, err,
+		)
 	}
 
-	// With standalone, we don't have Fleet, so simply just wait for some arbitrary time.
-	time.Sleep(180 * time.Second)
 	return nil
 }
 
 // flushAPMMetrics sends an update to the Fleet APM package policy in order
 // to trigger the flushing of in-flight APM metrics.
-func flushAPMMetrics(ctx context.Context, kbc *kibana.Client, version ech.Version) error {
-	policyID := "elastic-cloud-apm"
-	description := fmt.Sprintf("Integration server test %s", version)
-
-	// Sending an update with modifying the description is enough to trigger
-	// final aggregations in APM Server and flush of in-flight metrics.
-	if err := kbc.UpdatePackagePolicyDescriptionByID(ctx, policyID, version, description); err != nil {
-		return fmt.Errorf(
-			"cannot update %s package policy description to flush aggregation metrics: %w",
-			policyID, err,
-		)
+func (g *Generator) flushAPMMetrics(ctx context.Context, version ech.Version) error {
+	// Re-applying the Elastic APM policy is enough to trigger final aggregations
+	// in APM Server and flush of in-flight metrics.
+	if err := g.reapplyAPMPolicy(ctx, version); err != nil {
+		return err
 	}
 
 	// APM Server needs some time to flush all metrics, and we don't have any
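queryAPMInfo is called in the readiness loop above but sits outside this diff. Given the file's net/http, encoding/json, and io imports, it plausibly queries the APM Server root endpoint, which reports a publish_ready flag once the server can accept events. A sketch under those assumptions; the apmInfo struct name and field set are guesses, not the repository's actual code:

// apmInfo models the subset of the APM Server root-endpoint response that
// the readiness loop needs; the struct name and fields are assumptions.
type apmInfo struct {
	PublishReady bool `json:"publish_ready"`
}

// queryAPMInfo fetches the APM Server info endpoint using API-key auth and
// decodes the JSON body. It only needs the context, net/http, encoding/json,
// fmt, and io imports already present in this file.
func queryAPMInfo(ctx context.Context, apmServerURL, apmAPIKey string) (apmInfo, error) {
	var info apmInfo
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, apmServerURL, nil)
	if err != nil {
		return info, fmt.Errorf("cannot create request: %w", err)
	}
	req.Header.Set("Authorization", "ApiKey "+apmAPIKey)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return info, fmt.Errorf("cannot query apm server: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return info, fmt.Errorf("unexpected status %d from apm server", resp.StatusCode)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return info, fmt.Errorf("cannot read apm server response: %w", err)
	}
	if err := json.Unmarshal(body, &info); err != nil {
		return info, fmt.Errorf("cannot decode apm server info: %w", err)
	}
	return info, nil
}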