Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 11 additions & 18 deletions test/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func BenchmarkV1BulkInsert100KDocs(b *testing.B) {
}

func bulkRead(b *testing.B, docSize int) {
db, col := setup(b)
_, col := setup(b)

// -----------------------------
// Prepare and insert documents
Expand All @@ -110,31 +110,24 @@ func bulkRead(b *testing.B, docSize int) {
require.NoError(b, err)

// -----------------------------------------
// Sub-benchmark 1: Read entire collection
// Sub-benchmark 1: Read entire collection using ReadDocuments
// -----------------------------------------
b.Run("ReadAllDocsOnce", func(b *testing.B) {
query := fmt.Sprintf("FOR d IN %s RETURN d", col.Name())
// Prepare keys for reading
keys := make([]string, docSize)
for j := 0; j < docSize; j++ {
keys[j] = fmt.Sprintf("doc_%d", j)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
cursor, err := db.Query(ctx, query, nil)
readDocs := make([]TestDoc, docSize)
_, _, err := col.ReadDocuments(ctx, keys, readDocs)
require.NoError(b, err)

count := 0
for {
var doc TestDoc
_, err := cursor.ReadDocument(ctx, &doc)
if driver.IsNoMoreDocuments(err) {
break
}
require.NoError(b, err)
count++
}
// require.Equal(b, docSize, count, "expected to read all documents")
_ = cursor.Close()
// sanity check
if count != docSize {
b.Fatalf("expected to read %d docs, got %d", docSize, count)
if len(readDocs) != docSize {
b.Fatalf("expected to read %d docs, got %d", docSize, len(readDocs))
}
}
})
Expand Down
1 change: 1 addition & 0 deletions v2/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- Add missing endpoints from authentication to v2
- Add missing endpoints from general-request-handling to v2
- Add benchmark tests for v1 and v2 to compare performance
- Add len() for Read methods

## [2.1.5](https://github.com/arangodb/go-driver/tree/v2.1.5) (2025-08-31)
- Add tasks endpoints to v2
Expand Down
2 changes: 2 additions & 0 deletions v2/arangodb/collection_documents_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ type CollectionDocumentCreate interface {
}

type CollectionDocumentCreateResponseReader interface {
shared.ReadAllReadable[CollectionDocumentCreateResponse]
Read() (CollectionDocumentCreateResponse, error)
Len() int
}

type CollectionDocumentCreateResponse struct {
Expand Down
70 changes: 67 additions & 3 deletions v2/arangodb/collection_documents_create_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"io"
"net/http"
"reflect"

"github.com/pkg/errors"

Expand Down Expand Up @@ -129,10 +130,19 @@ func newCollectionDocumentCreateResponseReader(array *connection.Array, options
c := &collectionDocumentCreateResponseReader{array: array, options: options}

if c.options != nil {
// Cache reflection types once during initialization for performance
if c.options.OldObject != nil {
c.oldType = reflect.TypeOf(c.options.OldObject).Elem()
}
if c.options.NewObject != nil {
c.newType = reflect.TypeOf(c.options.NewObject).Elem()
}

c.response.Old = newUnmarshalInto(c.options.OldObject)
c.response.New = newUnmarshalInto(c.options.NewObject)
}

c.ReadAllReader = shared.ReadAllReader[CollectionDocumentCreateResponse, *collectionDocumentCreateResponseReader]{Reader: c}
return c
}

Expand All @@ -147,22 +157,51 @@ type collectionDocumentCreateResponseReader struct {
Old *UnmarshalInto `json:"old,omitempty"`
New *UnmarshalInto `json:"new,omitempty"`
}
shared.ReadAllReader[CollectionDocumentCreateResponse, *collectionDocumentCreateResponseReader]

// Cache for len() method - allows Read() to work after Len() is called
cachedResults []CollectionDocumentCreateResponse
cachedErrors []error
cached bool
readIndex int // Track position in cache for Read() after Len()

// Performance: Cache reflection types to avoid repeated lookups
oldType reflect.Type
newType reflect.Type
}

func (c *collectionDocumentCreateResponseReader) Read() (CollectionDocumentCreateResponse, error) {
// If Len() was called, serve from cache
if c.cached {
if c.readIndex >= len(c.cachedResults) {
return CollectionDocumentCreateResponse{}, shared.NoMoreDocumentsError{}
}
result := c.cachedResults[c.readIndex]
err := c.cachedErrors[c.readIndex]
c.readIndex++
return result, err
}

// Normal streaming read
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Pointer Reuse in Collection Document Read

In the Read() method of collectionDocumentCreateResponseReader, the Old and New fields are directly assigned from c.options.OldObject and c.options.NewObject pointers without creating new instances for each document. This causes pointer reuse where all documents in the response share the same pointer references to the original option objects. This should use reflection to create new instances for each document, similar to the pattern used in collection_documents_delete_impl.go, collection_documents_replace_impl.go, and collection_documents_update_impl.go to avoid data being overwritten across iterations.

Fix in Cursor Fix in Web

if !c.array.More() {
return CollectionDocumentCreateResponse{}, shared.NoMoreDocumentsError{}
}

var meta CollectionDocumentCreateResponse

if c.options != nil {
meta.Old = c.options.OldObject
meta.New = c.options.NewObject
// Create new instances for each document to avoid pointer reuse
// Use cached types for performance
if c.oldType != nil {
meta.Old = reflect.New(c.oldType).Interface()
}
if c.newType != nil {
meta.New = reflect.New(c.newType).Interface()
}

c.response.DocumentMeta = &meta.DocumentMeta
c.response.ResponseStruct = &meta.ResponseStruct
c.response.Old = newUnmarshalInto(meta.Old)
c.response.New = newUnmarshalInto(meta.New)

if err := c.array.Unmarshal(&c.response); err != nil {
if err == io.EOF {
Expand All @@ -175,5 +214,30 @@ func (c *collectionDocumentCreateResponseReader) Read() (CollectionDocumentCreat
return meta, meta.AsArangoError()
}

// Copy data from the new instances back to the original option objects for backward compatibility
if c.options != nil {
if c.options.OldObject != nil && meta.Old != nil {
oldValue := reflect.ValueOf(meta.Old).Elem()
originalValue := reflect.ValueOf(c.options.OldObject).Elem()
originalValue.Set(oldValue)
}
if c.options.NewObject != nil && meta.New != nil {
newValue := reflect.ValueOf(meta.New).Elem()
originalValue := reflect.ValueOf(c.options.NewObject).Elem()
originalValue.Set(newValue)
}
}

return meta, nil
}

// Len returns the number of items in the response.
// After calling Len(), you can still use Read() to iterate through items.
func (c *collectionDocumentCreateResponseReader) Len() int {
if !c.cached {
c.cachedResults, c.cachedErrors = c.ReadAll()
c.cached = true
c.readIndex = 0 // Reset read position to allow Read() after Len()
}
return len(c.cachedResults)
}
3 changes: 3 additions & 0 deletions v2/arangodb/collection_documents_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ type CollectionDocumentDeleteResponse struct {
}

type CollectionDocumentDeleteResponseReader interface {
shared.ReadAllIntoReadable[CollectionDocumentDeleteResponse]
Read(i interface{}) (CollectionDocumentDeleteResponse, error)
Len() int
}

type CollectionDocumentDeleteOptions struct {
Expand All @@ -82,6 +84,7 @@ type CollectionDocumentDeleteOptions struct {
WithWaitForSync *bool

// Return additionally the complete previous revision of the changed document
// Should be a pointer to an object
OldObject interface{}

// If set to true, an empty object is returned as response if the document operation succeeds.
Expand Down
46 changes: 45 additions & 1 deletion v2/arangodb/collection_documents_delete_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"io"
"net/http"
"reflect"

"github.com/pkg/errors"

Expand All @@ -42,6 +43,7 @@ var _ CollectionDocumentDelete = &collectionDocumentDelete{}

type collectionDocumentDelete struct {
collection *collection
shared.ReadAllIntoReader[CollectionDocumentDeleteResponse, *collectionDocumentDeleteResponseReader]
}

func (c collectionDocumentDelete) DeleteDocument(ctx context.Context, key string) (CollectionDocumentDeleteResponse, error) {
Expand Down Expand Up @@ -103,6 +105,7 @@ func (c collectionDocumentDelete) DeleteDocumentsWithOptions(ctx context.Context
func newCollectionDocumentDeleteResponseReader(array *connection.Array, options *CollectionDocumentDeleteOptions) *collectionDocumentDeleteResponseReader {
c := &collectionDocumentDeleteResponseReader{array: array, options: options}

c.ReadAllIntoReader = shared.ReadAllIntoReader[CollectionDocumentDeleteResponse, *collectionDocumentDeleteResponseReader]{Reader: c}
return c
}

Expand All @@ -111,9 +114,28 @@ var _ CollectionDocumentDeleteResponseReader = &collectionDocumentDeleteResponse
type collectionDocumentDeleteResponseReader struct {
array *connection.Array
options *CollectionDocumentDeleteOptions
shared.ReadAllIntoReader[CollectionDocumentDeleteResponse, *collectionDocumentDeleteResponseReader]
// Cache for len() method - allows Read() to work after Len() is called
cachedResults []CollectionDocumentDeleteResponse
cachedErrors []error
cached bool
readIndex int // Track position in cache for Read() after Len()
}

func (c *collectionDocumentDeleteResponseReader) Read(i interface{}) (CollectionDocumentDeleteResponse, error) {
// If Len() was called, serve from cache
// Note: When serving from cache, the 'i' parameter is not populated with document data
if c.cached {
if c.readIndex >= len(c.cachedResults) {
return CollectionDocumentDeleteResponse{}, shared.NoMoreDocumentsError{}
}
result := c.cachedResults[c.readIndex]
err := c.cachedErrors[c.readIndex]
c.readIndex++
return result, err
}

// Normal streaming read
if !c.array.More() {
return CollectionDocumentDeleteResponse{}, shared.NoMoreDocumentsError{}
}
Expand Down Expand Up @@ -146,11 +168,33 @@ func (c *collectionDocumentDeleteResponseReader) Read(i interface{}) (Collection
}

if c.options != nil && c.options.OldObject != nil {
meta.Old = c.options.OldObject
// Create a new instance for each document to avoid reusing the same pointer
oldObjectType := reflect.TypeOf(c.options.OldObject).Elem()
meta.Old = reflect.New(oldObjectType).Interface()

// Extract old data into the new instance
if err := response.Object.Object.Extract("old").Inject(meta.Old); err != nil {
return CollectionDocumentDeleteResponse{}, err
}

// Copy data from the new instance to the original OldObject for backward compatibility
oldValue := reflect.ValueOf(meta.Old).Elem()
originalValue := reflect.ValueOf(c.options.OldObject).Elem()
originalValue.Set(oldValue)
}

return meta, nil
}

// Len returns the number of items in the response.
// After calling Len(), you can still use Read() to iterate through items.
// Note: When Read() serves from cache, the document data parameter is not populated.
func (c *collectionDocumentDeleteResponseReader) Len() int {
if !c.cached {
var dummySlice []interface{}
c.cachedResults, c.cachedErrors = c.ReadAll(&dummySlice)
c.cached = true
c.readIndex = 0 // Reset read position to allow Read() after Len()
}
return len(c.cachedResults)
}
2 changes: 2 additions & 0 deletions v2/arangodb/collection_documents_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type CollectionDocumentRead interface {

type CollectionDocumentReadResponseReader interface {
Read(i interface{}) (CollectionDocumentReadResponse, error)
shared.ReadAllIntoReadable[CollectionDocumentReadResponse]
Len() int
}

type CollectionDocumentReadResponse struct {
Expand Down
34 changes: 33 additions & 1 deletion v2/arangodb/collection_documents_read_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (c collectionDocumentRead) ReadDocumentWithOptions(ctx context.Context, key

func newCollectionDocumentReadResponseReader(array *connection.Array, options *CollectionDocumentReadOptions) *collectionDocumentReadResponseReader {
c := &collectionDocumentReadResponseReader{array: array, options: options}

c.ReadAllIntoReader = shared.ReadAllIntoReader[CollectionDocumentReadResponse, *collectionDocumentReadResponseReader]{Reader: c}
return c
}

Expand All @@ -106,9 +106,28 @@ var _ CollectionDocumentReadResponseReader = &collectionDocumentReadResponseRead
type collectionDocumentReadResponseReader struct {
array *connection.Array
options *CollectionDocumentReadOptions
shared.ReadAllIntoReader[CollectionDocumentReadResponse, *collectionDocumentReadResponseReader]
// Cache for len() method - allows Read() to work after Len() is called
cachedResults []CollectionDocumentReadResponse
cachedErrors []error
cached bool
readIndex int // Track position in cache for Read() after Len()
}

func (c *collectionDocumentReadResponseReader) Read(i interface{}) (CollectionDocumentReadResponse, error) {
// If Len() was called, serve from cache
// Note: When serving from cache, the 'i' parameter is not populated with document data
if c.cached {
if c.readIndex >= len(c.cachedResults) {
return CollectionDocumentReadResponse{}, shared.NoMoreDocumentsError{}
}
result := c.cachedResults[c.readIndex]
err := c.cachedErrors[c.readIndex]
c.readIndex++
return result, err
}

// Normal streaming read
if !c.array.More() {
return CollectionDocumentReadResponse{}, shared.NoMoreDocumentsError{}
}
Expand Down Expand Up @@ -142,3 +161,16 @@ func (c *collectionDocumentReadResponseReader) Read(i interface{}) (CollectionDo

return meta, nil
}

// Len returns the number of items in the response.
// After calling Len(), you can still use Read() to iterate through items.
// Note: When Read() serves from cache, the document data parameter is not populated.
func (c *collectionDocumentReadResponseReader) Len() int {
if !c.cached {
var dummySlice []interface{}
c.cachedResults, c.cachedErrors = c.ReadAll(&dummySlice)
c.cached = true
c.readIndex = 0 // Reset read position to allow Read() after Len()
}
return len(c.cachedResults)
}
2 changes: 2 additions & 0 deletions v2/arangodb/collection_documents_replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ type CollectionDocumentReplace interface {
}

type CollectionDocumentReplaceResponseReader interface {
shared.ReadAllReadable[CollectionDocumentReplaceResponse]
Read() (CollectionDocumentReplaceResponse, error)
Len() int
}

type CollectionDocumentReplaceResponse struct {
Expand Down
Loading