36
36
37
37
38
38
class _LockedObject (object ):
39
+ """
40
+ A wrapper class that hides a specific object behind a lock.
39
41
42
+ The goal here is to provide a simple way to protect access to an object
43
+ that cannot safely be simultaneously accessed from multiple threads. The
44
+ intended use of this class is simple: take hold of it with a context
45
+ manager, which returns the protected object.
46
+ """
40
47
def __init__ (self , obj ):
41
48
self .lock = threading .RLock ()
42
49
self ._obj = obj
@@ -124,36 +131,35 @@ def __init__(self, host, port=None, secure=None, window_manager=None,
124
131
self .network_buffer_size = 65536
125
132
126
133
self .force_proto = force_proto
134
+
127
135
# Concurrency
128
136
#
129
- # Use one lock (_lock) to synchronize any interaction with the
130
- # connection. This is because the h2.connection.H2Connection is
131
- # completely unthreadsafe, for thread safety, all access to it must be
132
- # synchronized.
137
+ # Use one lock (_lock) to synchronize any interaction with global
138
+ # connection state, e.g. stream creation/deletion.
133
139
#
134
140
# It's ok to use the same in lock all these cases as they occur at
135
141
# different/linked points in the connection's lifecycle.
136
142
#
137
143
# Use another 2 locks (_write_lock, _read_lock) to synchronize
138
144
# - _send_cb
139
145
# - _recv_cb
140
- # respectively
146
+ # respectively.
141
147
#
142
- # I.e, sends/receve on the connection and its streams are serialized
143
- # and separately across the threads accessing the connection. This is
144
- # a simple way of providing thread-safety.
148
+ # I.e, send/recieve on the connection and its streams are serialized
149
+ # separately across the threads accessing the connection. This is a
150
+ # simple way of providing thread-safety.
145
151
#
146
152
# _write_lock and _read_lock synchronize all interactions between
147
- # streams and the connnection. There is a third i/o callback,
153
+ # streams and the connnection. There is a third I/O callback,
148
154
# _close_stream, passed to a stream's constructor. It does not need to
149
155
# be synchronized, it uses _send_cb internally (which is serialized);
150
- # it's other activity (safe deletion of the stream from self.streams)
156
+ # its other activity (safe deletion of the stream from self.streams)
151
157
# does not require synchronization.
152
158
#
153
159
# _read_lock may be acquired when already holding the _write_lock,
154
- # when they both held it is always by acquiring _write_lock first
160
+ # when they both held it is always by acquiring _write_lock first.
155
161
#
156
- # either _read_lock or _write_lock may be acquired whilst holding _lock
162
+ # Either _read_lock or _write_lock may be acquired whilst holding _lock
157
163
# which should always be acquired before either of the other two.
158
164
self ._lock = threading .RLock ()
159
165
self ._write_lock = threading .RLock ()
@@ -191,7 +197,7 @@ def __init_state(self):
191
197
#
192
198
# Finally, we add a set of streams that recently received data. When
193
199
# using multiple threads, this avoids reading on threads that have just
194
- # acquired the io lock whose streams have already had their data read
200
+ # acquired the I/O lock whose streams have already had their data read
195
201
# for them by prior threads.
196
202
self .streams = {}
197
203
self .recent_stream = None
@@ -232,15 +238,15 @@ def request(self, method, url, body=None, headers=None):
232
238
233
239
# Concurrency
234
240
#
235
- # It's necessary to hold a lock while this method runs to satisfy HTTP2
241
+ # It's necessary to hold a lock while this method runs to satisfy H2
236
242
# protocol requirements.
237
243
#
238
244
# - putrequest obtains the next valid new stream_id
239
245
# - endheaders sends a http2 message using the new stream_id
240
246
#
241
247
# If threads interleave these operations, it could result in messages
242
248
# being sent in the wrong order, which can lead to the out-of-order
243
- # messages with lower stream_ids being closed prematurely.
249
+ # messages with lower stream IDs being closed prematurely.
244
250
with self ._write_lock :
245
251
stream_id = self .putrequest (method , url )
246
252
@@ -381,7 +387,7 @@ def close(self, error_code=None):
381
387
# the connection see consistent state, and to prevent creation of
382
388
# of new streams while the connection is being closed.
383
389
#
384
- # i/o occurs while the lock is held; waiting threads will see a delay.
390
+ # I/O occurs while the lock is held; waiting threads will see a delay.
385
391
with self ._lock :
386
392
# Close all streams
387
393
for stream in list (self .streams .values ()):
@@ -404,10 +410,9 @@ def _send_outstanding_data(self, tolerate_peer_gone=False,
404
410
send_empty = True ):
405
411
# Concurrency
406
412
#
407
- # Hold _write_lock; getting and writing data from _conn are
408
- # synchronized
413
+ # Hold _write_lock; getting and writing data from _conn is synchronized
409
414
#
410
- # i/o occurs while the lock is held; waiting threads will see a delay.
415
+ # I/O occurs while the lock is held; waiting threads will see a delay.
411
416
with self ._write_lock :
412
417
with self ._conn as conn :
413
418
data = conn .data_to_send ()
@@ -424,7 +429,7 @@ def putrequest(self, method, selector, **kwargs):
424
429
-----------
425
430
426
431
This method is thread-safe. It can be called from multiple threads,
427
- with each thread should receive a unique stream_id .
432
+ and each thread should receive a unique stream ID .
428
433
429
434
:param method: The request method, e.g. ``'GET'``.
430
435
:param selector: The path selector.
@@ -498,7 +503,7 @@ def endheaders(self, message_body=None, final=False, stream_id=None):
498
503
499
504
# Concurrency:
500
505
#
501
- # Hold _write_lock: synchronize access to the connection's hpack
506
+ # Hold _write_lock: synchronize access to the connection's HPACK
502
507
# encoder and decoder and the subsquent write to the connection
503
508
with self ._write_lock :
504
509
stream .send_headers (headers_only )
@@ -539,7 +544,7 @@ def _new_stream(self, stream_id=None, local_closed=False):
539
544
# Hold _lock: ensure that threads accessing the connection see
540
545
# self.next_stream_id in a consistent state
541
546
#
542
- # No i/o occurs, the delay in waiting threads depends on their number.
547
+ # No I/O occurs, the delay in waiting threads depends on their number.
543
548
with self ._lock , self ._conn as conn :
544
549
s = Stream (
545
550
stream_id or self .next_stream_id ,
@@ -565,7 +570,7 @@ def _send_cb(self, data, tolerate_peer_gone=False):
565
570
#
566
571
# Hold _write_lock: ensures only writer at a time
567
572
#
568
- # i/o occurs while the lock is held; waiting threads will see a delay.
573
+ # I/O occurs while the lock is held; waiting threads will see a delay.
569
574
with self ._write_lock :
570
575
try :
571
576
self ._sock .sendall (data )
@@ -581,10 +586,10 @@ def _adjust_receive_window(self, frame_len):
581
586
"""
582
587
# Concurrency
583
588
#
584
- # Hold _write_lock; synchronize the window manager update, the
589
+ # Hold _write_lock; synchronize the window manager update and the
585
590
# subsequent potential write to the connection
586
591
#
587
- # i/o may occurs while the lock is held; waiting threads may see a
592
+ # I/O may occur while the lock is held; waiting threads may see a
588
593
# delay.
589
594
with self ._write_lock :
590
595
increment = self .window_manager ._handle_frame (frame_len )
@@ -607,7 +612,7 @@ def _single_read(self):
607
612
#
608
613
# Synchronizes reading the data
609
614
#
610
- # i/o occurs while the lock is held; waiting threads will see a delay.
615
+ # I/O occurs while the lock is held; waiting threads will see a delay.
611
616
with self ._read_lock :
612
617
self ._sock .fill ()
613
618
data = self ._sock .buffer .tobytes ()
@@ -696,10 +701,10 @@ def _recv_cb(self, stream_id=0):
696
701
# from the requested stream.
697
702
#
698
703
# The lock here looks broad, but is need to ensure correct behavior
699
- # when there are multiple readers of the same stream. It re-acquired
700
- # in the calls to self._single_read.
704
+ # when there are multiple readers of the same stream. It is
705
+ # re-acquired in the calls to self._single_read.
701
706
#
702
- # i/o occurs while the lock is held; waiting threads will see a delay.
707
+ # I/O occurs while the lock is held; waiting threads will see a delay.
703
708
with self ._read_lock :
704
709
log .debug ('recv for stream %d with %s already present' ,
705
710
stream_id ,
@@ -748,15 +753,15 @@ def _send_rst_frame(self, stream_id, error_code):
748
753
# Hold _write_lock; synchronize generating the reset frame and writing
749
754
# it
750
755
#
751
- # i/o occurs while the lock is held; waiting threads will see a delay.
756
+ # I/O occurs while the lock is held; waiting threads will see a delay.
752
757
with self ._write_lock :
753
758
with self ._conn as conn :
754
759
conn .reset_stream (stream_id , error_code = error_code )
755
760
self ._send_outstanding_data ()
756
761
757
762
# Concurrency
758
763
#
759
- # Hold _lock; the stream storage is being updated. No i/o occurs, any
764
+ # Hold _lock; the stream storage is being updated. No I/O occurs, any
760
765
# delay is proportional to the number of waiting threads.
761
766
with self ._lock :
762
767
try :
0 commit comments