Skip to content

Commit b31a752

Browse files
authored
Cosmos DB: Add support for Span (Azure#23268)
* adding internal * tests * Fixing tests * adding spans * adding span validator * adding span validator * wiring through tests * bailing out * changelog * spelling * format * more format * Update sdk/data/azcosmos/CHANGELOG.md * Adding az.namespace * add constants * Wire new methods * tests * Update cosmos_client_test.go * Update cosmos_client_test.go * Fixing tests * notes * Refactoring to add properties * updating dependency * wiring attributes * Tests * adding more tests * last batch of tests * not logging 443
1 parent 969cfcd commit b31a752

24 files changed

+852
-133
lines changed

sdk/data/azcosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## 1.0.4 (Unreleased)
44

55
### Features Added
6+
* Added support for OpenTelemetry trace spans. See [PR 23268](https://github.com/Azure/azure-sdk-for-go/pull/23268)
67

78
### Breaking Changes
89

sdk/data/azcosmos/cosmos_client.go

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ const (
2727

2828
// Client is used to interact with the Azure Cosmos DB database service.
2929
type Client struct {
30-
endpoint string
31-
pipeline azruntime.Pipeline
32-
gem *globalEndpointManager
30+
endpoint string
31+
internal *azcore.Client
32+
gem *globalEndpointManager
33+
endpointUrl *url.URL
3334
}
3435

3536
// Endpoint used to create the client.
@@ -42,24 +43,38 @@ func (c *Client) Endpoint() string {
4243
// cred - The credential used to authenticate with the cosmos service.
4344
// options - Optional Cosmos client options. Pass nil to accept default values.
4445
func NewClientWithKey(endpoint string, cred KeyCredential, o *ClientOptions) (*Client, error) {
46+
endpointUrl, err := url.Parse(endpoint)
47+
if err != nil {
48+
return nil, err
49+
}
4550
preferredRegions := []string{}
4651
enableCrossRegionRetries := true
4752
if o != nil {
4853
preferredRegions = o.PreferredRegions
4954
}
55+
5056
gem, err := newGlobalEndpointManager(endpoint, newInternalPipeline(newSharedKeyCredPolicy(cred), o), preferredRegions, 0, enableCrossRegionRetries)
5157
if err != nil {
5258
return nil, err
5359
}
54-
return &Client{endpoint: endpoint, pipeline: newPipeline(newSharedKeyCredPolicy(cred), gem, o), gem: gem}, nil
60+
61+
internalClient, err := newClient(newSharedKeyCredPolicy(cred), gem, o)
62+
if err != nil {
63+
return nil, err
64+
}
65+
return &Client{endpoint: endpoint, endpointUrl: endpointUrl, internal: internalClient, gem: gem}, nil
5566
}
5667

5768
// NewClient creates a new instance of Cosmos client with Azure AD access token authentication. It uses the default pipeline configuration.
5869
// endpoint - The cosmos service endpoint to use.
5970
// cred - The credential used to authenticate with the cosmos service.
6071
// options - Optional Cosmos client options. Pass nil to accept default values.
6172
func NewClient(endpoint string, cred azcore.TokenCredential, o *ClientOptions) (*Client, error) {
62-
scope, err := createScopeFromEndpoint(endpoint)
73+
endpointUrl, err := url.Parse(endpoint)
74+
if err != nil {
75+
return nil, err
76+
}
77+
scope, err := createScopeFromEndpoint(endpointUrl)
6378
if err != nil {
6479
return nil, err
6580
}
@@ -72,7 +87,12 @@ func NewClient(endpoint string, cred azcore.TokenCredential, o *ClientOptions) (
7287
if err != nil {
7388
return nil, err
7489
}
75-
return &Client{endpoint: endpoint, pipeline: newPipeline(newCosmosBearerTokenPolicy(cred, scope, nil), gem, o), gem: gem}, nil
90+
91+
internalClient, err := newClient(newCosmosBearerTokenPolicy(cred, scope, nil), gem, o)
92+
if err != nil {
93+
return nil, err
94+
}
95+
return &Client{endpoint: endpoint, endpointUrl: endpointUrl, internal: internalClient, gem: gem}, nil
7696
}
7797

7898
// NewClientFromConnectionString creates a new instance of Cosmos client from connection string. It uses the default pipeline configuration.
@@ -111,11 +131,11 @@ func NewClientFromConnectionString(connectionString string, o *ClientOptions) (*
111131
return NewClientWithKey(endpoint, cred, o)
112132
}
113133

114-
func newPipeline(authPolicy policy.Policy, gem *globalEndpointManager, options *ClientOptions) azruntime.Pipeline {
134+
func newClient(authPolicy policy.Policy, gem *globalEndpointManager, options *ClientOptions) (*azcore.Client, error) {
115135
if options == nil {
116136
options = &ClientOptions{}
117137
}
118-
return azruntime.NewPipeline("azcosmos", serviceLibVersion,
138+
return azcore.NewClient(moduleName, serviceLibVersion,
119139
azruntime.PipelineOptions{
120140
AllowedHeaders: getAllowedHeaders(),
121141
PerCall: []policy.Policy{
@@ -128,6 +148,9 @@ func newPipeline(authPolicy policy.Policy, gem *globalEndpointManager, options *
128148
authPolicy,
129149
&clientRetryPolicy{gem: gem},
130150
},
151+
Tracing: azruntime.TracingOptions{
152+
Namespace: "Microsoft.DocumentDB",
153+
},
131154
},
132155
&options.ClientOptions)
133156
}
@@ -136,7 +159,7 @@ func newInternalPipeline(authPolicy policy.Policy, options *ClientOptions) azrun
136159
if options == nil {
137160
options = &ClientOptions{}
138161
}
139-
return azruntime.NewPipeline("azcosmos", serviceLibVersion,
162+
return azruntime.NewPipeline(moduleName, serviceLibVersion,
140163
azruntime.PipelineOptions{
141164
AllowedHeaders: getAllowedHeaders(),
142165
PerRetry: []policy.Policy{
@@ -146,13 +169,8 @@ func newInternalPipeline(authPolicy policy.Policy, options *ClientOptions) azrun
146169
&options.ClientOptions)
147170
}
148171

149-
func createScopeFromEndpoint(endpoint string) ([]string, error) {
150-
u, err := url.Parse(endpoint)
151-
if err != nil {
152-
return nil, err
153-
}
154-
155-
return []string{fmt.Sprintf("%s://%s/.default", u.Scheme, u.Hostname())}, nil
172+
func createScopeFromEndpoint(endpoint *url.URL) ([]string, error) {
173+
return []string{fmt.Sprintf("%s://%s/.default", endpoint.Scheme, endpoint.Hostname())}, nil
156174
}
157175

158176
// NewDatabase returns a struct that represents a database and allows database level operations.
@@ -193,6 +211,14 @@ func (c *Client) CreateDatabase(
193211
ctx context.Context,
194212
databaseProperties DatabaseProperties,
195213
o *CreateDatabaseOptions) (DatabaseResponse, error) {
214+
var err error
215+
spanName, err := getSpanNameForDatabases(c.accountEndpointUrl(), operationTypeCreate, resourceTypeDatabase, databaseProperties.ID)
216+
if err != nil {
217+
return DatabaseResponse{}, err
218+
}
219+
ctx, endSpan := azruntime.StartSpan(ctx, spanName.name, c.internal.Tracer(), &spanName.options)
220+
defer func() { endSpan(err) }()
221+
196222
if o == nil {
197223
o = &CreateDatabaseOptions{}
198224
}
@@ -224,7 +250,8 @@ func (c *Client) CreateDatabase(
224250
return DatabaseResponse{}, err
225251
}
226252

227-
return newDatabaseResponse(azResponse)
253+
response, err := newDatabaseResponse(azResponse)
254+
return response, err
228255
}
229256

230257
// NewQueryDatabasesPager executes query for databases.
@@ -249,6 +276,14 @@ func (c *Client) NewQueryDatabasesPager(query string, o *QueryDatabasesOptions)
249276
return page.ContinuationToken != nil
250277
},
251278
Fetcher: func(ctx context.Context, page *QueryDatabasesResponse) (QueryDatabasesResponse, error) {
279+
var err error
280+
// Move the span to the pager once https://github.com/Azure/azure-sdk-for-go/issues/23294 is fixed
281+
spanName, err := getSpanNameForClient(c.accountEndpointUrl(), operationTypeQuery, resourceTypeDatabase, c.accountEndpointUrl().Hostname())
282+
if err != nil {
283+
return QueryDatabasesResponse{}, err
284+
}
285+
ctx, endSpan := azruntime.StartSpan(ctx, spanName.name, c.internal.Tracer(), &spanName.options)
286+
defer func() { endSpan(err) }()
252287
if page != nil {
253288
if page.ContinuationToken != nil {
254289
// Use the previous page continuation if available
@@ -271,6 +306,7 @@ func (c *Client) NewQueryDatabasesPager(query string, o *QueryDatabasesOptions)
271306

272307
return newDatabasesQueryResponse(azResponse)
273308
},
309+
Tracer: c.internal.Tracer(),
274310
})
275311
}
276312

@@ -473,7 +509,7 @@ func (c *Client) attachContent(content interface{}, req *policy.Request) error {
473509

474510
func (c *Client) executeAndEnsureSuccessResponse(request *policy.Request) (*http.Response, error) {
475511
log.Write(azlog.EventResponse, fmt.Sprintf("\n===== Client preferred regions:\n%v\n=====\n", c.gem.preferredLocations))
476-
response, err := c.pipeline.Do(request)
512+
response, err := c.internal.Pipeline().Do(request)
477513
if err != nil {
478514
return nil, err
479515
}
@@ -486,6 +522,10 @@ func (c *Client) executeAndEnsureSuccessResponse(request *policy.Request) (*http
486522
return nil, azruntime.NewResponseErrorWithErrorCode(response, response.Status)
487523
}
488524

525+
func (c *Client) accountEndpointUrl() *url.URL {
526+
return c.endpointUrl
527+
}
528+
489529
type pipelineRequestOptions struct {
490530
headerOptionsOverride *headerOptionsOverride
491531
resourceType resourceType

sdk/data/azcosmos/cosmos_client_retry_policy_test.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"testing"
1313
"time"
1414

15+
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
1516
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
1617
azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
1718
"github.com/Azure/azure-sdk-for-go/sdk/internal/mock"
@@ -43,7 +44,7 @@ func TestSessionNotAvailableSingleMaster(t *testing.T) {
4344
retryPolicy := &clientRetryPolicy{gem: gem}
4445
verifier := clientRetryPolicyVerifier{}
4546

46-
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
47+
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
4748

4849
// Setting up responses for consistent failures
4950
srv.AppendResponse(
@@ -53,7 +54,7 @@ func TestSessionNotAvailableSingleMaster(t *testing.T) {
5354
mock.WithHeader("x-ms-substatus", "1002"),
5455
mock.WithStatusCode(404))
5556

56-
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
57+
client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem}
5758
db, _ := client.NewDatabase("database_id")
5859
container, _ := db.NewContainer("container_id")
5960
_, err = container.ReadItem(context.TODO(), NewPartitionKeyString("1"), "doc1", nil)
@@ -130,7 +131,7 @@ func TestSessionNotAvailableMultiMaster(t *testing.T) {
130131
retryPolicy := &clientRetryPolicy{gem: gem}
131132
verifier := clientRetryPolicyVerifier{}
132133

133-
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
134+
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
134135

135136
// Setting up responses for using all retries and failing
136137
srv.AppendResponse(
@@ -146,7 +147,7 @@ func TestSessionNotAvailableMultiMaster(t *testing.T) {
146147
mock.WithHeader("x-ms-substatus", "1002"),
147148
mock.WithStatusCode(404))
148149

149-
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
150+
client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem}
150151
db, _ := client.NewDatabase("database_id")
151152
container, _ := db.NewContainer("container_id")
152153
_, err = container.ReadItem(context.TODO(), NewPartitionKeyString("1"), "doc1", nil)
@@ -238,7 +239,7 @@ func TestReadEndpointFailure(t *testing.T) {
238239
retryPolicy := &clientRetryPolicy{gem: gem}
239240
verifier := clientRetryPolicyVerifier{}
240241

241-
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
242+
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
242243

243244
// Setting up responses for retrying twice
244245
srv.AppendResponse(
@@ -250,7 +251,7 @@ func TestReadEndpointFailure(t *testing.T) {
250251
srv.AppendResponse(
251252
mock.WithStatusCode(200))
252253

253-
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
254+
client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem}
254255
db, _ := client.NewDatabase("database_id")
255256
container, _ := db.NewContainer("container_id")
256257
_, err = container.ReadItem(context.TODO(), NewPartitionKeyString("1"), "doc1", nil)
@@ -291,9 +292,9 @@ func TestWriteEndpointFailure(t *testing.T) {
291292
retryPolicy := &clientRetryPolicy{gem: gem}
292293
verifier := clientRetryPolicyVerifier{}
293294

294-
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
295+
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
295296

296-
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
297+
client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem}
297298
db, _ := client.NewDatabase("database_id")
298299
container, _ := db.NewContainer("container_id")
299300

@@ -354,9 +355,9 @@ func TestReadServiceUnavailable(t *testing.T) {
354355
retryPolicy := &clientRetryPolicy{gem: gem}
355356
verifier := clientRetryPolicyVerifier{}
356357

357-
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
358+
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
358359

359-
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
360+
client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem}
360361
db, _ := client.NewDatabase("database_id")
361362
container, _ := db.NewContainer("container_id")
362363

@@ -427,9 +428,9 @@ func TestWriteServiceUnavailable(t *testing.T) {
427428
retryPolicy := &clientRetryPolicy{gem: gem}
428429
verifier := clientRetryPolicyVerifier{}
429430

430-
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
431+
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
431432

432-
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
433+
client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem}
433434
db, _ := client.NewDatabase("database_id")
434435
container, _ := db.NewContainer("container_id")
435436

@@ -506,9 +507,9 @@ func TestDnsErrorRetry(t *testing.T) {
506507
retryPolicy := &clientRetryPolicy{gem: gem}
507508
verifier := clientRetryPolicyVerifier{}
508509

509-
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
510+
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
510511

511-
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
512+
client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem}
512513
db, _ := client.NewDatabase("database_id")
513514
container, _ := db.NewContainer("container_id")
514515

0 commit comments

Comments
 (0)