Skip to content

Commit 0db8092

Browse files
committed
fix: add support for bulk job enqueue; --bulk-from-file
1 parent 4cc5062 commit 0db8092

File tree

2 files changed

+69
-7
lines changed

2 files changed

+69
-7
lines changed

client.go

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,34 @@ package main
33
import (
44
"context"
55
"encoding/json"
6+
"os"
67

78
"github.com/jackc/pgx/v5"
9+
"github.com/jackc/pgx/v5/pgxpool"
810
"github.com/riverqueue/river"
911
"github.com/riverqueue/river/riverdriver/riverpgxv5"
1012
"github.com/rs/zerolog/log"
1113
)
1214

15+
type BulkJob struct {
16+
Args DPromptsJobArgs `json:"args"`
17+
Metadata map[string]interface{} `json:"metadata,omitempty"`
18+
}
19+
20+
// RunClient enqueues a job with args and metadata as JSON strings.
21+
func RunClient(ctx context.Context, driver *riverpgxv5.Driver, argsJSON string, metadataJSON string, bulkFile string, dbPool *pgxpool.Pool) {
22+
riverClient, err := newRiverClient(driver)
23+
if err != nil {
24+
log.Fatal().Err(err).Msg("Failed to create River client")
25+
}
26+
27+
if bulkFile != "" {
28+
if err := enqueueBulkJobsFromFile(ctx, riverClient, dbPool, bulkFile); err != nil {
29+
log.Fatal().Err(err).Msg("Bulk insert failed")
30+
}
31+
return
32+
}
1333

14-
func RunClient(ctx context.Context, driver *riverpgxv5.Driver, argsJSON string, metadataJSON string) {
1534
if argsJSON == "" {
1635
log.Fatal().Msg("Args JSON is required in client mode")
1736
}
@@ -36,17 +55,59 @@ func RunClient(ctx context.Context, driver *riverpgxv5.Driver, argsJSON string,
3655
}
3756
}
3857

39-
riverClient, err := newRiverClient(driver)
40-
if err != nil {
41-
log.Fatal().Err(err).Msg("Failed to create River client")
42-
}
43-
4458
if _, err := riverClient.Insert(ctx, &args, insertOpts); err != nil {
4559
log.Fatal().Err(err).Msg("Failed to enqueue job")
4660
}
4761
log.Info().Interface("args", args).Interface("metadata", insertOpts).Msg("Enqueued job")
4862
}
4963

64+
func enqueueBulkJobsFromFile(ctx context.Context, riverClient *river.Client[pgx.Tx], dbPool *pgxpool.Pool, filename string) error {
65+
file, err := os.Open(filename)
66+
if err != nil {
67+
return err
68+
}
69+
defer file.Close()
70+
71+
var jobs []BulkJob
72+
if err := json.NewDecoder(file).Decode(&jobs); err != nil {
73+
return err
74+
}
75+
76+
tx, err := dbPool.Begin(ctx)
77+
if err != nil {
78+
return err
79+
}
80+
defer tx.Rollback(ctx)
81+
82+
var jobsToInsert []river.InsertManyParams
83+
for _, job := range jobs {
84+
var insertOpts *river.InsertOpts
85+
if job.Metadata != nil {
86+
metadataBytes, err := json.Marshal(job.Metadata)
87+
if err != nil {
88+
return err
89+
}
90+
insertOpts = &river.InsertOpts{Metadata: metadataBytes}
91+
}
92+
jobsToInsert = append(jobsToInsert, river.InsertManyParams{
93+
Args: job.Args,
94+
InsertOpts: insertOpts,
95+
})
96+
}
97+
98+
results, err := riverClient.InsertManyTx(ctx, tx, jobsToInsert)
99+
if err != nil {
100+
return err
101+
}
102+
103+
if err := tx.Commit(ctx); err != nil {
104+
return err
105+
}
106+
107+
log.Info().Msgf("Successfully enqueued %d jobs", len(results))
108+
return nil
109+
}
110+
50111
func newRiverClient(driver *riverpgxv5.Driver) (*river.Client[pgx.Tx], error) {
51112
return river.NewClient[pgx.Tx](driver, &river.Config{})
52113
}

main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ func main() {
1717
mode := flag.String("mode", "worker", "Mode: 'client' to enqueue a job, 'worker' to run worker")
1818
argsJSON := flag.String("args", "", "Job args as JSON (for client mode)")
1919
metadataJSON := flag.String("metadata", "", "Job metadata as JSON (for client mode)")
20+
bulkFile := flag.String("bulk-from-file", "", "Bulk insert jobs from JSON file")
2021
configPath := flag.String("config", "", "Path to config file (default: $HOME/.dprompt.toml)")
2122

2223
flag.Parse()
@@ -35,7 +36,7 @@ func main() {
3536

3637
switch *mode {
3738
case "client":
38-
RunClient(ctx, driver, *argsJSON, *metadataJSON)
39+
RunClient(ctx, driver, *argsJSON, *metadataJSON, *bulkFile, dbPool)
3940
case "worker":
4041
RunWorker(ctx, driver, cancel, dbPool)
4142
default:

0 commit comments

Comments
 (0)