99 "log"
1010 "runtime"
1111 "runtime/metrics"
12+ "slices"
1213 "sort"
1314 "strconv"
1415 "sync"
@@ -23,7 +24,7 @@ import (
2324// === Helper Functions ===
2425
2526// runBatchPutGetCycle performs a BatchWriteItem-BatchGetItem cycle with 25 items and measures performance
26- func (b * DBESDKBenchmark ) runBatchPutGetCycle (data []byte , baseItemId string ) (float64 , float64 , error ) {
27+ func (b * DBESDKBenchmark ) runBatchPutGetCycle (data []byte ) (float64 , float64 , error ) {
2728 ctx := context .Background ()
2829 tableName := b .Config .TableName
2930
@@ -81,7 +82,7 @@ func (b *DBESDKBenchmark) runBatchPutGetCycle(data []byte, baseItemId string) (f
8182
8283 for _ , item := range items {
8384 retrievedData , ok := item ["attribute1" ].(* types.AttributeValueMemberB )
84- if ! ok || len (retrievedData .Value ) != len ( data ) {
85+ if ! ok || ! slices . Equal (retrievedData .Value , data ) {
8586 return 0 , 0 , fmt .Errorf ("data verification failed" )
8687 }
8788 }
@@ -113,8 +114,7 @@ func (b *DBESDKBenchmark) runThroughputTest(dataSize int, iterations int) (*Benc
113114
114115 // Warmup
115116 for i := 0 ; i < b .Config .Iterations .Warmup ; i ++ {
116- itemId := fmt .Sprintf ("%d" , i ) // Use numeric string for Number attribute
117- if _ , _ , err := b .runBatchPutGetCycle (testData , itemId ); err != nil {
117+ if _ , _ , err := b .runBatchPutGetCycle (testData ); err != nil {
118118 return nil , fmt .Errorf ("warmup iteration %d failed: %w" , i , err )
119119 }
120120 }
@@ -131,9 +131,8 @@ func (b *DBESDKBenchmark) runThroughputTest(dataSize int, iterations int) (*Benc
131131
132132 startTime := time .Now ()
133133 for i := 0 ; i < iterations ; i ++ {
134- itemId := fmt .Sprintf ("%d" , 1000 + i ) // Use numeric string, offset to avoid warmup conflicts
135134 iterationStart := time .Now ()
136- putMs , getMs , err := b .runBatchPutGetCycle (testData , itemId )
135+ putMs , getMs , err := b .runBatchPutGetCycle (testData )
137136 if err != nil {
138137 return nil , fmt .Errorf ("measurement iteration %d failed: %w" , i , err )
139138 }
@@ -250,8 +249,7 @@ func (b *DBESDKBenchmark) runMemoryTest(dataSize int) (*BenchmarkResult, error)
250249
251250 // Run operation
252251 operationStart := time .Now ()
253- itemId := fmt .Sprintf ("%d" , 2000 + i ) // Use numeric string, offset to avoid conflicts
254- _ , _ , err := b .runBatchPutGetCycle (data , itemId )
252+ _ , _ , err := b .runBatchPutGetCycle (data )
255253 operationDuration := time .Since (operationStart )
256254
257255 close (stopSampling )
@@ -327,77 +325,77 @@ func (b *DBESDKBenchmark) runMemoryTest(dataSize int) (*BenchmarkResult, error)
327325// === Concurrent Test Implementation ===
328326
329327// runConcurrentTest runs concurrent operations benchmark test
330- // func (b *DBESDKBenchmark) runConcurrentTest(dataSize int, concurrency int, iterationsPerWorker int) (*BenchmarkResult, error) {
331- // log.Printf("Running concurrent test - Size: %d bytes, Concurrency: %d", dataSize, concurrency)
332-
333- // data := b.GenerateTestData(dataSize)
334- // var allTimes []float64
335- // var timesMutex sync.Mutex
336- // var wg sync.WaitGroup
337-
338- // errorChan := make(chan error, concurrency)
339- // startTime := time.Now()
340-
341- // // Launch workers
342- // for i := 0; i < concurrency; i++ {
343- // wg.Add(1)
344- // go func(workerID int) {
345- // defer wg.Done()
346-
347- // var workerTimes []float64
348- // for j := 0; j < iterationsPerWorker; j++ {
349- // iterStart := time.Now()
350- // _, _, err := b.runEncryptDecryptCycle (data)
351- // if err != nil {
352- // errorChan <- fmt.Errorf("worker %d iteration %d failed: %w", workerID, j, err)
353- // return
354- // }
355- // workerTimes = append(workerTimes, time.Since(iterStart).Seconds()*1000)
356- // }
357-
358- // timesMutex.Lock()
359- // allTimes = append(allTimes, workerTimes...)
360- // timesMutex.Unlock()
361- // }(i)
362- // }
363-
364- // wg.Wait()
365- // totalDuration := time.Since(startTime).Seconds()
366-
367- // // Check for errors
368- // select {
369- // case err := <-errorChan:
370- // return nil, err
371- // default:
372- // }
373-
374- // // Calculate metrics
375- // totalOps := concurrency * iterationsPerWorker
376- // totalBytes := int64(totalOps * dataSize)
377-
378- // sort.Float64s(allTimes)
379- // result := &BenchmarkResult{
380- // TestName: "concurrent",
381- // Language: "go",
382- // DataSize: dataSize,
383- // Concurrency: concurrency,
384- // EndToEndLatencyMs: Average(allTimes),
385- // OpsPerSecond: float64(totalOps) / totalDuration,
386- // BytesPerSecond: float64(totalBytes) / totalDuration,
387- // P50Latency: Percentile(allTimes, 0.50),
388- // P95Latency: Percentile(allTimes, 0.95),
389- // P99Latency: Percentile(allTimes, 0.99),
390- // Timestamp: time.Now().Format("2006-01-02 15:04:05"),
391- // GoVersion: runtime.Version(),
392- // CPUCount: b.CPUCount,
393- // TotalMemoryGB: b.TotalMemoryGB,
394- // }
395-
396- // log.Printf("Concurrent test completed - Ops/sec: %.2f, Avg latency: %.2f ms",
397- // result.OpsPerSecond, result.EndToEndLatencyMs)
398-
399- // return result, nil
400- // }
328+ func (b * DBESDKBenchmark ) runConcurrentTest (dataSize int , concurrency int , iterationsPerWorker int ) (* BenchmarkResult , error ) {
329+ log .Printf ("Running concurrent test - Size: %d bytes, Concurrency: %d" , dataSize , concurrency )
330+
331+ data := b .GenerateTestData (dataSize )
332+ var allTimes []float64
333+ var timesMutex sync.Mutex
334+ var wg sync.WaitGroup
335+
336+ errorChan := make (chan error , concurrency )
337+ startTime := time .Now ()
338+
339+ // Launch workers
340+ for i := 0 ; i < concurrency ; i ++ {
341+ wg .Add (1 )
342+ go func (workerID int ) {
343+ defer wg .Done ()
344+
345+ var workerTimes []float64
346+ for j := 0 ; j < iterationsPerWorker ; j ++ {
347+ iterStart := time .Now ()
348+ _ , _ , err := b .runBatchPutGetCycle (data )
349+ if err != nil {
350+ errorChan <- fmt .Errorf ("worker %d iteration %d failed: %w" , workerID , j , err )
351+ return
352+ }
353+ workerTimes = append (workerTimes , time .Since (iterStart ).Seconds ()* 1000 )
354+ }
355+
356+ timesMutex .Lock ()
357+ allTimes = append (allTimes , workerTimes ... )
358+ timesMutex .Unlock ()
359+ }(i )
360+ }
361+
362+ wg .Wait ()
363+ totalDuration := time .Since (startTime ).Seconds ()
364+
365+ // Check for errors
366+ select {
367+ case err := <- errorChan :
368+ return nil , err
369+ default :
370+ }
371+
372+ // Calculate metrics
373+ totalOps := concurrency * iterationsPerWorker
374+ totalBytes := int64 (totalOps * dataSize )
375+
376+ sort .Float64s (allTimes )
377+ result := & BenchmarkResult {
378+ TestName : "concurrent" ,
379+ Language : "go" ,
380+ DataSize : dataSize ,
381+ Concurrency : concurrency ,
382+ EndToEndLatencyMs : Average (allTimes ),
383+ OpsPerSecond : float64 (totalOps ) / totalDuration ,
384+ BytesPerSecond : float64 (totalBytes ) / totalDuration ,
385+ P50Latency : Percentile (allTimes , 0.50 ),
386+ P95Latency : Percentile (allTimes , 0.95 ),
387+ P99Latency : Percentile (allTimes , 0.99 ),
388+ Timestamp : time .Now ().Format ("2006-01-02 15:04:05" ),
389+ GoVersion : runtime .Version (),
390+ CPUCount : b .CPUCount ,
391+ TotalMemoryGB : b .TotalMemoryGB ,
392+ }
393+
394+ log .Printf ("Concurrent test completed - Ops/sec: %.2f, Avg latency: %.2f ms" ,
395+ result .OpsPerSecond , result .EndToEndLatencyMs )
396+
397+ return result , nil
398+ }
401399
402400// === Test Orchestration ===
403401
@@ -430,22 +428,22 @@ func (b *DBESDKBenchmark) runMemoryTests(dataSizes []int) {
430428}
431429
432430// runConcurrencyTests executes all concurrency tests
433- // func (b *DBESDKBenchmark) runConcurrencyTests(dataSizes []int, concurrencyLevels []int) {
434- // log.Println("Running concurrency tests...")
435- // for _, dataSize := range dataSizes {
436- // for _, concurrency := range concurrencyLevels {
437- // if concurrency > 1 { // Skip single-threaded
438- // result, err := b.runConcurrentTest(dataSize, concurrency, 5)
439- // if err != nil {
440- // log.Printf("Concurrent test failed: %v", err)
441- // continue
442- // }
443- // b.Results = append(b.Results, *result)
444- // log.Printf("Concurrent test completed: %.2f ops/sec @ %d threads", result.OpsPerSecond, concurrency)
445- // }
446- // }
447- // }
448- // }
431+ func (b * DBESDKBenchmark ) runConcurrencyTests (dataSizes []int , concurrencyLevels []int ) {
432+ log .Println ("Running concurrency tests..." )
433+ for _ , dataSize := range dataSizes {
434+ for _ , concurrency := range concurrencyLevels {
435+ if concurrency > 1 { // Skip single-threaded
436+ result , err := b .runConcurrentTest (dataSize , concurrency , 5 )
437+ if err != nil {
438+ log .Printf ("Concurrent test failed: %v" , err )
439+ continue
440+ }
441+ b .Results = append (b .Results , * result )
442+ log .Printf ("Concurrent test completed: %.2f ops/sec @ %d threads" , result .OpsPerSecond , concurrency )
443+ }
444+ }
445+ }
446+ }
449447
450448// RunAllBenchmarks runs all configured benchmark tests
451449func (b * DBESDKBenchmark ) RunAllBenchmarks () error {
@@ -470,11 +468,11 @@ func (b *DBESDKBenchmark) RunAllBenchmarks() error {
470468 log .Println ("Skipping memory tests (not in test_types)" )
471469 }
472470
473- // if b.shouldRunTestType("concurrency") {
474- // b.runConcurrencyTests(dataSizes, b.Config.ConcurrencyLevels)
475- // } else {
476- // log.Println("Skipping concurrency tests (not in test_types)")
477- // }
471+ if b .shouldRunTestType ("concurrency" ) {
472+ b .runConcurrencyTests (dataSizes , b .Config .ConcurrencyLevels )
473+ } else {
474+ log .Println ("Skipping concurrency tests (not in test_types)" )
475+ }
478476
479477 log .Printf ("Benchmark suite completed. Total results: %d" , len (b .Results ))
480478 return nil
0 commit comments