Skip to content

Commit 9975435

Browse files
committed
Support functools.partial with FunctionExecutor
Fixes #1428
1 parent ce25605 commit 9975435

File tree

5 files changed

+53
-16
lines changed

5 files changed

+53
-16
lines changed

CHANGELOG.md

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,17 @@
66
-
77

88
### Changed
9-
-
9+
-
1010

1111
### Fixed
12-
-
12+
- [Executor] Support use of `functools.partial` with FunctionExecutor's
13+
`call_async` and `map` methods
1314

1415
## [v3.6.3]
1516

1617
### Fixed
17-
- Fixed memory available options for aws batch: 4 cpus
18-
- Fixed race condition and improving monitor stability
18+
- Fixed memory available options for aws batch: 4 cpus
19+
- Fixed race condition and improving monitor stability
1920

2021

2122
## [v3.6.2]
@@ -103,7 +104,7 @@
103104
- [AWS Batch] Updated CPU and Memory resource limits
104105

105106
### Fixed
106-
- [AWS Lambda] Fixed wrong AWS Lambda delete runtime_name match semantics
107+
- [AWS Lambda] Fixed wrong AWS Lambda delete runtime_name match semantics
107108
- [Worker] Fixed potential issue that can appear during 'func_obj' loading from cache
108109
- [Monitor] Fixed potential 'keyerror' exceptions
109110
- [Swift] Fixed OpenStack Swift parameters and authentication by adding domain information
@@ -175,7 +176,7 @@
175176
### Added
176177
- [k8s] Added a new way of invoking functions using a RabbitMQ work queue
177178
- [IBM VPC] Added "zone" config parameter
178-
- [IBM Code Engine] Get and print an error message in case of container execution failure
179+
- [IBM Code Engine] Get and print an error message in case of container execution failure
179180

180181
### Changed
181182
- [OpenWhisk] Updated default runtimes
@@ -723,7 +724,7 @@
723724
- [Config] Allow 'log_stream' and 'log_filename' keys in configuration
724725
- [Config] Allow 'runtime' being configured at serverless backend level
725726
- [Config] Allow 'invoke_pool_threads' being configured at serverless backend level
726-
- [Multiprocessing] Added generic Manager
727+
- [Multiprocessing] Added generic Manager
727728
- [Kubernetes] Add kubernetes job backend
728729
- [CLI] Extended lithops cli with storage put, get, delete and list options
729730
- [Azure] Added missing azure functions backend methods
@@ -970,7 +971,7 @@
970971
- [Core] Generic compute client logic
971972
- [Core] IBM IAM service client lib
972973
- [Core] IBM VPC service client lib
973-
- [Docker] Docker backend compatible with IBM VPC VM
974+
- [Docker] Docker backend compatible with IBM VPC VM
974975

975976
### Changed
976977
- [Docker] Improved Docker executor
@@ -1599,8 +1600,8 @@
15991600
- Docs updated
16001601
- Storage separation
16011602
- Project update. 'bx' and 'wsk' CLI tools are no longer necessary
1602-
- Updated setup.py
1603-
- Deleted requirements.txt
1603+
- Updated setup.py
1604+
- Deleted requirements.txt
16041605
- Updated default_preinstalls
16051606

16061607
### Fixed
@@ -1658,7 +1659,7 @@ First release.
16581659
- When a new *executor class* is instantiated, it is created a *storage_handler* used in the all PyWren execution.
16591660
- Now it is possible to specify the **runtime** when the user instantiates the *executor class* instead of changing the config file every time (In the config file is specified the default runtime).
16601661
- The **logging level** is now specified when the user instantiates the *executor class* instead of put it in the first line of the code within an env variable.
1661-
- The PyWren code which is executed remotely as a wrapper of the function now uses the main storage handler as the rest of the PyWren code. In previous versions, PyWren creates a new storage client directly with *boto3* library instead of using pywren/storage/storage.py wrapper.
1662+
- The PyWren code which is executed remotely as a wrapper of the function now uses the main storage handler as the rest of the PyWren code. In previous versions, PyWren creates a new storage client directly with *boto3* library instead of using pywren/storage/storage.py wrapper.
16621663
- Added support for multiple parameters in the functions which are executed remotely as a cloud functions. Previous versions just allows one parameter.
16631664
- Eased the usage of the storage backend within a function. By simply specifying *storage_handler* as a parameter of the function, the user will get access to the storage backend.
16641665
- Added a new method for retrieving the results of an execution called **fetch_all_resuslts()**. Previous PyWren versions already includes a method called *get_all_results()*, but this is a sequential method and it takes long time to retrieve all the results. It was also included a *wait()* class which is more similar to *get_all_results()* method, the main difference is that the new method is all based on *list the available objects in a bucket*, and it returns when all the tasks are finished. The new method also has the possibility to activate a progress bar in order to track the current status of the execution (really useful for larger executions).

lithops/job/serialize.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import cloudpickle
2525
from pathlib import Path
2626
from dis import Bytecode
27-
from functools import reduce
27+
from functools import partial, reduce
2828
from importlib import import_module
2929
from types import CodeType, FunctionType, ModuleType
3030

@@ -151,6 +151,9 @@ def _module_inspect(self, obj):
151151
for k, v in linspect.getmembers_static(param):
152152
if inspect.isfunction(v) or (inspect.ismethod(v) and inspect.isfunction(v.__func__)):
153153
worklist.append(v)
154+
elif isinstance(obj, partial):
155+
found_methods = ["__call__"]
156+
worklist.append(obj.func)
154157
else:
155158
# The obj is the user's function but in form of a class
156159
found_methods = []
@@ -159,8 +162,10 @@ def _module_inspect(self, obj):
159162
found_methods.append(k)
160163
worklist.append(v)
161164
if "__call__" not in found_methods:
162-
raise Exception('The class you passed as the function to '
163-
'run must contain the "__call__" method')
165+
raise ValueError(
166+
"The class you passed as the function to "
167+
'run must contain the "__call__" method'
168+
)
164169

165170
# The worklist is only used for analyzing functions
166171
for fn in worklist:
@@ -199,7 +204,7 @@ def _module_inspect(self, obj):
199204
elif inspect.iscode(v):
200205
codeworklist.append(v)
201206

202-
return set([mod_name.split('.')[0] for mod_name in mods])
207+
return {mod_name.split(".")[0] for mod_name in mods}
203208

204209
def _inner_module_inspect(self, inst):
205210
"""

lithops/tests/functions.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,22 @@ def _func(x):
3838
return futures
3939

4040

41+
def lithops_return_futures_map_over_partial(x):
42+
from functools import partial
43+
44+
def _func(x, y):
45+
return x * y
46+
47+
fexec = lithops.FunctionExecutor()
48+
futures = fexec.map(partial(_func, 2), range(x))
49+
50+
# this while loop is required to pass localhost tests on Windows
51+
while not all(f.running or f.ready for f in futures):
52+
time.sleep(0.1)
53+
54+
return futures
55+
56+
4157
def lithops_return_futures_call_async(x):
4258
def _func(x):
4359
return x + 1

lithops/tests/test_call_async.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ def test_set_iterdata(self):
4949
result = fexec.get_result()
5050
assert result == 10
5151

52+
def test_partial_function(self):
53+
from functools import partial
54+
55+
fexec = lithops.FunctionExecutor(config=pytest.lithops_config)
56+
fexec.call_async(partial(simple_map_function, 4), 6)
57+
result = fexec.get_result()
58+
assert result == 10
59+
5260
def test_dict_iterdata(self):
5361
fexec = lithops.FunctionExecutor(config=pytest.lithops_config)
5462
fexec.call_async(simple_map_function, {'x': 2, 'y': 8})

lithops/tests/test_map.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121
hello_world,
2222
lithops_inside_lithops_map_function,
2323
lithops_return_futures_map,
24+
lithops_return_futures_map_over_partial,
2425
lithops_return_futures_call_async,
2526
lithops_return_futures_map_multiple,
26-
concat
27+
concat,
2728
)
2829

2930

@@ -106,6 +107,12 @@ def test_lithops_return_futures_map(self):
106107
result = fexec.get_result()
107108
assert result == [1, 2, 3]
108109

110+
def test_lithops_return_futures_map_over_partial(self):
111+
fexec = lithops.FunctionExecutor(config=pytest.lithops_config)
112+
fexec.call_async(lithops_return_futures_map_over_partial, 3)
113+
result = fexec.get_result()
114+
assert result == [0, 2, 4]
115+
109116
def test_lithops_return_futures_call_async(self):
110117
fexec = lithops.FunctionExecutor(config=pytest.lithops_config)
111118
fexec.call_async(lithops_return_futures_call_async, 3)

0 commit comments

Comments
 (0)