Skip to content

Commit 5221018

Browse files
authored
take3. fix for slow LocalStack/py 3.14 (#60)
1 parent ca62276 commit 5221018

File tree

2 files changed

+47
-3
lines changed

2 files changed

+47
-3
lines changed

kinesis/base.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ def __init__(
6868
self._reconnect_timeout = time.monotonic()
6969
self.create_stream = create_stream
7070
self.create_stream_shards = create_stream_shards
71+
self._just_created = False
7172

7273
async def __aenter__(self) -> "Base":
7374

@@ -158,16 +159,16 @@ async def start(self):
158159

159160
await self.get_client()
160161

161-
just_created = False
162162
if self.create_stream:
163163
await self._create_stream()
164164
self.create_stream = False
165-
just_created = True
165+
self._just_created = True
166166

167167
if self.skip_describe_stream:
168168
log.debug("Skipping Describe stream '{}'. Assuming it exists and is active.".format(self.stream_name))
169169
self.shards = []
170170
self.stream_status = self.ACTIVE
171+
self._just_created = False
171172
return
172173

173174
log.debug("Checking stream '{}' is active".format(self.stream_name))
@@ -178,6 +179,7 @@ async def start(self):
178179
try:
179180
self.shards = await self.list_shards()
180181
self.stream_status = self.ACTIVE
182+
self._just_created = False
181183
except Exception as e:
182184
# Fall back to DescribeStream if ListShards fails
183185
log.warning(f"ListShards failed ({e}), falling back to DescribeStream")
@@ -191,7 +193,7 @@ async def start(self):
191193
try:
192194
stream_info = await self.get_stream_description()
193195
except exceptions.StreamDoesNotExist:
194-
if just_created:
196+
if self._just_created:
195197
# Stream was just created but may not be visible yet (eventual consistency)
196198
log.debug(
197199
"Stream '{}' not yet visible after creation, retrying...".format(self.stream_name)
@@ -203,6 +205,7 @@ async def start(self):
203205

204206
if stream_status == self.ACTIVE:
205207
self.stream_status = stream_status
208+
self._just_created = False
206209
break
207210

208211
if stream_status in ["CREATING", "UPDATING"]:

tests/test_producer.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,3 +594,44 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
594594
# The key test is that the code runs without errors and includes partition key in calculation
595595
assert throttle_sizes[0] >= 1, "Should include partition key in size calculation"
596596
assert throttle_sizes[1] >= 1, "Should calculate size for item without partition key"
597+
598+
@pytest.mark.asyncio
599+
async def test_just_created_survives_reconnection(self, random_stream_name, endpoint_url):
600+
"""Test that _just_created state persists across reconnection attempts (Issue #56).
601+
602+
When create_stream=True and the backend is slow to make the stream visible,
603+
start() sets self.create_stream = False after _create_stream(). If the 60s
604+
describe timeout expires and get_conn() triggers a reconnection, the second
605+
start() call must still know the stream was just created so it retries
606+
StreamDoesNotExist instead of propagating immediately.
607+
"""
608+
producer = Producer(
609+
stream_name=random_stream_name,
610+
endpoint_url=endpoint_url,
611+
region_name="ap-southeast-2",
612+
create_stream=True,
613+
create_stream_shards=1,
614+
)
615+
616+
assert producer._just_created is False
617+
assert producer.create_stream is True
618+
619+
# Simulate what start() does: create stream, flip flags
620+
await producer.get_client()
621+
await producer._create_stream()
622+
producer.create_stream = False
623+
producer._just_created = True
624+
625+
# After first start() sets create_stream=False, a reconnection calls start() again.
626+
# _just_created must still be True so the retry logic handles StreamDoesNotExist.
627+
assert producer.create_stream is False
628+
assert producer._just_created is True
629+
630+
# Now simulate a successful second start() — stream becomes ACTIVE
631+
await producer.start()
632+
633+
# After stream confirmed ACTIVE, _just_created should be reset
634+
assert producer._just_created is False
635+
assert producer.stream_status == producer.ACTIVE
636+
637+
await producer.close()

0 commit comments

Comments
 (0)