4
4
SPDX-License-Identifier: Apache-2.0
5
5
'''
6
6
from __future__ import print_function
7
- import sys , random , time , argparse
8
- from boto import kinesis
9
7
10
- def get_stream_status (conn , stream_name ):
8
+ import argparse
9
+ import sys
10
+ import time
11
+
12
+ import boto3
13
+
14
+ def get_stream_status (kinesis , stream_name ):
11
15
'''
12
16
Query this provided connection object for the provided stream's status.
13
-
14
- :type conn: boto.kinesis.layer1.KinesisConnection
17
+ :type conn: Kinesis.Client
15
18
:param conn: A connection to Amazon Kinesis
16
-
17
19
:type stream_name: str
18
20
:param stream_name: The name of a stream.
19
-
20
21
:rtype: str
21
22
:return: The stream's status
22
23
'''
23
- r = conn .describe_stream (stream_name )
24
+ r = kinesis .describe_stream (StreamName = stream_name )
24
25
description = r .get ('StreamDescription' )
25
26
return description .get ('StreamStatus' )
26
27
27
- def wait_for_stream (conn , stream_name ):
28
+ def wait_for_stream (kinesis , stream_name ):
28
29
'''
29
30
Wait for the provided stream to become active.
30
-
31
- :type conn: boto.kinesis.layer1.KinesisConnection
32
- :param conn: A connection to Amazon Kinesis
33
-
31
+ :type kinesis: Kinesis.Client
32
+ :param kinesis: A low-level client representing Amazon Kinesis
34
33
:type stream_name: str
35
34
:param stream_name: The name of a stream.
36
35
'''
37
36
SLEEP_TIME_SECONDS = 3
38
- status = get_stream_status (conn , stream_name )
37
+ status = get_stream_status (kinesis , stream_name )
39
38
while status != 'ACTIVE' :
40
39
print ('{stream_name} has status: {status}, sleeping for {secs} seconds' .format (
41
40
stream_name = stream_name ,
42
41
status = status ,
43
42
secs = SLEEP_TIME_SECONDS ))
44
43
time .sleep (SLEEP_TIME_SECONDS ) # sleep for 3 seconds
45
- status = get_stream_status (conn , stream_name )
44
+ status = get_stream_status (kinesis , stream_name )
46
45
47
- def put_words_in_stream (conn , stream_name , words ):
46
+ def put_words_in_stream (kinesis , stream_name , words ):
48
47
'''
49
48
Put each word in the provided list of words into the stream.
50
-
51
- :type conn: boto.kinesis.layer1.KinesisConnection
52
- :param conn: A connection to Amazon Kinesis
53
-
49
+ :type kinesis: Kinesis.Client
50
+ :param kinesis: A connection to Amazon Kinesis
54
51
:type stream_name: str
55
52
:param stream_name: The name of a stream.
56
-
57
53
:type words: list
58
54
:param words: A list of strings to put into the stream.
59
55
'''
60
56
for w in words :
61
57
try :
62
- conn .put_record (stream_name , w , w )
58
+ kinesis .put_record (StreamName = stream_name , Data = w , PartitionKey = w )
63
59
print ("Put word: " + w + " into stream: " + stream_name )
64
60
except Exception as e :
65
61
sys .stderr .write ("Encountered an exception while trying to put a word: "
@@ -69,16 +65,12 @@ def put_words_in_stream_periodically(conn, stream_name, words, period_seconds):
69
65
'''
70
66
Puts words into a stream, then waits for the period to elapse then puts the words in again. There is no strict
71
67
guarantee about how frequently we put each word into the stream, just that we will wait between iterations.
72
-
73
68
:type conn: boto.kinesis.layer1.KinesisConnection
74
69
:param conn: A connection to Amazon Kinesis
75
-
76
70
:type stream_name: str
77
71
:param stream_name: The name of a stream.
78
-
79
72
:type words: list
80
73
:param words: A list of strings to put into the stream.
81
-
82
74
:type period_seconds: int
83
75
:param period_seconds: How long to wait, in seconds, between iterations over the list of words.
84
76
'''
@@ -90,10 +82,8 @@ def put_words_in_stream_periodically(conn, stream_name, words, period_seconds):
90
82
if __name__ == '__main__' :
91
83
parser = argparse .ArgumentParser ('''
92
84
Puts words into a stream.
93
-
94
85
# Using the -w option multiple times
95
86
sample_wordputter.py -s STREAM_NAME -w WORD1 -w WORD2 -w WORD3 -p 3
96
-
97
87
# Passing input from STDIN
98
88
echo "WORD1\\ nWORD2\\ nWORD3" | sample_wordputter.py -s STREAM_NAME -p 3
99
89
''' )
@@ -115,25 +105,26 @@ def put_words_in_stream_periodically(conn, stream_name, words, period_seconds):
115
105
one of the standard credentials providers.
116
106
'''
117
107
print ("Connecting to stream: {s} in {r}" .format (s = stream_name , r = args .region ))
118
- conn = kinesis .connect_to_region (region_name = args .region )
108
+ kinesis = boto3 .client ('kinesis' , region_name = args .region )
109
+
119
110
try :
120
- status = get_stream_status (conn , stream_name )
111
+ status = get_stream_status (kinesis , stream_name )
121
112
if 'DELETING' == status :
122
113
print ('The stream: {s} is being deleted, please rerun the script.' .format (s = stream_name ))
123
114
sys .exit (1 )
124
115
elif 'ACTIVE' != status :
125
- wait_for_stream (conn , stream_name )
116
+ wait_for_stream (kinesis , stream_name )
126
117
except :
127
118
# We'll assume the stream didn't exist so we will try to create it with just one shard
128
- conn .create_stream (stream_name , 1 )
129
- wait_for_stream (conn , stream_name )
119
+ kinesis .create_stream (StreamName = stream_name , ShardCount = 1 )
120
+ wait_for_stream (kinesis , stream_name )
130
121
# Now the stream should exist
131
122
if len (args .words ) == 0 :
132
123
print ('No -w options provided. Waiting on input from STDIN' )
133
124
words = [l .strip () for l in sys .stdin .readlines () if l .strip () != '' ]
134
125
else :
135
126
words = args .words
136
127
if args .period != None :
137
- put_words_in_stream_periodically (conn , stream_name , words , args .period )
128
+ put_words_in_stream_periodically (kinesis , stream_name , words , args .period )
138
129
else :
139
- put_words_in_stream (conn , stream_name , words )
130
+ put_words_in_stream (kinesis , stream_name , words )
0 commit comments