Skip to content

Commit ca62276

Browse files
authored
try again (#59)
1 parent 7729fb9 commit ca62276

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

kinesis/base.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,11 @@ async def start(self):
158158

159159
await self.get_client()
160160

161+
just_created = False
161162
if self.create_stream:
162163
await self._create_stream()
163164
self.create_stream = False
165+
just_created = True
164166

165167
if self.skip_describe_stream:
166168
log.debug("Skipping Describe stream '{}'. Assuming it exists and is active.".format(self.stream_name))
@@ -186,7 +188,17 @@ async def start(self):
186188
async with timeout(60) as cm:
187189
try:
188190
while True:
189-
stream_info = await self.get_stream_description()
191+
try:
192+
stream_info = await self.get_stream_description()
193+
except exceptions.StreamDoesNotExist:
194+
if just_created:
195+
# Stream was just created but may not be visible yet (eventual consistency)
196+
log.debug(
197+
"Stream '{}' not yet visible after creation, retrying...".format(self.stream_name)
198+
)
199+
await asyncio.sleep(0.25)
200+
continue
201+
raise
190202
stream_status = stream_info["StreamStatus"]
191203

192204
if stream_status == self.ACTIVE:

0 commit comments

Comments
 (0)