Skip to content

Commit 408a06a

Browse files
chetanmeh authored and tysonnorris committed
Enable configuring CosmosDB client per collection (#4198)
* Enable entity specific config
1 parent e51b748 commit 408a06a

File tree

7 files changed

+277
-29
lines changed

7 files changed

+277
-29
lines changed

common/scala/src/main/resources/application.conf

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,12 +154,49 @@ whisk {
154154

155155
# CosmosDB related configuration
156156
# For example:
157-
# cosmosdb {
158-
# endpoint = # Endpoint URL like https://<account>.documents.azure.com:443/
159-
# key = # Access key
160-
# db = # Database name
161-
# throughput = 1000 # Throughput configure for each collection within this db
162-
#}
157+
cosmosdb {
158+
# endpoint = # Endpoint URL like https://<account>.documents.azure.com:443/
159+
# key = # Access key
160+
# db = # Database name
161+
# Throughput configured for each collection within this db.
162+
# This is applied only when the collection is freshly created. If the collection
163+
# already exists, then its existing throughput is used.
164+
throughput = 1000
165+
# Select one of the supported consistency levels listed at
166+
# https://azure.github.io/azure-cosmosdb-java/1.0.0/com/microsoft/azure/cosmosdb/ConsistencyLevel.html
167+
consistency-level = "Session"
168+
connection-policy {
169+
max-pool-size = 1000
170+
# When the value of this property is true, the SDK will direct write operations to
171+
# available writable locations of a geo-replicated database account
172+
using-multiple-write-locations = false
173+
174+
# Sets the preferred locations for geo-replicated database accounts e.g. "East US"
175+
# See names at https://azure.microsoft.com/en-in/global-infrastructure/locations/
176+
preferred-locations = []
177+
retry-options {
178+
# Sets the maximum number of retries in the case where the request fails
179+
# because the service has applied rate limiting on the client.
180+
max-retry-attempts-on-throttled-requests = 9
181+
182+
# Sets the maximum retry time
183+
# If the cumulative wait time exceeds this, the SDK will stop retrying and return the
184+
# error to the application.
185+
max-retry-wait-time = 30 s
186+
}
187+
}
188+
189+
# Specify entity-specific overrides below. By default all config values are picked from the top level. To override
190+
# any config option for a specific entity, specify it below. For example, if multiple writes need to be enabled
191+
# for activations, then:
192+
# collections {
193+
# WhiskActivation { # Add entity specific overrides here
194+
# connection-policy {
195+
# using-multiple-write-locations = true
196+
# }
197+
# }
198+
# }
199+
}
163200

164201
# transaction ID related configuration
165202
transactions {

common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
7474
private val countToken = createToken("count")
7575
private val putAttachmentToken = createToken("putAttachment", read = false)
7676

77+
logging.info(
78+
this,
79+
s"Initializing CosmosDBArtifactStore for collection [$collName]. Service endpoint [${client.getServiceEndpoint}], " +
80+
s"Read endpoint [${client.getReadEndpoint}], Write endpoint [${client.getWriteEndpoint}], Connection Policy [${client.getConnectionPolicy}]")
81+
7782
//Clone the returned instance as these are mutable
7883
def documentCollection(): DocumentCollection = new DocumentCollection(collection.toJson)
7984

common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,34 +22,33 @@ import java.io.Closeable
2222
import akka.actor.ActorSystem
2323
import akka.stream.ActorMaterializer
2424
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
25-
import spray.json.RootJsonFormat
25+
import com.typesafe.config.ConfigFactory
2626
import org.apache.openwhisk.common.Logging
27+
import org.apache.openwhisk.core.ConfigKeys
2728
import org.apache.openwhisk.core.database._
28-
import pureconfig._
2929
import org.apache.openwhisk.core.entity.size._
30-
import org.apache.openwhisk.core.ConfigKeys
31-
import org.apache.openwhisk.core.database.cosmosdb.CosmosDBUtil.createClient
3230
import org.apache.openwhisk.core.entity.{DocumentReader, WhiskActivation, WhiskAuth, WhiskEntity}
31+
import pureconfig._
32+
import spray.json.RootJsonFormat
3333

3434
import scala.reflect.ClassTag
3535

36-
case class CosmosDBConfig(endpoint: String, key: String, db: String, throughput: Int = 1000)
37-
3836
/**
 * Closeable wrapper around an [[AsyncDocumentClient]]; this is the type held by
 * `ReferenceCounted[ClientHolder]` in the store provider.
 *
 * @param client the CosmosDB client that gets closed together with this holder
 */
case class ClientHolder(client: AsyncDocumentClient) extends Closeable {

  /** Closes the wrapped CosmosDB client. */
  override def close(): Unit = {
    client.close()
  }
}
4139

4240
object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
4341
type DocumentClientRef = ReferenceCounted[ClientHolder]#CountedReference
44-
private lazy val config = loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb)
45-
private var clientRef: ReferenceCounted[ClientHolder] = _
42+
private val clients = collection.mutable.Map[CosmosDBConfig, ReferenceCounted[ClientHolder]]()
4643

4744
/**
 * Creates an artifact store for documents of type `D`.
 *
 * The CosmosDB configuration is resolved per entity type: the simple name of the
 * runtime class of `D` is handed to `CosmosDBConfig` so that entity specific
 * overrides can be applied. The client reference is obtained (or created) for
 * exactly that resolved configuration.
 *
 * @param useBatching not used by the visible body of this method
 * @return the artifact store built by `makeStoreForClient`
 */
override def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)(
  implicit jsonFormat: RootJsonFormat[D],
  docReader: DocumentReader,
  actorSystem: ActorSystem,
  logging: Logging,
  materializer: ActorMaterializer): ArtifactStore[D] = {
  // Simple class name of the entity type, e.g. the key used for per-collection config
  val entityTypeName = implicitly[ClassTag[D]].runtimeClass.getSimpleName
  val storeConfig = CosmosDBConfig(ConfigFactory.load(), entityTypeName)
  makeStoreForClient(storeConfig, getOrCreateReference(storeConfig), getAttachmentStore())
}
5554

@@ -102,13 +101,17 @@ object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
102101
* Synchronization is required to ensure that concurrent initialization of various store instances shares the same ref instance
103102
*/
104103
private def getOrCreateReference(config: CosmosDBConfig) = synchronized {
105-
if (clientRef == null || clientRef.isClosed) {
106-
clientRef = createReference(config)
104+
val clientRef = clients.getOrElseUpdate(config, createReference(config))
105+
if (clientRef.isClosed) {
106+
val newRef = createReference(config)
107+
clients.put(config, newRef)
108+
newRef.reference()
109+
} else {
110+
clientRef.reference()
107111
}
108-
clientRef.reference()
109112
}
110113

111114
private def createReference(config: CosmosDBConfig) =
112-
new ReferenceCounted[ClientHolder](ClientHolder(createClient(config)))
115+
new ReferenceCounted[ClientHolder](ClientHolder(config.createClient()))
113116

114117
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.database.cosmosdb
19+
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
20+
import com.microsoft.azure.cosmosdb.{
21+
ConsistencyLevel,
22+
ConnectionPolicy => JConnectionPolicy,
23+
RetryOptions => JRetryOptions
24+
}
25+
import com.typesafe.config.Config
26+
import com.typesafe.config.ConfigUtil.joinPath
27+
import org.apache.openwhisk.core.ConfigKeys
28+
import pureconfig._
29+
30+
import scala.collection.JavaConverters._
31+
import scala.concurrent.duration._
32+
33+
/**
 * Settings needed to talk to a CosmosDB account.
 *
 * @param endpoint         service endpoint passed to the client builder
 * @param key              master key or resource token passed to the client builder
 * @param db               database name
 * @param throughput       throughput setting (not used when creating the client)
 * @param consistencyLevel consistency level the client is created with
 * @param connectionPolicy connection policy the client is created with
 */
case class CosmosDBConfig(endpoint: String,
                          key: String,
                          db: String,
                          throughput: Int,
                          consistencyLevel: ConsistencyLevel,
                          connectionPolicy: ConnectionPolicy) {

  /**
   * Builds a new [[AsyncDocumentClient]] from the endpoint, key, consistency level
   * and connection policy of this configuration.
   */
  def createClient(): AsyncDocumentClient = {
    val builder = new AsyncDocumentClient.Builder()
      .withServiceEndpoint(endpoint)
      .withMasterKeyOrResourceToken(key)
      .withConsistencyLevel(consistencyLevel)
      .withConnectionPolicy(connectionPolicy.asJava)
    builder.build()
  }
}
49+
50+
/**
 * Scala-side view of the connection policy options that can be configured.
 *
 * @param maxPoolSize                 value passed to `setMaxPoolSize`
 * @param preferredLocations          value passed (as a Java list) to `setPreferredLocations`
 * @param usingMultipleWriteLocations value passed to `setUsingMultipleWriteLocations`
 * @param retryOptions                retry settings, converted via `RetryOptions.asJava`
 */
case class ConnectionPolicy(maxPoolSize: Int,
                            preferredLocations: Seq[String],
                            usingMultipleWriteLocations: Boolean,
                            retryOptions: RetryOptions) {

  /**
   * Creates a fresh SDK connection policy and applies the four configured settings
   * to it. Any other property keeps whatever a newly constructed SDK policy has.
   */
  def asJava: JConnectionPolicy = {
    val javaPolicy = new JConnectionPolicy
    javaPolicy.setMaxPoolSize(maxPoolSize)
    javaPolicy.setUsingMultipleWriteLocations(usingMultipleWriteLocations)
    javaPolicy.setPreferredLocations(preferredLocations.asJava)
    javaPolicy.setRetryOptions(retryOptions.asJava)
    javaPolicy
  }
}
63+
64+
/**
 * Retry settings used when requests get rate limited by the service.
 *
 * @param maxRetryAttemptsOnThrottledRequests maximum number of retries for a throttled request
 * @param maxRetryWaitTime                    maximum cumulative time to keep retrying
 */
case class RetryOptions(maxRetryAttemptsOnThrottledRequests: Int, maxRetryWaitTime: Duration) {

  /**
   * Converts these settings to the SDK's retry options.
   *
   * The wait time is handed to the SDK as a whole number of seconds.
   *
   * @throws ArithmeticException if the wait time in seconds does not fit into an `Int`
   */
  def asJava: JRetryOptions = {
    val options = new JRetryOptions
    options.setMaxRetryAttemptsOnThrottledRequests(maxRetryAttemptsOnThrottledRequests)
    // toIntExact fails fast instead of silently wrapping around (as a plain .toInt would)
    // when the configured duration is too large to be expressed as Int seconds
    options.setMaxRetryWaitTimeInSeconds(Math.toIntExact(maxRetryWaitTime.toSeconds))
    options
  }
}
72+
73+
/** Factory for [[CosmosDBConfig]] that supports entity specific overrides. */
object CosmosDBConfig {

  /** Name of the config section (below the cosmosdb config) that holds per-entity overrides. */
  val collections = "collections"

  /**
   * Loads the CosmosDB configuration for the given entity type.
   *
   * If a section `collections.<entityTypeName>` exists below the cosmosdb config, its
   * values take precedence and the common cosmosdb config is used as fallback.
   * Otherwise the common config is used as is.
   *
   * @param globalConfig   root config containing the `ConfigKeys.cosmosdb` section
   * @param entityTypeName name of the entity whose overrides should be applied
   */
  def apply(globalConfig: Config, entityTypeName: String): CosmosDBConfig = {
    val commonConfig = globalConfig.getConfig(ConfigKeys.cosmosdb)
    val overridePath = joinPath(collections, entityTypeName)

    // Entity specific values win; everything else comes from the common config
    val effectiveConfig =
      if (!commonConfig.hasPath(overridePath)) commonConfig
      else commonConfig.getConfig(overridePath).withFallback(commonConfig)

    loadConfigOrThrow[CosmosDBConfig](effectiveConfig)
  }
}

common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717

1818
package org.apache.openwhisk.core.database.cosmosdb
1919

20-
import com.microsoft.azure.cosmosdb._
2120
import com.microsoft.azure.cosmosdb.internal.Constants.Properties.{AGGREGATE, E_TAG, ID, SELF_LINK}
22-
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
2321
import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants._
2422

2523
import scala.collection.immutable.Iterable
@@ -81,14 +79,6 @@ private[cosmosdb] trait CosmosDBUtil {
8179
case x => x.toString
8280
}
8381

84-
def createClient(config: CosmosDBConfig): AsyncDocumentClient =
85-
new AsyncDocumentClient.Builder()
86-
.withServiceEndpoint(config.endpoint)
87-
.withMasterKeyOrResourceToken(config.key)
88-
.withConnectionPolicy(ConnectionPolicy.GetDefault)
89-
.withConsistencyLevel(ConsistencyLevel.Session)
90-
.build
91-
9282
/**
9383
* CosmosDB id considers '/', '\' , '?' and '#' as invalid. EntityNames can include '/' so
9484
* that need to be escaped. For that we use '|' as the replacement char
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.database.cosmosdb
19+
import com.typesafe.config.ConfigFactory
20+
import org.junit.runner.RunWith
21+
import org.scalatest.junit.JUnitRunner
22+
import org.scalatest.{FlatSpec, Matchers}
23+
import com.microsoft.azure.cosmosdb.{ConnectionPolicy => JConnectionPolicy}
24+
25+
import scala.collection.JavaConverters._
26+
27+
/**
 * Checks that [[CosmosDBConfig]] is loaded correctly from the application defaults,
 * from explicit top level settings and from entity specific overrides.
 */
@RunWith(classOf[JUnitRunner])
class CosmosDBConfigTests extends FlatSpec with Matchers {
  val globalConfig = ConfigFactory.defaultApplication()

  /** Parses the given HOCON on top of the application defaults and loads the WhiskAuth config. */
  private def loadWhiskAuthConfig(hocon: String): CosmosDBConfig =
    CosmosDBConfig(ConfigFactory.parseString(hocon).withFallback(globalConfig), "WhiskAuth")

  /** Asserts the endpoint, key and db values that every test configures. */
  private def assertMandatorySettings(cosmos: CosmosDBConfig): Unit =
    cosmos should matchPattern { case CosmosDBConfig("http://localhost", "foo", "openwhisk", _, _, _) => }

  behavior of "CosmosDB Config"

  it should "match SDK defaults" in {
    val cosmos = loadWhiskAuthConfig("""
      | whisk.cosmosdb {
      | endpoint = "http://localhost"
      | key = foo
      | db = openwhisk
      | }
      """.stripMargin)

    //Cosmos SDK does not have equals defined so match them explicitly
    val actual = cosmos.connectionPolicy.asJava
    val expected = JConnectionPolicy.GetDefault()
    actual.getConnectionMode shouldBe expected.getConnectionMode
    actual.getEnableEndpointDiscovery shouldBe expected.getEnableEndpointDiscovery
    actual.getIdleConnectionTimeoutInMillis shouldBe expected.getIdleConnectionTimeoutInMillis
    actual.getMaxPoolSize shouldBe expected.getMaxPoolSize
    actual.getPreferredLocations shouldBe expected.getPreferredLocations
    actual.getRequestTimeoutInMillis shouldBe expected.getRequestTimeoutInMillis
    actual.isUsingMultipleWriteLocations shouldBe expected.isUsingMultipleWriteLocations

    val actualRetry = actual.getRetryOptions
    val expectedRetry = expected.getRetryOptions

    actualRetry.getMaxRetryAttemptsOnThrottledRequests shouldBe expectedRetry.getMaxRetryAttemptsOnThrottledRequests
    actualRetry.getMaxRetryWaitTimeInSeconds shouldBe expectedRetry.getMaxRetryWaitTimeInSeconds
  }

  it should "work with generic config" in {
    val cosmos = loadWhiskAuthConfig("""
      | whisk.cosmosdb {
      | endpoint = "http://localhost"
      | key = foo
      | db = openwhisk
      | }
      """.stripMargin)
    assertMandatorySettings(cosmos)
  }

  it should "work with extended config" in {
    val cosmos = loadWhiskAuthConfig("""
      | whisk.cosmosdb {
      | endpoint = "http://localhost"
      | key = foo
      | db = openwhisk
      | connection-policy {
      | max-pool-size = 42
      | }
      | }
      """.stripMargin)
    assertMandatorySettings(cosmos)

    cosmos.connectionPolicy.maxPoolSize shouldBe 42

    // Everything that was not overridden must still match the SDK defaults
    val actual = cosmos.connectionPolicy.asJava
    val expected = JConnectionPolicy.GetDefault()
    actual.getConnectionMode shouldBe expected.getConnectionMode
    actual.getRetryOptions.getMaxRetryAttemptsOnThrottledRequests shouldBe expected.getRetryOptions.getMaxRetryAttemptsOnThrottledRequests
    actual.getRetryOptions.getMaxRetryWaitTimeInSeconds shouldBe expected.getRetryOptions.getMaxRetryWaitTimeInSeconds
  }

  it should "work with specific extended config" in {
    val cosmos = loadWhiskAuthConfig("""
      | whisk.cosmosdb {
      | endpoint = "http://localhost"
      | key = foo
      | db = openwhisk
      | connection-policy {
      | max-pool-size = 42
      | retry-options {
      | max-retry-wait-time = 2 m
      | }
      | }
      | collections {
      | WhiskAuth = {
      | connection-policy {
      | using-multiple-write-locations = true
      | preferred-locations = [a, b]
      | }
      | }
      | }
      | }
      """.stripMargin)
    assertMandatorySettings(cosmos)

    // Entity specific values are merged with the top level connection policy
    val merged = cosmos.connectionPolicy.asJava
    merged.isUsingMultipleWriteLocations shouldBe true
    merged.getMaxPoolSize shouldBe 42
    merged.getPreferredLocations.asScala.toSeq should contain only ("a", "b")
    merged.getRetryOptions.getMaxRetryWaitTimeInSeconds shouldBe 120
  }
}

tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBTestSupport.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ trait CosmosDBTestSupport extends FlatSpec with BeforeAndAfterAll with RxObserva
3030
private val dbsToDelete = ListBuffer[Database]()
3131

3232
lazy val storeConfigTry = Try { loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb) }
33-
lazy val client = CosmosDBUtil.createClient(storeConfig)
33+
lazy val client = storeConfig.createClient()
3434
val useExistingDB = java.lang.Boolean.getBoolean("whisk.cosmosdb.useExistingDB")
3535

3636
def storeConfig = storeConfigTry.get

0 commit comments

Comments
 (0)