
Commit 31877c9

matheustavares authored and gitster committed
object-store: allow threaded access to object reading
Allow object reading to be performed by multiple threads, protecting it with an
internal lock, the obj_read_mutex. The lock usage can be toggled with
enable_obj_read_lock() and disable_obj_read_lock(). Currently, the functions
which can be safely called in parallel are: read_object_file_extended(),
repo_read_object_file(), read_object_file(), read_object_with_reference(),
read_object(), oid_object_info() and oid_object_info_extended(). It's also
possible to use obj_read_lock() and obj_read_unlock() to protect other sections
that cannot execute in parallel with object reading.

Probably there are many spots in the functions listed above that could be
executed unlocked (and thus, in parallel). But, for now, we are most interested
in allowing parallel access to zlib inflation. This is one of the sections
where object reading spends most of its time (e.g. up to one-third of
git-grep's execution time in the chromium repo corresponds to inflation) and
it's already thread-safe. So, to take advantage of that, the obj_read_mutex is
released when calling git_inflate() and re-acquired right after, for every
calling spot in oid_object_info_extended()'s call chain. We may refine this
lock to also exploit other possible parallel spots in the future, but for now,
threaded zlib inflation should already give great speedups for threaded object
reading callers.

Note that add_delta_base_cache() was also modified to skip adding already
present entries to the cache. This wasn't possible before, but it would be now,
with the parallel inflation. Take for example the following situation, where
two threads - A and B - are executing the code at unpack_entry():

1. Thread A is performing the decompression of a base O (which is not yet in
   the cache) at PHASE II. Thread B is simultaneously trying to unpack O, but
   just starting at PHASE I.
2. Since O is not yet in the cache, B will go to PHASE II to also perform the
   decompression.
3. When they finish decompressing, one of them will get the object reading
   mutex and go to PHASE III while the other waits for the mutex. Let's say A
   got the mutex first.
4. Thread A will add O to the cache, go through the rest of PHASE III and
   return.
5. Thread B gets the mutex, would also add O to the cache (if the check wasn't
   there) and return.

Finally, it is also important to highlight that the object reading lock can
only ensure thread-safety in the mentioned functions thanks to two
complementary mechanisms: the use of 'struct raw_object_store's replace_mutex,
which guards sections in the object reading machinery that would otherwise be
thread-unsafe; and the 'struct pack_window's inuse_cnt, which protects window
reading operations (such as the one performed during the inflation of a packed
object), allowing them to execute without the acquisition of the
obj_read_mutex.

Signed-off-by: Matheus Tavares <[email protected]>
Signed-off-by: Junio C Hamano <[email protected]>
1 parent b1fc9da commit 31877c9

File tree

3 files changed: +119 -5 lines

object-store.h

Lines changed: 35 additions & 0 deletions

```diff
@@ -6,6 +6,7 @@
 #include "list.h"
 #include "sha1-array.h"
 #include "strbuf.h"
+#include "thread-utils.h"
 
 struct object_directory {
 	struct object_directory *next;
@@ -251,6 +252,40 @@ int has_loose_object_nonlocal(const struct object_id *);
 
 void assert_oid_type(const struct object_id *oid, enum object_type expect);
 
+/*
+ * Enabling the object read lock allows multiple threads to safely call the
+ * following functions in parallel: repo_read_object_file(), read_object_file(),
+ * read_object_file_extended(), read_object_with_reference(), read_object(),
+ * oid_object_info() and oid_object_info_extended().
+ *
+ * obj_read_lock() and obj_read_unlock() may also be used to protect other
+ * sections which cannot execute in parallel with object reading. Since the used
+ * lock is a recursive mutex, these sections can even contain calls to object
+ * reading functions. However, beware that in these cases zlib inflation won't
+ * be performed in parallel, losing performance.
+ *
+ * TODO: oid_object_info_extended()'s call stack has a recursive behavior. If
+ * any of its callees end up calling it, this recursive call won't benefit from
+ * parallel inflation.
+ */
+void enable_obj_read_lock(void);
+void disable_obj_read_lock(void);
+
+extern int obj_read_use_lock;
+extern pthread_mutex_t obj_read_mutex;
+
+static inline void obj_read_lock(void)
+{
+	if(obj_read_use_lock)
+		pthread_mutex_lock(&obj_read_mutex);
+}
+
+static inline void obj_read_unlock(void)
+{
+	if(obj_read_use_lock)
+		pthread_mutex_unlock(&obj_read_mutex);
+}
+
 struct object_info {
 	/* Request */
 	enum object_type *typep;
```

packfile.c

Lines changed: 32 additions & 0 deletions

```diff
@@ -1086,7 +1086,23 @@ unsigned long get_size_from_delta(struct packed_git *p,
 	do {
 		in = use_pack(p, w_curs, curpos, &stream.avail_in);
 		stream.next_in = in;
+		/*
+		 * Note: the window section returned by use_pack() must be
+		 * available throughout git_inflate()'s unlocked execution. To
+		 * ensure no other thread will modify the window in the
+		 * meantime, we rely on the packed_window.inuse_cnt. This
+		 * counter is incremented before window reading and checked
+		 * before window disposal.
+		 *
+		 * Other worrying sections could be the call to close_pack_fd(),
+		 * which can close packs even with in-use windows, and to
+		 * reprepare_packed_git(). Regarding the former, mmap doc says:
+		 * "closing the file descriptor does not unmap the region". And
+		 * for the latter, it won't re-open already available packs.
+		 */
+		obj_read_unlock();
 		st = git_inflate(&stream, Z_FINISH);
+		obj_read_lock();
 		curpos += stream.next_in - in;
 	} while ((st == Z_OK || st == Z_BUF_ERROR) &&
 		 stream.total_out < sizeof(delta_head));
@@ -1445,6 +1461,14 @@ static void add_delta_base_cache(struct packed_git *p, off_t base_offset,
 	struct delta_base_cache_entry *ent = xmalloc(sizeof(*ent));
 	struct list_head *lru, *tmp;
 
+	/*
+	 * Check required to avoid redundant entries when more than one thread
+	 * is unpacking the same object, in unpack_entry() (since its phases I
+	 * and III might run concurrently across multiple threads).
+	 */
+	if (in_delta_base_cache(p, base_offset))
+		return;
+
 	delta_base_cached += base_size;
 
 	list_for_each_safe(lru, tmp, &delta_base_cache_lru) {
@@ -1574,7 +1598,15 @@ static void *unpack_compressed_entry(struct packed_git *p,
 	do {
 		in = use_pack(p, w_curs, curpos, &stream.avail_in);
 		stream.next_in = in;
+		/*
+		 * Note: we must ensure the window section returned by
+		 * use_pack() will be available throughout git_inflate()'s
+		 * unlocked execution. Please refer to the comment at
+		 * get_size_from_delta() to see how this is done.
+		 */
+		obj_read_unlock();
 		st = git_inflate(&stream, Z_FINISH);
+		obj_read_lock();
 		if (!stream.avail_out)
 			break; /* the payload is larger than it should be */
 		curpos += stream.next_in - in;
```

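The guard added to add_delta_base_cache() handles the race described in the commit message: two threads decompress the same base unlocked, then both reach the insertion under the lock, and the second insert must become a no-op. A toy reproduction of that shape, with hypothetical names (`cache_add`, `unpacker`) and a trivial array cache standing in for the delta base cache:

```c
#include <assert.h>
#include <pthread.h>

#define CACHE_SLOTS 16

static pthread_mutex_t cache_mutex = PTHREAD_MUTEX_INITIALIZER;
static long cache_keys[CACHE_SLOTS];
static int cache_len;
static int insert_count;

static int in_cache(long key)
{
	int i;
	for (i = 0; i < cache_len; i++)
		if (cache_keys[i] == key)
			return 1;
	return 0;
}

/* Called with cache_mutex held, like PHASE III of unpack_entry(). */
static void cache_add(long key)
{
	if (in_cache(key))
		return;		/* the guard this commit adds */
	cache_keys[cache_len++] = key;
	insert_count++;
}

static void *unpacker(void *arg)
{
	long key = *(long *)arg;
	/* ... unlocked decompression of the object would happen here ... */
	pthread_mutex_lock(&cache_mutex);
	cache_add(key);
	pthread_mutex_unlock(&cache_mutex);
	return NULL;
}
```

Without the early return, both threads would insert an entry for the same object and the cache would hold (and account memory for) a redundant copy.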
sha1-file.c

Lines changed: 52 additions & 5 deletions

```diff
@@ -1147,6 +1147,8 @@ static int unpack_loose_short_header(git_zstream *stream,
 				     unsigned char *map, unsigned long mapsize,
 				     void *buffer, unsigned long bufsiz)
 {
+	int ret;
+
 	/* Get the data stream */
 	memset(stream, 0, sizeof(*stream));
 	stream->next_in = map;
@@ -1155,7 +1157,11 @@ static int unpack_loose_short_header(git_zstream *stream,
 	stream->avail_out = bufsiz;
 
 	git_inflate_init(stream);
-	return git_inflate(stream, 0);
+	obj_read_unlock();
+	ret = git_inflate(stream, 0);
+	obj_read_lock();
+
+	return ret;
 }
 
 int unpack_loose_header(git_zstream *stream,
@@ -1200,7 +1206,9 @@ static int unpack_loose_header_to_strbuf(git_zstream *stream, unsigned char *map
 	stream->avail_out = bufsiz;
 
 	do {
+		obj_read_unlock();
 		status = git_inflate(stream, 0);
+		obj_read_lock();
 		strbuf_add(header, buffer, stream->next_out - (unsigned char *)buffer);
 		if (memchr(buffer, '\0', stream->next_out - (unsigned char *)buffer))
 			return 0;
@@ -1240,8 +1248,11 @@ static void *unpack_loose_rest(git_zstream *stream,
 		 */
 		stream->next_out = buf + bytes;
 		stream->avail_out = size - bytes;
-		while (status == Z_OK)
+		while (status == Z_OK) {
+			obj_read_unlock();
 			status = git_inflate(stream, Z_FINISH);
+			obj_read_lock();
+		}
 	}
 	if (status == Z_STREAM_END && !stream->avail_in) {
 		git_inflate_end(stream);
@@ -1411,17 +1422,40 @@ static int loose_object_info(struct repository *r,
 	return (status < 0) ? status : 0;
 }
 
+int obj_read_use_lock = 0;
+pthread_mutex_t obj_read_mutex;
+
+void enable_obj_read_lock(void)
+{
+	if (obj_read_use_lock)
+		return;
+
+	obj_read_use_lock = 1;
+	init_recursive_mutex(&obj_read_mutex);
+}
+
+void disable_obj_read_lock(void)
+{
+	if (!obj_read_use_lock)
+		return;
+
+	obj_read_use_lock = 0;
+	pthread_mutex_destroy(&obj_read_mutex);
+}
+
 int fetch_if_missing = 1;
 
-int oid_object_info_extended(struct repository *r, const struct object_id *oid,
-			     struct object_info *oi, unsigned flags)
+static int do_oid_object_info_extended(struct repository *r,
+				       const struct object_id *oid,
+				       struct object_info *oi, unsigned flags)
 {
 	static struct object_info blank_oi = OBJECT_INFO_INIT;
 	struct pack_entry e;
 	int rtype;
 	const struct object_id *real = oid;
 	int already_retried = 0;
 
+
 	if (flags & OBJECT_INFO_LOOKUP_REPLACE)
 		real = lookup_replace_object(r, oid);
 
@@ -1497,7 +1531,7 @@ int oid_object_info_extended(struct repository *r, const struct object_id *oid,
 	rtype = packed_object_info(r, e.p, e.offset, oi);
 	if (rtype < 0) {
 		mark_bad_packed_object(e.p, real->hash);
-		return oid_object_info_extended(r, real, oi, 0);
+		return do_oid_object_info_extended(r, real, oi, 0);
 	} else if (oi->whence == OI_PACKED) {
 		oi->u.packed.offset = e.offset;
 		oi->u.packed.pack = e.p;
@@ -1508,6 +1542,17 @@ int oid_object_info_extended(struct repository *r, const struct object_id *oid,
 	return 0;
 }
 
+int oid_object_info_extended(struct repository *r, const struct object_id *oid,
+			     struct object_info *oi, unsigned flags)
+{
+	int ret;
+	obj_read_lock();
+	ret = do_oid_object_info_extended(r, oid, oi, flags);
+	obj_read_unlock();
+	return ret;
+}
+
+
 /* returns enum object_type or negative */
 int oid_object_info(struct repository *r,
 		    const struct object_id *oid,
@@ -1580,6 +1625,7 @@ void *read_object_file_extended(struct repository *r,
 	if (data)
 		return data;
 
+	obj_read_lock();
 	if (errno && errno != ENOENT)
 		die_errno(_("failed to read object %s"), oid_to_hex(oid));
 
@@ -1595,6 +1641,7 @@ void *read_object_file_extended(struct repository *r,
 	if ((p = has_packed_and_bad(r, repl->hash)) != NULL)
 		die(_("packed object %s (stored in %s) is corrupt"),
 		    oid_to_hex(repl), p->pack_name);
+	obj_read_unlock();
 
 	return NULL;
 }
```

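The refactoring of oid_object_info_extended() into a locked public wrapper plus an internal do_*() helper is a common pattern: the entry point acquires the lock once, and internal retries or recursion call the helper directly so the lock is not re-juggled on each step. A minimal sketch with hypothetical names (`object_info`, `do_object_info`), not Git's code:

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static int lock_acquisitions;

/* Internal helper: assumes the caller already holds the lock, so an
 * internal retry can recurse without touching the mutex. */
static int do_object_info(int retries_left)
{
	if (retries_left > 0)
		return do_object_info(retries_left - 1);
	return 0;
}

/* Public entry point: takes the lock exactly once around the helper,
 * like the oid_object_info_extended() wrapper in this commit. */
static int object_info(int retries)
{
	int ret;

	pthread_mutex_lock(&mutex);
	lock_acquisitions++;
	ret = do_object_info(retries);
	pthread_mutex_unlock(&mutex);
	return ret;
}
```

This also addresses the recursion noted in the commit: the retry path (`mark_bad_packed_object` followed by a re-read in the real code) calls the helper, not the wrapper, keeping lock handling in one place.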