@@ -68,12 +68,6 @@ class PartitionSession:
6868 reader_reconnector_id : int
6969 reader_stream_id : int
7070 _next_message_start_commit_offset : int = field (init = False )
71- _send_commit_window_start : int = field (init = False )
72-
73- # todo: check if deque is optimal
74- _pending_commits : Deque [OffsetsRange ] = field (
75- init = False , default_factory = lambda : deque ()
76- )
7771
7872 # todo: check if deque is optimal
7973 _ack_waiters : Deque ["PartitionSession.CommitAckWaiter" ] = field (
@@ -89,45 +83,17 @@ class PartitionSession:
8983
9084 def __post_init__ (self ):
9185 self ._next_message_start_commit_offset = self .committed_offset
92- self ._send_commit_window_start = self .committed_offset
9386
9487 try :
9588 self ._loop = asyncio .get_running_loop ()
9689 except RuntimeError :
9790 self ._loop = None
9891
99- def add_commit (
100- self , new_commit : OffsetsRange
101- ) -> "PartitionSession.CommitAckWaiter" :
102- self ._ensure_not_closed ()
103-
104- self ._add_to_commits (new_commit )
105- return self ._add_waiter (new_commit .end )
106-
107- def _add_to_commits (self , new_commit : OffsetsRange ):
108- index = bisect .bisect_left (self ._pending_commits , new_commit )
109-
110- prev_commit = self ._pending_commits [index - 1 ] if index > 0 else None
111- commit = (
112- self ._pending_commits [index ] if index < len (self ._pending_commits ) else None
113- )
114-
115- for c in (prev_commit , commit ):
116- if c is not None and new_commit .is_intersected_with (c ):
117- raise ValueError (
118- "new commit intersected with existed. New range: %s, existed: %s"
119- % (new_commit , c )
120- )
121-
122- if commit is not None and commit .start == new_commit .end :
123- commit .start = new_commit .start
124- elif prev_commit is not None and prev_commit .end == new_commit .start :
125- prev_commit .end = new_commit .end
126- else :
127- self ._pending_commits .insert (index , new_commit )
128-
129- def _add_waiter (self , end_offset : int ) -> "PartitionSession.CommitAckWaiter" :
92+ def add_waiter (self , end_offset : int ) -> "PartitionSession.CommitAckWaiter" :
13093 waiter = PartitionSession .CommitAckWaiter (end_offset , self ._create_future ())
94+ if end_offset <= self .committed_offset :
95+ waiter ._finish_ok ()
96+ return waiter
13197
13298 # fast way
13399 if len (self ._ack_waiters ) > 0 and self ._ack_waiters [- 1 ].end_offset < end_offset :
@@ -143,26 +109,6 @@ def _create_future(self) -> asyncio.Future:
143109 else :
144110 return asyncio .Future ()
145111
146- def pop_commit_range (self ) -> Optional [OffsetsRange ]:
147- self ._ensure_not_closed ()
148-
149- if len (self ._pending_commits ) == 0 :
150- return None
151-
152- if self ._pending_commits [0 ].start != self ._send_commit_window_start :
153- return None
154-
155- res = self ._pending_commits .popleft ()
156- while (
157- len (self ._pending_commits ) > 0 and self ._pending_commits [0 ].start == res .end
158- ):
159- commit = self ._pending_commits .popleft ()
160- res .end = commit .end
161-
162- self ._send_commit_window_start = res .end
163-
164- return res
165-
166112 def ack_notify (self , offset : int ):
167113 self ._ensure_not_closed ()
168114
@@ -176,7 +122,7 @@ def ack_notify(self, offset: int):
176122 while len (self ._ack_waiters ) > 0 :
177123 if self ._ack_waiters [0 ].end_offset <= offset :
178124 waiter = self ._ack_waiters .popleft ()
179- waiter .future . set_result ( None )
125+ waiter ._finish_ok ( )
180126 else :
181127 break
182128
@@ -189,7 +135,7 @@ def close(self):
189135 self .state = PartitionSession .State .Stopped
190136 exception = topic_reader_asyncio .TopicReaderCommitToExpiredPartition ()
191137 for waiter in self ._ack_waiters :
192- waiter .future . set_exception (exception )
138+ waiter ._finish_error (exception )
193139
194140 def _ensure_not_closed (self ):
195141 if self .state == PartitionSession .State .Stopped :
@@ -204,6 +150,16 @@ class State(enum.Enum):
204150 class CommitAckWaiter :
205151 end_offset : int
206152 future : asyncio .Future = field (compare = False )
153+ _done : bool = field (default = False , init = False )
154+ _exception : Optional [Exception ] = field (default = None , init = False )
155+
156+ def _finish_ok (self ):
157+ self ._done = True
158+ self .future .set_result (None )
159+
160+ def _finish_error (self , error : Exception ):
161+ self ._exception = error
162+ self .future .set_exception (error )
207163
208164
209165@dataclass
0 commit comments