|
| 1 | +package writers_test |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "math/rand" |
| 6 | + "runtime" |
| 7 | + "sort" |
| 8 | + "strconv" |
| 9 | + "testing" |
| 10 | + |
| 11 | + "github.com/apache/arrow/go/v15/arrow" |
| 12 | + "github.com/apache/arrow/go/v15/arrow/array" |
| 13 | + "github.com/apache/arrow/go/v15/arrow/memory" |
| 14 | + "github.com/cloudquery/plugin-sdk/v4/message" |
| 15 | + "github.com/cloudquery/plugin-sdk/v4/schema" |
| 16 | + "github.com/cloudquery/plugin-sdk/v4/writers" |
| 17 | + "github.com/cloudquery/plugin-sdk/v4/writers/batchwriter" |
| 18 | + "github.com/cloudquery/plugin-sdk/v4/writers/mixedbatchwriter" |
| 19 | + "github.com/cloudquery/plugin-sdk/v4/writers/streamingbatchwriter" |
| 20 | + "golang.org/x/exp/maps" |
| 21 | +) |
| 22 | + |
// bCase is a single benchmark case: a named, fully-constructed writer plus a
// factory producing the record to feed it on every benchmark iteration.
type bCase struct {
	name string              // benchmark sub-test name (writer kind + option-set label)
	wr   writers.Writer      // writer under test, already constructed with its options
	rec  func() arrow.Record // builds a fresh record per iteration (narrow or wide)
}
| 28 | + |
| 29 | +func BenchmarkWriterMemory(b *testing.B) { |
| 30 | + batchwriterOpts := map[string][]batchwriter.Option{ |
| 31 | + "defaults": nil, |
| 32 | + "batch10k bytes100M": {batchwriter.WithBatchSizeBytes(100000000), batchwriter.WithBatchSize(10000)}, |
| 33 | + } |
| 34 | + mixedbatchwriterOpts := map[string][]mixedbatchwriter.Option{ |
| 35 | + "defaults": nil, |
| 36 | + "batch10k bytes100M": {mixedbatchwriter.WithBatchSizeBytes(100000000), mixedbatchwriter.WithBatchSize(10000)}, |
| 37 | + } |
| 38 | + streamingbatchwriterOpts := map[string][]streamingbatchwriter.Option{ |
| 39 | + "defaults": nil, |
| 40 | + "bytes100M": {streamingbatchwriter.WithBatchSizeBytes(100000000)}, |
| 41 | + } |
| 42 | + |
| 43 | + var bCases []bCase |
| 44 | + bCases = append(bCases, writerMatrix("BatchWriter", batchwriter.New, newBatchWriterClient(), makeRecord, batchwriterOpts)...) |
| 45 | + bCases = append(bCases, writerMatrix("BatchWriter wide", batchwriter.New, newBatchWriterClient(), makeWideRecord, batchwriterOpts)...) |
| 46 | + bCases = append(bCases, writerMatrix("MixedBatchWriter", mixedbatchwriter.New, newMixedBatchWriterClient(), makeRecord, mixedbatchwriterOpts)...) |
| 47 | + bCases = append(bCases, writerMatrix("MixedBatchWriter wide", mixedbatchwriter.New, newMixedBatchWriterClient(), makeWideRecord, mixedbatchwriterOpts)...) |
| 48 | + bCases = append(bCases, writerMatrix("StreamingBatchWriter", streamingbatchwriter.New, newStreamingBatchWriterClient(), makeRecord, streamingbatchwriterOpts)...) |
| 49 | + bCases = append(bCases, writerMatrix("StreamingBatchWriter wide", streamingbatchwriter.New, newStreamingBatchWriterClient(), makeWideRecord, streamingbatchwriterOpts)...) |
| 50 | + |
| 51 | + for _, c := range bCases { |
| 52 | + c := c |
| 53 | + b.Run(c.name, func(b *testing.B) { |
| 54 | + var ( |
| 55 | + mStart runtime.MemStats |
| 56 | + mEnd runtime.MemStats |
| 57 | + ) |
| 58 | + |
| 59 | + ch := make(chan message.WriteMessage) |
| 60 | + errCh := make(chan error) |
| 61 | + go func() { |
| 62 | + defer close(errCh) |
| 63 | + errCh <- c.wr.Write(context.Background(), ch) |
| 64 | + }() |
| 65 | + |
| 66 | + runtime.ReadMemStats(&mStart) |
| 67 | + b.ResetTimer() |
| 68 | + for i := 0; i < b.N; i++ { |
| 69 | + rec := c.rec() |
| 70 | + ch <- &message.WriteInsert{ |
| 71 | + Record: rec, |
| 72 | + } |
| 73 | + } |
| 74 | + close(ch) |
| 75 | + err := <-errCh |
| 76 | + |
| 77 | + b.StopTimer() |
| 78 | + |
| 79 | + if err != nil { |
| 80 | + b.Fatal(err) |
| 81 | + } |
| 82 | + |
| 83 | + runtime.ReadMemStats(&mEnd) |
| 84 | + |
| 85 | + allocatedBytes := mEnd.Alloc - mStart.Alloc |
| 86 | + b.ReportMetric(float64(allocatedBytes)/float64(b.N), "bytes/op") // this is different from -benchmem result "B/op" |
| 87 | + }) |
| 88 | + } |
| 89 | +} |
| 90 | + |
| 91 | +func makeRecord() func() arrow.Record { |
| 92 | + table := &schema.Table{ |
| 93 | + Name: "test_table", |
| 94 | + Columns: schema.ColumnList{ |
| 95 | + { |
| 96 | + Name: "col1", |
| 97 | + Type: arrow.BinaryTypes.String, |
| 98 | + }, |
| 99 | + }, |
| 100 | + } |
| 101 | + sc := table.ToArrowSchema() |
| 102 | + |
| 103 | + return func() arrow.Record { |
| 104 | + bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) |
| 105 | + bldr.Field(0).(*array.StringBuilder).Append("test") |
| 106 | + return bldr.NewRecord() |
| 107 | + } |
| 108 | +} |
| 109 | + |
| 110 | +func makeWideRecord() func() arrow.Record { |
| 111 | + table := &schema.Table{ |
| 112 | + Name: "test_wide_table", |
| 113 | + Columns: schema.ColumnList{ |
| 114 | + { |
| 115 | + Name: "col1", |
| 116 | + Type: arrow.BinaryTypes.String, |
| 117 | + }, |
| 118 | + }, |
| 119 | + } |
| 120 | + |
| 121 | + const numWideCols = 200 |
| 122 | + randVals := make([]int64, numWideCols) |
| 123 | + for i := 0; i < numWideCols; i++ { |
| 124 | + table.Columns = append(table.Columns, schema.Column{ |
| 125 | + Name: "wide_col" + strconv.Itoa(i), |
| 126 | + Type: arrow.PrimitiveTypes.Int64, |
| 127 | + }) |
| 128 | + randVals[i] = rand.Int63() |
| 129 | + } |
| 130 | + sc := table.ToArrowSchema() |
| 131 | + |
| 132 | + return func() arrow.Record { |
| 133 | + bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) |
| 134 | + bldr.Field(0).(*array.StringBuilder).Append("test") |
| 135 | + for i := 0; i < numWideCols; i++ { |
| 136 | + bldr.Field(i + 1).(*array.Int64Builder).Append(randVals[i]) |
| 137 | + } |
| 138 | + return bldr.NewRecord() |
| 139 | + } |
| 140 | +} |
| 141 | + |
| 142 | +func writerMatrix[T writers.Writer, C any, O ~func(T)](prefix string, constructor func(C, ...O) (T, error), client C, recordMaker func() func() arrow.Record, optsMatrix map[string][]O) []bCase { |
| 143 | + bCases := make([]bCase, 0, len(optsMatrix)) |
| 144 | + |
| 145 | + k := maps.Keys(optsMatrix) |
| 146 | + sort.Strings(k) |
| 147 | + |
| 148 | + for _, name := range k { |
| 149 | + opts := optsMatrix[name] |
| 150 | + wr, err := constructor(client, opts...) |
| 151 | + if err != nil { |
| 152 | + panic(err) |
| 153 | + } |
| 154 | + bCases = append(bCases, bCase{ |
| 155 | + name: prefix + " " + name, |
| 156 | + wr: wr, |
| 157 | + rec: recordMaker(), |
| 158 | + }) |
| 159 | + } |
| 160 | + return bCases |
| 161 | +} |
| 162 | + |
// mixedbatchwriterClient is a no-op sink for MixedBatchWriter benchmarks:
// migrate-table batches are ignored, delete operations are unimplemented, and
// only InsertBatch (defined below) does any work.
type mixedbatchwriterClient struct {
	mixedbatchwriter.IgnoreMigrateTableBatch
	mixedbatchwriter.UnimplementedDeleteStaleBatch
	mixedbatchwriter.UnimplementedDeleteRecordsBatch
}
| 168 | + |
| 169 | +func newMixedBatchWriterClient() mixedbatchwriter.Client { |
| 170 | + return &mixedbatchwriterClient{} |
| 171 | +} |
| 172 | + |
| 173 | +func (mixedbatchwriterClient) InsertBatch(_ context.Context, msgs message.WriteInserts) error { |
| 174 | + for _, m := range msgs { |
| 175 | + m.Record.Release() |
| 176 | + } |
| 177 | + return nil |
| 178 | +} |
| 179 | + |
| 180 | +var _ mixedbatchwriter.Client = (*mixedbatchwriterClient)(nil) |
| 181 | + |
// batchwriterClient is a no-op sink for BatchWriter benchmarks: migrations are
// ignored, deletes are unimplemented, and only WriteTableBatch (defined below)
// does any work.
type batchwriterClient struct {
	batchwriter.IgnoreMigrateTables
	batchwriter.UnimplementedDeleteStale
	batchwriter.UnimplementedDeleteRecord
}
| 187 | + |
| 188 | +func newBatchWriterClient() batchwriter.Client { |
| 189 | + return &batchwriterClient{} |
| 190 | +} |
| 191 | + |
| 192 | +func (batchwriterClient) WriteTableBatch(_ context.Context, _ string, msgs message.WriteInserts) error { |
| 193 | + for _, m := range msgs { |
| 194 | + m.Record.Release() |
| 195 | + } |
| 196 | + return nil |
| 197 | +} |
| 198 | + |
| 199 | +var _ batchwriter.Client = (*batchwriterClient)(nil) |
| 200 | + |
// streamingbatchwriterClient is a no-op sink for StreamingBatchWriter
// benchmarks: migrations are ignored, deletes are unimplemented, and only
// WriteTable (defined below) does any work.
type streamingbatchwriterClient struct {
	streamingbatchwriter.IgnoreMigrateTable
	streamingbatchwriter.UnimplementedDeleteStale
	streamingbatchwriter.UnimplementedDeleteRecords
}
| 206 | + |
| 207 | +func newStreamingBatchWriterClient() streamingbatchwriter.Client { |
| 208 | + return &streamingbatchwriterClient{} |
| 209 | +} |
| 210 | + |
| 211 | +func (streamingbatchwriterClient) WriteTable(_ context.Context, ch <-chan *message.WriteInsert) error { |
| 212 | + for m := range ch { |
| 213 | + m.Record.Release() |
| 214 | + } |
| 215 | + return nil |
| 216 | +} |
| 217 | + |
| 218 | +var _ streamingbatchwriter.Client = (*streamingbatchwriterClient)(nil) |
0 commit comments