Skip to content

Commit 7095e9e

Browse files
committed
Initial commit
1 parent 9013080 commit 7095e9e

File tree

9 files changed

+1158
-41
lines changed

9 files changed

+1158
-41
lines changed

Lib/test/_test_multiprocessing.py

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,12 @@ def _resource_unlink(name, rtype):
137137

138138
WAIT_ACTIVE_CHILDREN_TIMEOUT = 5.0
139139

140-
HAVE_GETVALUE = not getattr(_multiprocessing,
141-
'HAVE_BROKEN_SEM_GETVALUE', False)
140+
# Since gh-125828, we no longer need HAVE_GETVALUE.
141+
# This value should be remove from Modules/_multiprocessing/multiprocessing.c.
142+
# when cleanup is complete.
143+
# -------------------
144+
# HAVE_GETVALUE = not getattr(_multiprocessing,
145+
# 'HAVE_BROKEN_SEM_GETVALUE', False)
142146

143147
WIN32 = (sys.platform == "win32")
144148

@@ -6741,6 +6745,61 @@ def f(x): return x*x
67416745
self.assertEqual("332833500", out.decode('utf-8').strip())
67426746
self.assertFalse(err, msg=err.decode('utf-8'))
67436747

6748+
#
6749+
# Tests for workaround macOSX Semaphore
6750+
#
6751+
6752+
ACQUIRE, RELEASE = range(2)
6753+
@unittest.skipIf(sys.platform != "darwin", "MacOSX only")
6754+
class _TestMacOSXSemaphore(BaseTestCase):
6755+
ALLOWED_TYPES = ('processes',)
6756+
@classmethod
6757+
def _run_thread(cls, sem, meth, ntime, delay):
6758+
if meth == ACQUIRE:
6759+
for _ in range(ntime):
6760+
sem.acquire()
6761+
time.sleep(delay)
6762+
else:
6763+
for _ in range(ntime):
6764+
sem.release()
6765+
time.sleep(delay)
6766+
6767+
@classmethod
6768+
def _run_process(cls, sem, sem_meth, nthread=1, ntime=10, delay=0.1):
6769+
ts = []
6770+
for _ in range(nthread):
6771+
t = threading.Thread(target=cls._run_thread,
6772+
args=(sem, sem_meth, ntime, delay))
6773+
ts.append(t)
6774+
for t in ts:
6775+
t.start()
6776+
for t in ts:
6777+
t.join()
6778+
6779+
def test_mix_several_acquire_release(self):
6780+
# n processes, threads per process and loops per threads
6781+
n_p_acq, n_th_acq, n_loop_acq = 15, 5, 20
6782+
n_p_rel, n_th_rel, n_loop_rel = 8, 8, 8
6783+
6784+
n_acq = n_p_acq*n_th_acq*n_loop_acq
6785+
n_rel = n_p_rel*n_th_rel*n_loop_rel
6786+
sem = self.Semaphore(n_acq)
6787+
ps = []
6788+
for _ in range(n_p_acq):
6789+
p = self.Process(target=self._run_process,
6790+
args=(sem, ACQUIRE, n_th_acq, n_loop_acq, 0.01))
6791+
ps.append(p)
6792+
6793+
for _ in range(n_p_rel):
6794+
p = self.Process(target=self._run_process,
6795+
args=(sem, RELEASE, n_th_rel, n_loop_rel, 0.005))
6796+
ps.append(p)
6797+
6798+
for p in ps:
6799+
p.start()
6800+
for p in ps:
6801+
p.join()
6802+
self.assertEqual(sem.get_value(), n_rel)
67446803

67456804
#
67466805
# Mixins
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
#include <unistd.h>
2+
#include <stdio.h> // puts, printf, scanf
3+
#include <time.h> // ctime, time
4+
#include <string.h> // memcpy, memcmp
5+
6+
#include <semaphore.h> // sem_t
7+
typedef sem_t *SEM_HANDLE;
8+
9+
#define MAX_SEMAPHORES_SHOW 32
10+
11+
#include "../semaphore_macosx.h"
12+
#include "shared_mem.h"
13+
14+
// Static datas for each process.
15+
CountersWorkaround shm_semlock_counters = {
16+
.state_this = THIS_NOT_OPEN,
17+
.name_shm = "/shm_gh125828",
18+
.handle_shm = (MEMORY_HANDLE)0,
19+
.create_shm = 0,
20+
.name_shm_lock = "/mp_gh125828",
21+
.handle_shm_lock = (SEM_HANDLE)0,
22+
.header = (HeaderObject *)NULL,
23+
.counters = (CounterObject *)NULL,
24+
};
25+
26+
HeaderObject *header = NULL;
27+
CounterObject *counter = NULL;
28+
29+
static char *show_counter(char *p, CounterObject *counter) {
30+
sprintf(p, "p:%p, n:%s, v:%d, u:%d, t:%s", counter,
31+
counter->sem_name,
32+
counter->internal_value,
33+
counter->unlink_done,
34+
ctime(&counter->ctimestamp));
35+
return p;
36+
}
37+
38+
static void dump_shm_semlock_counters(void) {
39+
puts(__func__);
40+
41+
char buf[256];
42+
int i = 0, j = 0;
43+
44+
if (shm_semlock_counters.state_this == THIS_AVAILABLE) {
45+
CounterObject *counter = shm_semlock_counters.counters;
46+
HeaderObject *header = shm_semlock_counters.header;
47+
dump_shm_semlock_header_counters();
48+
dump_shm_semlock_header();
49+
int show_max = header->n_semlocks > MAX_SEMAPHORES_SHOW ? MAX_SEMAPHORES_SHOW : header->n_semlocks;
50+
for(; i < header->n_slots && j < show_max; i++, counter++ ) {
51+
if (counter->sem_name[0] != 0) {
52+
printf("%s", show_counter(buf, counter));
53+
++j;
54+
}
55+
}
56+
if (show_max < header->n_semlocks) {
57+
printf("......\n--------- More %d Semphores ---------\n", header->n_semlocks-show_max);
58+
}
59+
}
60+
}
61+
62+
int main(int argc, char *argv[]) {
63+
int repeat = 0;
64+
long udelay = 5000;
65+
HeaderObject save = {0};
66+
int unlink = 0;
67+
int force_open = 1;
68+
int release_lock = 1;
69+
70+
puts("--------");
71+
printf("PID:%d, PPID:%d\n", getpid(), getppid());
72+
connect_shm_semlock_counters(unlink, force_open, release_lock);
73+
puts("+++++++++");
74+
if (argc > 1) {
75+
sscanf(argv[1], "%d", &repeat);
76+
if (argc >= 2) {
77+
puts(argv[2]);
78+
sscanf(argv[2], "%lu", &udelay);
79+
}
80+
} else {
81+
puts("dump_shared_mem <repeat> <delay> where:\n repeat (-1 "
82+
"is infinite) and a delay (us) between two dumps \n");
83+
return 1;
84+
}
85+
86+
printf("Repeat:%d, udelay:%lu\n", repeat, udelay);
87+
88+
if (shm_semlock_counters.state_this == THIS_AVAILABLE) {
89+
memset(&save, '\0', sizeof(save));
90+
do {
91+
if (memcmp(&save, shm_semlock_counters.header, sizeof(HeaderObject)) ) {
92+
time_t timestamp = time(NULL);
93+
puts(ctime(&timestamp));
94+
dump_shm_semlock_counters();
95+
memcpy(&save, shm_semlock_counters.header, sizeof(HeaderObject));
96+
puts("==========");
97+
}
98+
usleep(udelay);
99+
} while(repeat--);
100+
}
101+
return 1;
102+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
gcc -o ./dump_shm ./dump_shared_mem.c ./shared_mem.c
2+
gcc -o ./reset_shm ./reset_shared_mem.c ./shared_mem.c
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
** For MacOSX only **
2+
---
3+
4+
This directory contains 2 programs :
5+
6+
* dump_shared_mem: view content of shared memory.
7+
* reset_shared_mem: erase all stored datas of shared memory.
8+
9+
the `make_all.sh` batch builds these 2 programs.
10+
11+
# dump_shm.
12+
13+
`dump_shm` tries to connect to the shared memory only if its exists.
14+
This program doesn't use synchronization primitive to read the shared memory.
15+
To quit this program, press `Ctrl+C`.
16+
17+
```zsh
18+
dump_shm -1 300
19+
```
20+
Executes this program forever, and check all 300 *us* if shared memory changes.
21+
22+
When there are changes in the shared memory (only about sempahore count), program prints the new content of shared memory as below:
23+
24+
```zsh
25+
==========
26+
Tue Feb 25 17:04:05 2025
27+
28+
dump_shm_semlock_counters
29+
header:0x1022b4000 - counter array:0x1022b4010
30+
n sems:2 - n sem_slots:87551, n procs:1, size_shm:2801664
31+
p:0x1022b4010, n:/mp-fwl20ahw, v:6, r:0, t:Tue Feb 25 17:04:05 2025
32+
p:0x1022b4030, n:/mp-z3635cdr, v:6, r:0, t:Tue Feb 25 17:04:04 2025
33+
34+
```
35+
36+
# reset_shm.
37+
38+
`reset_shm` tries to connect to the shared memory only if its exists.
39+
This program uses synchronization primitive to read the shared memory.
40+
41+
When exits, this program calls `shm_unlink`.
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#include <unistd.h>
2+
#include <stdio.h> // puts, printf, scanf
3+
#include <string.h> // memcpy, memcmp, memset
4+
5+
#include <semaphore.h>
6+
typedef sem_t *SEM_HANDLE;
7+
8+
#include "../semaphore_macosx.h"
9+
#include "shared_mem.h"
10+
11+
// Static datas for each process.
12+
CountersWorkaround shm_semlock_counters = {
13+
.state_this = THIS_NOT_OPEN,
14+
.name_shm = "/shm_gh125828",
15+
.handle_shm = (MEMORY_HANDLE)0,
16+
.create_shm = 0,
17+
.name_shm_lock = "/mp_gh125828",
18+
.handle_shm_lock = (SEM_HANDLE)0,
19+
.header = (HeaderObject *)NULL,
20+
.counters = (CounterObject *)NULL,
21+
};
22+
23+
HeaderObject *header = NULL;
24+
CounterObject *counter = NULL;
25+
26+
static void reset_shm_semlock_counters(int size, int nb_slots) {
27+
puts(__func__);
28+
29+
if (shm_semlock_counters.state_this == THIS_AVAILABLE) {
30+
if (ACQUIRE_SHM_LOCK) {
31+
CounterObject *counter = shm_semlock_counters.counters;
32+
HeaderObject *header = shm_semlock_counters.header;
33+
dump_shm_semlock_header_counters();
34+
dump_shm_semlock_header();
35+
long size_to_reset = header->size_shm-sizeof(HeaderObject);
36+
printf("1 - size to reset:%lu\n", size_to_reset);
37+
if (size && size <= size_to_reset) {
38+
memset(counter, 0, size);
39+
40+
} else {
41+
memset(counter, 0, size_to_reset);
42+
}
43+
puts("2 - Reset all header parameters");
44+
if (nb_slots) {
45+
header->n_slots = nb_slots;
46+
}
47+
header->n_semlocks = 0;
48+
header->n_slots = CALC_NB_SLOTS(header->size_shm);
49+
header->n_procs = 0;
50+
dump_shm_semlock_header();
51+
RELEASE_SHM_LOCK;
52+
}
53+
} else {
54+
puts("No datas");
55+
}
56+
}
57+
58+
int main(int argc, char *argv[]) {
59+
char c;
60+
int size = CALC_SIZE_SHM;
61+
int nb_slots = CALC_NB_SLOTS(size);
62+
int unlink = 1;
63+
int force_open = 1;
64+
int release_lock = 1;
65+
66+
if (argc >= 2) {
67+
sscanf(argv[2], "%d", &size);
68+
nb_slots = CALC_NB_SLOTS(size);
69+
}
70+
puts("--------");
71+
printf("size:%d, sem slots:%d\n", size, nb_slots);
72+
connect_shm_semlock_counters(unlink, force_open, release_lock);
73+
puts("+++++++++");
74+
dump_shm_semlock_header_counters();
75+
dump_shm_semlock_header();
76+
if (shm_semlock_counters.state_this == THIS_AVAILABLE) {
77+
puts("confirm (Y/N):");
78+
c = getchar();
79+
if ( c == 'Y' || c == 'y') {
80+
reset_shm_semlock_counters(size, nb_slots);
81+
}
82+
}
83+
return 1;
84+
}

0 commit comments

Comments
 (0)