@@ -61,17 +61,17 @@ type WorkloadDesc struct {
6161 Write WriteDesc `yaml:"write_options"`
6262}
6363
64- type timeseries struct {
64+ type Timeseries struct {
6565 labelSets [][]prompb.Label
6666 lastValue float64
6767 seriesType SeriesType
6868}
6969
70- type writeWorkload struct {
71- replicas int
72- series []* timeseries
73- totalSeries int
74- totalSeriesTypeMap map [SeriesType ]int
70+ type WriteWorkload struct {
71+ Replicas int
72+ Series []* Timeseries
73+ TotalSeries int
74+ TotalSeriesTypeMap map [SeriesType ]int
7575
7676 missedIterations prometheus.Counter
7777
@@ -80,18 +80,56 @@ type writeWorkload struct {
8080 seriesBufferChan chan []prompb.TimeSeries
8181}
8282
83- func newWriteWorkload (workloadDesc WorkloadDesc , reg prometheus.Registerer ) * writeWorkload {
83+ func newWriteWorkload (workloadDesc WorkloadDesc , reg prometheus.Registerer ) * WriteWorkload {
84+ series , totalSeriesTypeMap := SeriesDescToSeries (workloadDesc .Series )
85+
8486 totalSeries := 0
87+ for _ , typeTotal := range totalSeriesTypeMap {
88+ totalSeries += typeTotal
89+ }
90+
91+ // Set batch size to 500 samples if not set
92+ if workloadDesc .Write .BatchSize == 0 {
93+ workloadDesc .Write .BatchSize = 500
94+ }
95+
96+ // Set the write interval to 15 seconds if not set
97+ if workloadDesc .Write .Interval == 0 {
98+ workloadDesc .Write .Interval = time .Second * 15
99+ }
100+
101+ // Set the write timeout to 15 seconds if not set
102+ if workloadDesc .Write .Timeout == 0 {
103+ workloadDesc .Write .Timeout = time .Second * 15
104+ }
105+
106+ return & WriteWorkload {
107+ Replicas : workloadDesc .Replicas ,
108+ Series : series ,
109+ TotalSeries : totalSeries ,
110+ TotalSeriesTypeMap : totalSeriesTypeMap ,
111+ options : workloadDesc .Write ,
112+
113+ missedIterations : promauto .With (reg ).NewCounter (
114+ prometheus.CounterOpts {
115+ Namespace : "benchtool" ,
116+ Name : "write_iterations_late_total" ,
117+ Help : "Number of write intervals started late because the previous interval did not complete in time." ,
118+ },
119+ ),
120+ }
121+ }
122+
123+ func SeriesDescToSeries (seriesDescs []SeriesDesc ) ([]* Timeseries , map [SeriesType ]int ) {
124+ series := []* Timeseries {}
85125 totalSeriesTypeMap := map [SeriesType ]int {
86126 GaugeZero : 0 ,
87127 GaugeRandom : 0 ,
88128 CounterOne : 0 ,
89129 CounterRandom : 0 ,
90130 }
91131
92- series := []* timeseries {}
93-
94- for _ , seriesDesc := range workloadDesc .Series {
132+ for _ , seriesDesc := range seriesDescs {
95133 // Create the metric with a name value
96134 labelSets := [][]prompb.Label {
97135 {
@@ -109,45 +147,15 @@ func newWriteWorkload(workloadDesc WorkloadDesc, reg prometheus.Registerer) *wri
109147 labelSets = addLabelToLabelSet (labelSets , lbl )
110148 }
111149
112- series = append (series , & timeseries {
150+ series = append (series , & Timeseries {
113151 labelSets : labelSets ,
114152 seriesType : seriesDesc .Type ,
115153 })
116154 numSeries := len (labelSets )
117- totalSeries += numSeries
118155 totalSeriesTypeMap [seriesDesc .Type ] += numSeries
119156 }
120157
121- // Set batch size to 500 samples if not set
122- if workloadDesc .Write .BatchSize == 0 {
123- workloadDesc .Write .BatchSize = 500
124- }
125-
126- // Set the write interval to 15 seconds if not set
127- if workloadDesc .Write .Interval == 0 {
128- workloadDesc .Write .Interval = time .Second * 15
129- }
130-
131- // Set the write timeout to 15 seconds if not set
132- if workloadDesc .Write .Timeout == 0 {
133- workloadDesc .Write .Timeout = time .Second * 15
134- }
135-
136- return & writeWorkload {
137- replicas : workloadDesc .Replicas ,
138- series : series ,
139- totalSeries : totalSeries ,
140- totalSeriesTypeMap : totalSeriesTypeMap ,
141- options : workloadDesc .Write ,
142-
143- missedIterations : promauto .With (reg ).NewCounter (
144- prometheus.CounterOpts {
145- Namespace : "benchtool" ,
146- Name : "write_iterations_late_total" ,
147- Help : "Number of write intervals started late because the previous interval did not complete in time." ,
148- },
149- ),
150- }
158+ return series , totalSeriesTypeMap
151159}
152160
153161func addLabelToLabelSet (labelSets [][]prompb.Label , lbl LabelDesc ) [][]prompb.Label {
@@ -167,14 +175,14 @@ func addLabelToLabelSet(labelSets [][]prompb.Label, lbl LabelDesc) [][]prompb.La
167175 return newLabelSets
168176}
169177
170- func (w * writeWorkload ) generateTimeSeries (id string , t time.Time ) []prompb.TimeSeries {
178+ func (w * WriteWorkload ) GenerateTimeSeries (id string , t time.Time ) []prompb.TimeSeries {
171179 now := t .UnixNano () / int64 (time .Millisecond )
172180
173- timeseries := make ([]prompb.TimeSeries , 0 , w .replicas * w .totalSeries )
174- for replicaNum := 0 ; replicaNum < w .replicas ; replicaNum ++ {
181+ timeseries := make ([]prompb.TimeSeries , 0 , w .Replicas * w .TotalSeries )
182+ for replicaNum := 0 ; replicaNum < w .Replicas ; replicaNum ++ {
175183 replicaLabel := prompb.Label {Name : "bench_replica" , Value : fmt .Sprintf ("replica-%05d" , replicaNum )}
176184 idLabel := prompb.Label {Name : "bench_id" , Value : id }
177- for _ , series := range w .series {
185+ for _ , series := range w .Series {
178186 var value float64
179187 switch series .seriesType {
180188 case GaugeZero :
@@ -215,7 +223,7 @@ type batchReq struct {
215223 putBack chan []prompb.TimeSeries
216224}
217225
218- func (w * writeWorkload ) getSeriesBuffer (ctx context.Context ) []prompb.TimeSeries {
226+ func (w * WriteWorkload ) getSeriesBuffer (ctx context.Context ) []prompb.TimeSeries {
219227 select {
220228 case <- ctx .Done ():
221229 return nil
@@ -224,7 +232,7 @@ func (w *writeWorkload) getSeriesBuffer(ctx context.Context) []prompb.TimeSeries
224232 }
225233}
226234
227- func (w * writeWorkload ) generateWriteBatch (ctx context.Context , id string , numBuffers int , seriesChan chan batchReq ) error {
235+ func (w * WriteWorkload ) generateWriteBatch (ctx context.Context , id string , numBuffers int , seriesChan chan batchReq ) error {
228236 w .seriesBufferChan = make (chan []prompb.TimeSeries , numBuffers )
229237 for i := 0 ; i < numBuffers ; i ++ {
230238 w .seriesBufferChan <- make ([]prompb.TimeSeries , 0 , w .options .BatchSize )
@@ -250,10 +258,10 @@ func (w *writeWorkload) generateWriteBatch(ctx context.Context, id string, numBu
250258 timeNow := time .Now ()
251259 timeNowMillis := timeNow .UnixNano () / int64 (time .Millisecond )
252260 wg := & sync.WaitGroup {}
253- for replicaNum := 0 ; replicaNum < w .replicas ; replicaNum ++ {
261+ for replicaNum := 0 ; replicaNum < w .Replicas ; replicaNum ++ {
254262 replicaLabel := prompb.Label {Name : "bench_replica" , Value : fmt .Sprintf ("replica-%05d" , replicaNum )}
255263 idLabel := prompb.Label {Name : "bench_id" , Value : id }
256- for _ , series := range w .series {
264+ for _ , series := range w .Series {
257265 var value float64
258266 switch series .seriesType {
259267 case GaugeZero :
0 commit comments