Skip to content

Commit 870c956

Browse files
committed
Reorganize repository & prepare pip package
1 parent df6aa8f commit 870c956

File tree

6 files changed

+77
-57
lines changed

6 files changed

+77
-57
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
*.pkl
22
.vscode
33
__pycache__
4-
*checkpoints
4+
*checkpoints
5+
build
6+
dist
7+
*.egg-info
File renamed without changes.

pandarallel/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
__version__ = '0.1.0'
Lines changed: 52 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
1-
import pandas as pd
2-
import pyarrow.plasma as plasma
3-
from pyarrow.lib import PlasmaStoreFull
4-
import multiprocessing
5-
import itertools
6-
from concurrent.futures import ProcessPoolExecutor
1+
import pandas as _pd
2+
import pyarrow.plasma as _plasma
3+
from pyarrow.lib import PlasmaStoreFull as _PlasmaStoreFull
4+
import multiprocessing as _multiprocessing
5+
import itertools as _itertools
6+
from concurrent.futures import ProcessPoolExecutor as _ProcessPoolExecutor
77

8-
plasma_store_ctx = None
9-
plasma_store_name = None
8+
__all__ = ['shm_size', 'nb_workers']
9+
10+
_plasma_store_ctx = None
11+
_plasma_store_name = None
1012

1113
shm_size = int(2e9) # 2 Go
12-
nb_workers = multiprocessing.cpu_count()
14+
nb_workers = _multiprocessing.cpu_count()
1315

14-
def chunk(nb_elem, nb_chunks):
16+
def _chunk(nb_elem, nb_chunks):
1517
quotient = nb_elem // nb_chunks
1618
remainder = nb_elem % nb_chunks
1719

@@ -22,7 +24,7 @@ def chunk(nb_elem, nb_chunks):
2224
quotient + remainder for quotient, remainder
2325
in zip(quotients, remainders)
2426
]
25-
accumulated = list(itertools.accumulate(nb_elems_per_chunk))
27+
accumulated = list(_itertools.accumulate(nb_elems_per_chunk))
2628
shifted_accumulated = accumulated.copy()
2729
shifted_accumulated.insert(0, 0)
2830
shifted_accumulated.pop()
@@ -32,25 +34,25 @@ def chunk(nb_elem, nb_chunks):
3234
in zip(shifted_accumulated, accumulated)
3335
]
3436

35-
def parallel(func):
37+
def _parallel(func):
3638
def wrapper(*args, **kwargs):
3739
try:
38-
global plasma_store_ctx
39-
global plasma_store_name
40+
global _plasma_store_ctx
41+
global _plasma_store_name
4042

41-
if not plasma_store_ctx:
43+
if not _plasma_store_ctx:
4244
mem_mo = round(shm_size / 1e6, 2)
4345
msg = f"New pandarallel shared memory created - \
4446
Size: {mem_mo} Mo"
4547
print(msg)
46-
plasma_store_ctx = plasma.start_plasma_store(shm_size)
47-
plasma_store_name, _ = plasma_store_ctx.__enter__()
48+
_plasma_store_ctx = _plasma.start_plasma_store(shm_size)
49+
_plasma_store_name, _ = _plasma_store_ctx.__enter__()
4850

4951
print(f"Running task on {nb_workers} workers")
5052

5153
return func(*args, **kwargs)
5254

53-
except PlasmaStoreFull:
55+
except _PlasmaStoreFull:
5456
msg = f"The pandarallel shared memory: \
5557
{round(shm_size / 1e6, 2)} Mo is too small to allow parallel computation. \
5658
Just after pandarallel import, please write: \
@@ -61,28 +63,28 @@ def wrapper(*args, **kwargs):
6163

6264
return wrapper
6365

64-
class Series:
66+
class _Series:
6567
@staticmethod
6668
def worker(plasma_store_name, object_id, chunk, func):
67-
client = plasma.connect(plasma_store_name)
69+
client = _plasma.connect(plasma_store_name)
6870
series = client.get(object_id)
6971
return client.put(series[chunk].map(func))
7072

7173
@staticmethod
72-
@parallel
74+
@_parallel
7375
def map(data, func):
74-
client = plasma.connect(plasma_store_name)
75-
chunks = chunk(data.size, nb_workers)
76+
client = _plasma.connect(_plasma_store_name)
77+
chunks = _chunk(data.size, nb_workers)
7678
object_id = client.put(data)
7779

78-
with ProcessPoolExecutor(max_workers=nb_workers) as executor:
80+
with _ProcessPoolExecutor(max_workers=nb_workers) as executor:
7981
futures = [
80-
executor.submit(Series.worker, plasma_store_name,
81-
object_id, chunk, func)
82-
for chunk in chunks
82+
executor.submit(_Series.worker, _plasma_store_name,
83+
object_id, _chunk, func)
84+
for _chunk in chunks
8385
]
8486

85-
result = pd.concat([
87+
result = _pd.concat([
8688
client.get(future.result())
8789
for future in futures
8890
], copy=False)
@@ -91,51 +93,51 @@ def map(data, func):
9193

9294
return result
9395

94-
pd.Series.parallel_map = Series.map
96+
_pd.Series.parallel_map = _Series.map
9597

96-
class DataFrameGroupBy:
98+
class _DataFrameGroupBy:
9799
@staticmethod
98100
def worker(plasma_store_name, object_id, func):
99-
client = plasma.connect(plasma_store_name)
101+
client = _plasma.connect(plasma_store_name)
100102
df = client.get(object_id)
101103
return client.put(func(df))
102104

103105
@staticmethod
104-
@parallel
106+
@_parallel
105107
def apply(data, func):
106-
client = plasma.connect(plasma_store_name)
108+
client = _plasma.connect(_plasma_store_name)
107109
keys = data.groups.keys()
108110

109-
with ProcessPoolExecutor(max_workers=nb_workers) as executor:
111+
with _ProcessPoolExecutor(max_workers=nb_workers) as executor:
110112
futures = [
111-
executor.submit(DataFrameGroupBy.worker,
112-
plasma_store_name,
113+
executor.submit(_DataFrameGroupBy.worker,
114+
_plasma_store_name,
113115
client.put(data.get_group(key)),
114116
func)
115117
for key in keys
116118
]
117119

118-
result = pd.DataFrame([
120+
result = _pd.DataFrame([
119121
client.get(future.result())
120122
for future in futures
121-
], index=pd.Series(list(data.grouper),
123+
], index=_pd.Series(list(data.grouper),
122124
name=data.keys))
123125

124126
client.delete(client.list().keys())
125127

126128
return result
127129

128-
pd.core.groupby.DataFrameGroupBy.parallel_apply = DataFrameGroupBy.apply
130+
_pd.core.groupby.DataFrameGroupBy.parallel_apply = _DataFrameGroupBy.apply
129131

130-
class DataFrame:
132+
class _DataFrame:
131133
@staticmethod
132134
def worker(plasma_store_name, object_id, chunk, func, **kwargs):
133-
client = plasma.connect(plasma_store_name)
135+
client = _plasma.connect(plasma_store_name)
134136
df = client.get(object_id)
135137
return client.put(df[chunk].apply(func, **kwargs))
136138

137139
@staticmethod
138-
@parallel
140+
@_parallel
139141
def apply(data, func, **kwargs):
140142
axis = kwargs.get("axis", 0)
141143
if axis == 0:
@@ -144,18 +146,18 @@ def apply(data, func, **kwargs):
144146
Implementation of dataframe.parallel_apply with axis=0 will come soon."
145147
raise NotImplementedError(msg)
146148

147-
client = plasma.connect(plasma_store_name)
148-
chunks = chunk(data.shape[0], nb_workers)
149+
client = _plasma.connect(_plasma_store_name)
150+
chunks = _chunk(data.shape[0], nb_workers)
149151
object_id = client.put(data)
150152

151-
with ProcessPoolExecutor(max_workers=nb_workers) as executor:
153+
with _ProcessPoolExecutor(max_workers=nb_workers) as executor:
152154
futures = [
153-
executor.submit(DataFrame.worker, plasma_store_name,
154-
object_id, chunk, func, **kwargs)
155-
for chunk in chunks
155+
executor.submit(_DataFrame.worker, _plasma_store_name,
156+
object_id, _chunk, func, **kwargs)
157+
for _chunk in chunks
156158
]
157159

158-
result = pd.concat([
160+
result = _pd.concat([
159161
client.get(future.result())
160162
for future in futures
161163
], copy=False)
@@ -165,4 +167,4 @@ def apply(data, func, **kwargs):
165167
return result
166168

167169

168-
pd.DataFrame.parallel_apply = DataFrame.apply
170+
_pd.DataFrame.parallel_apply = _DataFrame.apply

setup.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from setuptools import setup, find_packages
2+
3+
import pandarallel
4+
5+
setup(
6+
name='pandarallel',
7+
version=pandarallel.__version__,
8+
packages=find_packages(),
9+
author='Manu NALEPA',
10+
author_email='nalepae@gmail.com',
11+
description='An easy to use library to speed up computation (by parallelizing on multi CPUs) with pandas.',
12+
long_description=open('README.md').read(),
13+
url='https://github.com/nalepae/pandarallel',
14+
)

test.py renamed to tests/test.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import pandarallel
1+
import pandarallel.pandarallel
22

3-
import pandas as pd
3+
import pandas as _pd
44
import numpy as np
55
import math
66

@@ -14,12 +14,12 @@ def func_for_dataframe_groupby_apply(df):
1414
dum = 0
1515
for item in df.b:
1616
dum += math.log10(math.sqrt(math.exp(item**2)))
17-
17+
1818
return dum / len(df.b)
1919

2020
def test_dataframe_apply():
2121
df_size = int(1e1)
22-
df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
22+
df = _pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
2323
b=np.random.rand(df_size)))
2424

2525
res = df.apply(func_for_dataframe_apply, axis=1)
@@ -28,15 +28,15 @@ def test_dataframe_apply():
2828

2929
def test_series_map():
3030
df_size = int(1e1)
31-
df = pd.DataFrame(dict(a=np.random.rand(df_size) + 1))
31+
df = _pd.DataFrame(dict(a=np.random.rand(df_size) + 1))
3232

3333
res = df.a.map(func_for_series_map)
3434
res_parallel = df.a.parallel_map(func_for_series_map)
3535
assert res.equals(res_parallel)
3636

3737
def test_dataframe_groupby_apply():
3838
df_size = int(1e1)
39-
df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
39+
df = _pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
4040
b=np.random.rand(df_size)))
4141

4242
res = df.groupby("a").apply(func_for_dataframe_groupby_apply)

0 commit comments

Comments (0)