3131CONTAINER_ID = config .settings ['container_id' ]
3232
3333
34- def create_items (container , size ):
35- print (' Creating Items' )
34+ def create_items (container , size , partition_key_value ):
35+ print (" Creating Items with partition key value: {}" . format ( partition_key_value ) )
3636
37- for i in range (1 , size ):
37+ for i in range (size ):
3838 c = str (uuid .uuid4 ())
3939 item_definition = {'id' : 'item' + c ,
4040 'address' : {'street' : '1 Microsoft Way' + c ,
4141 'city' : 'Redmond' + c ,
42- 'state' : 'WA' ,
42+ 'state' : partition_key_value ,
4343 'zip code' : 98052
4444 }
4545 }
4646
4747 created_item = container .create_item (body = item_definition )
4848
49+ def clean_up (container ):
50+ print ('\n Clean up the container\n ' )
51+
52+ for item in container .query_items (query = 'SELECT * FROM c' , enable_cross_partition_query = True ):
53+ # Deleting the current item
54+ container .delete_item (item , partition_key = item ['address' ]['state' ])
4955
5056def read_change_feed (container ):
5157 print ('\n Reading Change Feed from the beginning\n ' )
5258
5359 # For a particular Partition Key Range we can use partition_key_range_id]
5460 # 'is_start_from_beginning = True' will read from the beginning of the history of the container
5561 # If no is_start_from_beginning is specified, the read change feed loop will pickup the items that happen while the loop / process is active
56- response = container .query_items_change_feed (is_start_from_beginning = True )
57- for doc in response :
62+ create_items (container , 10 , 'WA' )
63+ response_iterator = container .query_items_change_feed (is_start_from_beginning = True )
64+ for doc in response_iterator :
5865 print (doc )
5966
60- print ('\n Finished reading all the change feed\n ' )
61-
67+ def read_change_feed_with_start_time (container ):
68+ print ('\n Reading Change Feed from the start time\n ' )
69+ # You can read change feed from a specific time.
70+ # You must pass in a datetime object for the start_time field.
6271
63- def read_change_feed_with_start_time (container , start_time ):
72+ # Create items
73+ create_items (container , 10 , 'WA' )
74+ start_time = datetime .now (timezone .utc )
6475 time = start_time .strftime ('%a, %d %b %Y %H:%M:%S GMT' )
6576 print ('\n Reading Change Feed from start time of {}\n ' .format (time ))
77+ create_items (container , 5 , 'CA' )
78+ create_items (container , 5 , 'OR' )
6679
67- # You can read change feed from a specific time.
68- # You must pass in a datetime object for the start_time field.
69- response = container .query_items_change_feed (start_time = start_time )
70- for doc in response :
80+ # Read change feed from the beginning
81+ response_iterator = container .query_items_change_feed (start_time = "Beginning" )
82+ for doc in response_iterator :
7183 print (doc )
7284
73- print ('\n Finished reading all the change feed from start time of {}\n ' .format (time ))
85+ # Read change feed from a start time
86+ response_iterator = container .query_items_change_feed (start_time = start_time )
87+ for doc in response_iterator :
88+ print (doc )
89+
90+ def read_change_feed_with_partition_key (container ):
91+ print ('\n Reading Change Feed from the beginning of the partition key\n ' )
92+ # Create items
93+ create_items (container , 10 , 'WA' )
94+ create_items (container , 5 , 'CA' )
95+ create_items (container , 5 , 'OR' )
96+
97+ # Read change feed with partition key with LatestVersion mode.
98+ # Should only return change feed for the created items with 'CA' partition key
99+ response_iterator = container .query_items_change_feed (start_time = "Beginning" , partition_key = "CA" )
100+ for doc in response_iterator :
101+ print (doc )
74102
103+ def read_change_feed_with_continuation (container ):
104+ print ('\n Reading Change Feed from the continuation\n ' )
105+ # Create items
106+ create_items (container , 10 , 'WA' )
107+ response_iterator = container .query_items_change_feed (start_time = "Beginning" )
108+ for doc in response_iterator :
109+ print (doc )
110+ continuation_token = container .client_connection .last_response_headers ['etag' ]
75111
76- def read_change_feed_with_continuation (container , continuation ):
77- print ('\n Reading change feed from continuation\n ' )
112+ # Create additional items
113+ create_items (container , 5 , 'CA' )
114+ create_items (container , 5 , 'OR' )
78115
79116 # You can read change feed from a specific continuation token.
80117 # You must pass in a valid continuation token.
81- response = container .query_items_change_feed (continuation = continuation )
82- for doc in response :
118+ # From our continuation token above, you will get all items created after the continuation
119+ response_iterator = container .query_items_change_feed (continuation = continuation_token )
120+ for doc in response_iterator :
83121 print (doc )
84122
85- print ('\n Finished reading all the change feed from continuation\n ' )
86-
87- def delete_all_items (container ):
88- print ('\n Deleting all item\n ' )
89-
90- for item in container .query_items (query = 'SELECT * FROM c' , enable_cross_partition_query = True ):
91- # Deleting the current item
92- container .delete_item (item , partition_key = item ['address' ]['state' ])
93-
94- print ('Deleted all items' )
95-
96123def read_change_feed_with_all_versions_and_delete_mode (container ):
97- change_feed_mode = "AllVersionsAndDeletes"
98- print ("\n Reading change feed with 'AllVersionsAndDeletes' mode.\n " )
124+ print ('\n Reading Change Feed with AllVersionsAndDeletes mode\n ' )
125+ # Read the initial change feed with 'AllVersionsAndDeletes' mode.
126+ # This initial call was made to store a point in time in a 'continuation' token
127+ response_iterator = container .query_items_change_feed (mode = "AllVersionsAndDeletes" )
128+ for doc in response_iterator :
129+ print (doc )
130+ continuation_token = container .client_connection .last_response_headers ['etag' ]
99131
100- # You can read change feed with a specific change feed mode.
101- # You must pass in a valid change feed mode: ["LatestVersion", "AllVersionsAndDeletes"].
102- response = container .query_items_change_feed (mode = change_feed_mode )
103- for doc in response :
132+ # Read all change feed with 'AllVersionsAndDeletes' mode after create items from a continuation
133+ create_items (container , 10 , 'CA' )
134+ create_items (container , 10 , 'OR' )
135+ response_iterator = container .query_items_change_feed (mode = "AllVersionsAndDeletes" , continuation = continuation_token )
136+ for doc in response_iterator :
104137 print (doc )
105138
106- print ("\n Finished reading all the change feed with 'AllVersionsAndDeletes' mode.\n " )
139+ # Read all change feed with 'AllVersionsAndDeletes' mode after delete items from a continuation
140+ clean_up (container )
141+ response_iterator = container .query_items_change_feed (mode = "AllVersionsAndDeletes" , continuation = continuation_token )
142+ for doc in response_iterator :
143+ print (doc )
107144
108- def read_change_feed_with_all_versions_and_delete_mode_from_continuation (container , continuation ):
109- change_feed_mode = "AllVersionsAndDeletes"
110- print ("\n Reading change feed with 'AllVersionsAndDeletes' mode.\n " )
145+ def read_change_feed_with_all_versions_and_delete_mode_with_partition_key (container ):
146+ print ('\n Reading Change Feed with AllVersionsAndDeletes mode from the partition key\n ' )
111147
112- # You can read change feed with a specific change feed mode from a specific continuation token.
113- # You must pass in a valid change feed mode: ["LatestVersion", "AllVersionsAndDeletes"].
114- # You must pass in a valid continuation token.
115- response = container .query_items_change_feed (mode = change_feed_mode , continuation = continuation )
116- for doc in response :
148+ # Read the initial change feed with 'AllVersionsAndDeletes' mode with partition key('CA').
149+ # This initial call was made to store a point in time and 'partition_key' in a 'continuation' token
150+ response_iterator = container .query_items_change_feed (mode = "AllVersionsAndDeletes" , partition_key = "CA" )
151+ for doc in response_iterator :
152+ print (doc )
153+ continuation_token = container .client_connection .last_response_headers ['etag' ]
154+
155+ create_items (container , 10 , 'CA' )
156+ create_items (container , 10 , 'OR' )
157+ # Read change feed 'AllVersionsAndDeletes' mode with 'CA' partition key value from the previous continuation.
158+ # Should only print the created items with 'CA' partition key value
159+ response_iterator = container .query_items_change_feed (mode = 'AllVersionsAndDeletes' , continuation = continuation_token )
160+ for doc in response_iterator :
117161 print (doc )
162+ continuation_token = container .client_connection .last_response_headers ['etag' ]
118163
119- print ("\n Finished reading all the change feed with 'AllVersionsAndDeletes' mode.\n " )
164+ clean_up (container )
165+ # Read change feed 'AllVersionsAndDeletes' mode with 'CA' partition key value from the previous continuation.
166+ # Should only print the deleted items with 'CA' partition key value
167+ response_iterator = container .query_items_change_feed (mode = 'AllVersionsAndDeletes' , continuation = continuation_token )
168+ for doc in response_iterator :
169+ print (doc )
120170
121171def run_sample ():
122172 client = cosmos_client .CosmosClient (HOST , {'masterKey' : MASTER_KEY })
173+ # Delete pre-existing database
174+ try :
175+ client .delete_database (DATABASE_ID )
176+ except exceptions .CosmosResourceNotFoundError :
177+ pass
178+
123179 try :
124180 # setup database for this sample
125181 try :
@@ -131,34 +187,37 @@ def run_sample():
131187 try :
132188 container = db .create_container (
133189 id = CONTAINER_ID ,
134- partition_key = partition_key .PartitionKey (path = '/address/state' , kind = documents .PartitionKind .Hash )
190+ partition_key = partition_key .PartitionKey (path = '/address/state' , kind = documents .PartitionKind .Hash ),
191+ offer_throughput = 11000
135192 )
136193 print ('Container with id \' {0}\' created' .format (CONTAINER_ID ))
137194
138195 except exceptions .CosmosResourceExistsError :
139196 raise RuntimeError ("Container with id '{}' already exists" .format (CONTAINER_ID ))
140197
141- # Create items
142- create_items (container , 100 )
143- # Timestamp post item creations
144- timestamp = datetime .now (timezone .utc )
145- # Create more items after time stamp
146- create_items (container , 50 )
147198 # Read change feed from beginning
148199 read_change_feed (container )
200+ clean_up (container )
201+
149202 # Read Change Feed from timestamp
150- read_change_feed_with_start_time (container , timestamp )
151- # Delete all items from container
152- delete_all_items (container )
153- # Read change feed with 'AllVersionsAndDeletes' mode
154- read_change_feed_with_all_versions_and_delete_mode (container )
155- continuation_token = container .client_connection .last_response_headers ['etag' ]
156- # Read change feed with 'AllVersionsAndDeletes' mode after create item
157- create_items (container , 10 )
158- read_change_feed_with_all_versions_and_delete_mode_from_continuation (container ,continuation_token )
203+ read_change_feed_with_start_time (container )
204+ clean_up (container )
205+
206+ # Read Change Feed from continuation
207+ read_change_feed_with_continuation (container )
208+ clean_up (container )
209+
210+ # Read Change Feed by partition_key
211+ read_change_feed_with_partition_key (container )
212+ clean_up (container )
213+
159214 # Read change feed with 'AllVersionsAndDeletes' mode after create/delete item
160- delete_all_items (container )
161- read_change_feed_with_all_versions_and_delete_mode_from_continuation (container ,continuation_token )
215+ read_change_feed_with_all_versions_and_delete_mode (container )
216+ clean_up (container )
217+
218+ # Read change feed with 'AllVersionsAndDeletes' mode with partition key for create/delete items.
219+ read_change_feed_with_all_versions_and_delete_mode_with_partition_key (container )
220+ clean_up (container )
162221
163222 # cleanup database after sample
164223 try :
@@ -174,4 +233,4 @@ def run_sample():
174233
175234
176235if __name__ == '__main__' :
177- run_sample ()
236+ run_sample ()
0 commit comments