Skip to content

Commit d9acd0c

Browse files
Separated logic for update|delete|insert one and many. Implemented insert batch sizing. Updated readme
1 parent d3e0d0d commit d9acd0c

File tree

5 files changed

+80
-29
lines changed

5 files changed

+80
-29
lines changed

README.md

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,11 @@ Environment Variables (Overrides):
117117
PERCONALOAD_DELETE_PERCENT % of ops that are DELETE
118118
PERCONALOAD_AGGREGATE_PERCENT % of ops that are AGGREGATE
119119
PERCONALOAD_TRANSACTION_PERCENT % of ops that are TRANSACTIONAL
120+
PERCONALOAD_BULK_INSERT_PERCENT % of ops that are BULK INSERTS
120121

121122
[Performance Optimization]
122123
PERCONALOAD_FIND_BATCH_SIZE Docs returned per cursor batch
124+
PERCONALOAD_INSERT_BATCH_SIZE Number of docs in batch bulk insert
123125
PERCONALOAD_FIND_LIMIT Max docs per Find query
124126
PERCONALOAD_INSERT_CACHE_SIZE Generator buffer size
125127
PERCONALOAD_OP_TIMEOUT_MS Soft timeout per DB op (ms)
@@ -192,13 +194,15 @@ You can override any setting in `config.yaml` using environment variables. This
192194
| `max_transaction_ops` | `PERCONALOAD_MAX_TRANSACTION_OPS` | Maximum number of operations to group into a single transaction block | `5` |
193195
| **Operation Ratios** | | (Must sum to ~100) | |
194196
| `find_percent` | `PERCONALOAD_FIND_PERCENT` | Percentage of Find operations | `50` |
195-
| `insert_percent` | `PERCONALOAD_INSERT_PERCENT` | Percentage of Insert operations (this is not related to the initial seed inserts) | `20` |
197+
| `insert_percent` | `PERCONALOAD_INSERT_PERCENT` | Percentage of Insert operations (this is not related to the initial seed inserts) | `5` |
198+
| `bulk_insert_percent` | `PERCONALOAD_BULK_INSERT_PERCENT` | Percentage of Bulk Insert operations (this is not related to the initial seed inserts) | `5` |
196199
| `update_percent` | `PERCONALOAD_UPDATE_PERCENT` | Percentage of Update operations | `20` |
197200
| `delete_percent` | `PERCONALOAD_DELETE_PERCENT` | Percentage of Delete operations | `10` |
198201
| `aggregate_percent` | `PERCONALOAD_AGGREGATE_PERCENT` | Percentage of Aggregate operations | `5` |
199202
| `transaction_percent` | `PERCONALOAD_TRANSACTION_PERCENT` | Percentage of Transactional operations | `5` |
200203
| **Performance Optimization** | | | |
201204
| `find_batch_size` | `PERCONALOAD_FIND_BATCH_SIZE` | Documents returned per cursor batch | `10` |
205+
| `insert_batch_size` | `PERCONALOAD_INSERT_BATCH_SIZE` | Number of documents per insert batch | `10` |
202206
| `find_limit` | `PERCONALOAD_FIND_LIMIT` | Hard limit on documents per Find query | `5` |
203207
| `insert_cache_size` | `PERCONALOAD_INSERT_CACHE_SIZE` | Size of the document generation buffer | `1000` |
204208
| `op_timeout_ms` | `PERCONALOAD_OP_TIMEOUT_MS` | Soft timeout for individual DB operations (ms) | `500` |
@@ -380,7 +384,8 @@ By default, the tool comes preconfigured with the following workload distributio
380384
| Find | 50% |
381385
| Update | 20% |
382386
| Delete | 10% |
383-
| Insert | 10% |
387+
| Insert | 5% |
388+
| Bulk Inserts | 5% |
384389
| Aggregate | 5% |
385390
| Transaction | 5% |
386391

@@ -389,7 +394,7 @@ You can modify any of the values above to run different types of workloads.
389394
Please note:
390395

391396
* If `use_transactions: false`, the transaction_percent value is ignored.
392-
* If there are no aggregation queries defined in queries.json, the aggregate_percent value is also ignored.
397+
* If there are no aggregation queries defined in queries.json, the aggregate_percent value is also ignored.
393398
* Aggregate operations will only generate activity if at least one query with "operation": "aggregate" is defined in your active JSON query files.
394399
* The maximum number of operations within a transaction is defined in the config file via `max_transaction_ops` or the env var `PERCONALOAD_MAX_TRANSACTION_OPS`. The number of operations per transaction will be randomized, with the max number being set as explained above.
395400
* Multi-Collection Load: If multiple collections are defined in your collections_path, each worker will randomly select a collection for every operation. This includes operations within a transaction, allowing for cross-collection atomic updates.
@@ -418,12 +423,15 @@ These settings affect the efficiency of individual database operations and memor
418423
* **`find_batch_size`**: The number of documents returned per batch in a cursor.
419424
* *Tip:* Higher values reduce network round-trips but increase memory usage per worker.
420425
* *Default:* `10`
426+
* **`insert_batch_size`**: The number of documents submitted in each bulk insert (`InsertMany`) operation.
427+
* *Default:* `10`
421428
* **`find_limit`**: The hard limit on documents returned for `find` operations.
422429
* *Default:* `5`
423430
* **`insert_cache_size`**: The buffer size for the document generator channel.
424431
* *Tip:* This decouples document generation from database insertion. A larger buffer ensures workers rarely wait for data generation logic.
425432
* *Default:* `1000`
426-
* **`Upserts`**: Any updateOne or updateMany operation in your query JSON files can include "upsert": true. This will cause MongoDB to create the document if no match is found for the filter.
433+
* **`upserts`**: Any updateOne or updateMany operation in your query JSON files can include "upsert": true. This will cause MongoDB to create the document if no match is found for the filter.
434+
427435

428436
#### Timeouts & Reliability
429437
Control how plgm reacts to network lag or database pressure.

cmd/plgm/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,11 @@ func main() {
6868
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PERCONALOAD_DELETE_PERCENT", "% of ops that are DELETE")
6969
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PERCONALOAD_AGGREGATE_PERCENT", "% of ops that are AGGREGATE")
7070
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PERCONALOAD_TRANSACTION_PERCENT", "% of ops that are TRANSACTIONAL")
71+
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PERCONALOAD_BULK_INSERT_PERCENT", "% of ops that are BULK INSERTS")
7172

7273
fmt.Fprintf(os.Stderr, "\n [Performance Optimization]\n")
7374
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PERCONALOAD_FIND_BATCH_SIZE", "Docs returned per cursor batch")
75+
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PERCONALOAD_INSERT_BATCH_SIZE", "Number of docs in batch bulk insert")
7476
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PERCONALOAD_FIND_LIMIT", "Max docs per Find query")
7577
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PERCONALOAD_INSERT_CACHE_SIZE", "Generator buffer size")
7678
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PERCONALOAD_OP_TIMEOUT_MS", "Soft timeout per DB op (ms)")

config.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ max_transaction_ops: 5
8585
find_percent: 50
8686
update_percent: 20
8787
delete_percent: 10
88-
insert_percent: 10
88+
insert_percent: 5
89+
bulk_insert_percent: 5
8990
aggregate_percent: 5
9091
transaction_percent: 5
9192

@@ -104,6 +105,7 @@ use_findone_for_limit_one: true # Optimization: use FindOne() if limit is 1.
104105

105106
# --- Write Operations ---
106107
insert_cache_size: 1000 # Size of the internal buffer for generated documents.
108+
insert_batch_size: 10 # Number of documents per InsertMany call
107109

108110
# --- Monitoring ---
109111
status_refresh_rate_sec: 1 # How often (seconds) to print the "AVG Ops/Sec" log line.

internal/config/config.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ type AppConfig struct {
2828
InsertPercent int `yaml:"insert_percent"`
2929
AggregatePercent int `yaml:"aggregate_percent"`
3030
TransactionPercent int `yaml:"transaction_percent"`
31+
BulkInsertPercent int `yaml:"bulk_insert_percent"`
32+
InsertBatchSize int `yaml:"insert_batch_size"`
3133
UseTransactions bool `yaml:"use_transactions"`
3234
MaxTransactionOps int `yaml:"max_transaction_ops"`
3335
DebugMode bool `yaml:"debug_mode"`
@@ -82,6 +84,9 @@ func applyDefaults(cfg *AppConfig) {
8284
if cfg.FindBatchSize <= 0 {
8385
cfg.FindBatchSize = 10
8486
}
87+
if cfg.InsertBatchSize <= 0 {
88+
cfg.InsertBatchSize = 10
89+
}
8590
if cfg.FindLimit <= 0 {
8691
cfg.FindLimit = 5
8792
}
@@ -249,6 +254,16 @@ func applyEnvOverrides(cfg *AppConfig) {
249254
cfg.StatusRefreshRateSec = n
250255
}
251256
}
257+
if p := os.Getenv("PERCONALOAD_BULK_INSERT_PERCENT"); p != "" {
258+
if n, err := strconv.Atoi(p); err == nil && n >= 0 {
259+
cfg.BulkInsertPercent = n
260+
}
261+
}
262+
if v := os.Getenv("PERCONALOAD_INSERT_BATCH_SIZE"); v != "" {
263+
if n, err := strconv.Atoi(v); err == nil && n > 0 {
264+
cfg.InsertBatchSize = n
265+
}
266+
}
252267
}
253268

254269
func normalizePercentages(cfg *AppConfig) {
@@ -257,7 +272,7 @@ func normalizePercentages(cfg *AppConfig) {
257272
cfg.TransactionPercent = 0
258273
}
259274

260-
total := cfg.FindPercent + cfg.UpdatePercent + cfg.DeletePercent + cfg.InsertPercent + cfg.AggregatePercent + cfg.TransactionPercent
275+
total := cfg.FindPercent + cfg.UpdatePercent + cfg.DeletePercent + cfg.InsertPercent + cfg.AggregatePercent + cfg.TransactionPercent + cfg.BulkInsertPercent
261276
if total <= 0 {
262277
cfg.FindPercent = 100
263278
return
@@ -271,8 +286,9 @@ func normalizePercentages(cfg *AppConfig) {
271286
cfg.InsertPercent = int(float64(cfg.InsertPercent) * factor)
272287
cfg.AggregatePercent = int(float64(cfg.AggregatePercent) * factor)
273288
cfg.TransactionPercent = int(float64(cfg.TransactionPercent) * factor)
289+
cfg.BulkInsertPercent = int(float64(cfg.BulkInsertPercent) * factor)
274290

275-
finalTotal := cfg.FindPercent + cfg.UpdatePercent + cfg.DeletePercent + cfg.InsertPercent + cfg.AggregatePercent + cfg.TransactionPercent
291+
finalTotal := cfg.FindPercent + cfg.UpdatePercent + cfg.DeletePercent + cfg.InsertPercent + cfg.AggregatePercent + cfg.TransactionPercent + cfg.BulkInsertPercent
276292
if finalTotal != 100 {
277293
cfg.FindPercent += (100 - finalTotal)
278294
}

internal/mongo/runner.go

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ type workloadConfig struct {
4242

4343
var InsertDocumentCache chan map[string]interface{}
4444

45-
var operationTypes = []string{"find", "update", "delete", "insert", "aggregate", "transaction"}
45+
// base operation types for selection logic
46+
var operationTypes = []string{"find", "update", "delete", "insert", "insertMany", "aggregate", "transaction"}
4647

4748
func selectOperation(percentages map[string]int, rng *rand.Rand) string {
4849
if percentages == nil {
@@ -147,6 +148,19 @@ func generateInsertQuery(col config.CollectionDefinition, rng *rand.Rand, cfg *c
147148
}
148149
}
149150

151+
func generateInsertManyQuery(col config.CollectionDefinition, rng *rand.Rand, cfg *config.AppConfig) []interface{} {
152+
count := cfg.InsertBatchSize
153+
docs := make([]interface{}, count)
154+
for i := 0; i < count; i++ {
155+
select {
156+
case docs[i] = <-InsertDocumentCache:
157+
default:
158+
docs[i] = workloads.GenerateDocument(col, cfg)
159+
}
160+
}
161+
return docs
162+
}
163+
150164
func insertDocumentProducer(ctx context.Context, col config.CollectionDefinition, cacheSize int, cfg *config.AppConfig) {
151165
for {
152166
select {
@@ -176,33 +190,36 @@ func runTransaction(ctx context.Context, id int, wCfg workloadConfig, rng *rand.
176190
_, err = session.WithTransaction(ctx, func(sessCtx context.Context) (interface{}, error) {
177191
numOps := rng.Intn(wCfg.appConfig.MaxTransactionOps) + 1
178192
for i := 0; i < numOps; i++ {
179-
// Select random collection for this step of the transaction
180193
currentCol := wCfg.collections[rng.Intn(len(wCfg.collections))]
181-
182194
innerOp := selectOperation(wCfg.percentages, rng)
183195
if innerOp == "aggregate" || innerOp == "transaction" {
184196
innerOp = "find"
185197
}
186198

187199
var q config.QueryDefinition
200+
var insertManyDocs []interface{}
188201
var run bool
189202

190-
if innerOp == "insert" {
203+
switch innerOp {
204+
case "insert":
191205
q = generateInsertQuery(currentCol, rng, wCfg.appConfig)
192206
run = true
193-
} else {
207+
case "insertMany":
208+
insertManyDocs = generateInsertManyQuery(currentCol, rng, wCfg.appConfig)
209+
run = true
210+
default:
194211
q, run = selectRandomQueryByType(sessCtx, wCfg.database, innerOp, wCfg.queryMap, currentCol, wCfg.debug, rng, wCfg.primaryFilterField, wCfg.appConfig)
195212
}
196213

197214
if !run {
198215
continue
199216
}
200217

201-
coll := wCfg.database.Collection(q.Collection)
218+
coll := wCfg.database.Collection(currentCol.Name)
202219
filter := cloneMap(q.Filter)
203220
processRecursive(filter, rng)
204221

205-
switch q.Operation {
222+
switch innerOp {
206223
case "find":
207224
cursor, err := coll.Find(sessCtx, filter, options.Find().SetLimit(1))
208225
if err == nil {
@@ -216,10 +233,14 @@ func runTransaction(ctx context.Context, id int, wCfg workloadConfig, rng *rand.
216233
case "updateMany":
217234
opts := options.UpdateMany().SetUpsert(q.Upsert)
218235
_, err = coll.UpdateMany(sessCtx, filter, q.Update, opts)
219-
case "deleteOne", "deleteMany":
236+
case "deleteOne":
220237
_, err = coll.DeleteOne(sessCtx, filter)
238+
case "deleteMany":
239+
_, err = coll.DeleteMany(sessCtx, filter)
221240
case "insert":
222241
_, err = coll.InsertOne(sessCtx, q.Filter)
242+
case "insertMany":
243+
_, err = coll.InsertMany(sessCtx, insertManyDocs)
223244
}
224245

225246
if err != nil {
@@ -250,9 +271,7 @@ func independentWorker(ctx context.Context, id int, wg *sync.WaitGroup, wCfg wor
250271
default:
251272
}
252273

253-
// Pick random collection for this operation
254274
currentCol := wCfg.collections[rng.Intn(len(wCfg.collections))]
255-
256275
opType := selectOperation(wCfg.percentages, rng)
257276

258277
if opType == "transaction" {
@@ -264,12 +283,16 @@ func independentWorker(ctx context.Context, id int, wg *sync.WaitGroup, wCfg wor
264283
}
265284

266285
var q config.QueryDefinition
286+
var insertManyDocs []interface{}
267287
var run bool
268288

269289
switch opType {
270290
case "insert":
271291
q = generateInsertQuery(currentCol, rng, wCfg.appConfig)
272292
run = true
293+
case "insertMany":
294+
insertManyDocs = generateInsertManyQuery(currentCol, rng, wCfg.appConfig)
295+
run = true
273296
case "find", "updateOne", "updateMany", "deleteOne", "deleteMany", "aggregate":
274297
q, run = selectRandomQueryByType(dbOpCtx, wCfg.database, opType, wCfg.queryMap, currentCol, wCfg.debug, rng, wCfg.primaryFilterField, wCfg.appConfig)
275298
default:
@@ -281,9 +304,7 @@ func independentWorker(ctx context.Context, id int, wg *sync.WaitGroup, wCfg wor
281304
continue
282305
}
283306

284-
db := wCfg.database
285-
coll := db.Collection(q.Collection)
286-
307+
coll := wCfg.database.Collection(currentCol.Name)
287308
var filter map[string]interface{}
288309
var pipeline []interface{}
289310

@@ -292,16 +313,16 @@ func independentWorker(ctx context.Context, id int, wg *sync.WaitGroup, wCfg wor
292313
pipeline = cloned
293314
processRecursive(pipeline, rng)
294315
}
295-
} else {
316+
} else if opType != "insertMany" {
296317
filter = cloneMap(q.Filter)
297318
processRecursive(filter, rng)
298319
}
299320

300321
start := time.Now()
301322

302-
switch q.Operation {
323+
switch opType {
303324
case "find":
304-
limit := int64(q.Limit)
325+
limit := q.Limit
305326
if limit <= 0 {
306327
limit = wCfg.findLimit
307328
}
@@ -327,26 +348,28 @@ func independentWorker(ctx context.Context, id int, wg *sync.WaitGroup, wCfg wor
327348
_ = cursor.Close(dbOpCtx)
328349
}
329350
case "updateOne":
330-
// Use UpdateOne() for single document updates in v2
331351
opts := options.UpdateOne().SetUpsert(q.Upsert)
332352
_, err := coll.UpdateOne(dbOpCtx, filter, q.Update, opts)
333353
if err != nil && wCfg.debug {
334354
log.Printf("[Worker %d] UpdateOne error: %v", id, err)
335355
}
336356
case "updateMany":
337-
// Use UpdateMany() for multiple document updates in v2
338357
opts := options.UpdateMany().SetUpsert(q.Upsert)
339358
_, err := coll.UpdateMany(dbOpCtx, filter, q.Update, opts)
340359
if err != nil && wCfg.debug {
341360
log.Printf("[Worker %d] UpdateMany error: %v", id, err)
342361
}
343-
case "deleteOne", "deleteMany":
362+
case "deleteOne":
344363
coll.DeleteOne(dbOpCtx, filter)
364+
case "deleteMany":
365+
coll.DeleteMany(dbOpCtx, filter)
345366
case "insert":
346367
coll.InsertOne(dbOpCtx, q.Filter)
368+
case "insertMany":
369+
coll.InsertMany(dbOpCtx, insertManyDocs)
347370
}
348371

349-
wCfg.collector.Track(q.Operation, time.Since(start))
372+
wCfg.collector.Track(opType, time.Since(start))
350373
}
351374
}
352375

@@ -436,6 +459,7 @@ func RunWorkload(ctx context.Context, db *mongo.Database, collections []config.C
436459
"update": cfg.UpdatePercent,
437460
"delete": cfg.DeletePercent,
438461
"insert": cfg.InsertPercent,
462+
"insertMany": cfg.BulkInsertPercent,
439463
"aggregate": cfg.AggregatePercent,
440464
"transaction": cfg.TransactionPercent,
441465
},
@@ -455,7 +479,6 @@ func runContinuousWorkload(ctx context.Context, wCfg workloadConfig) error {
455479
workloadCtx, cancel := context.WithTimeout(ctx, wCfg.duration)
456480
defer cancel()
457481

458-
// Handle initial document producer for multiple collections
459482
for _, col := range wCfg.collections {
460483
go insertDocumentProducer(workloadCtx, col, wCfg.maxInsertCache, wCfg.appConfig)
461484
}
@@ -488,7 +511,7 @@ func runAllQueriesOnce(ctx context.Context, db *mongo.Database, queries []config
488511
wg.Add(1)
489512
go queryWorkerOnce(ctx, 1, tasks, &wg)
490513
for i, q := range queries {
491-
if q.Operation == "insert" {
514+
if q.Operation == "insert" || q.Operation == "insertMany" {
492515
continue
493516
}
494517
tasks <- &queryTask{

0 commit comments

Comments
 (0)