
Commit b2b011b

Merge pull request #216 from JosepSampe/pywren-dev
Take into account pywren version in knative runtimes
2 parents 2cc3f65 + afe6655 commit b2b011b


7 files changed: +53 -51 lines changed


docs/data-processing.md

Lines changed: 7 additions & 9 deletions
@@ -6,8 +6,7 @@ Additionally, the built-in data-processing logic integrates a **data partitioner
 
 
 ## Processing data from IBM Cloud Object Storage
-This mode is activated when you write the parameter **obj** into the function arguments. The input to the partitioner may be either a list of buckets, a list of buckets with object prefix, or a list of data objects. If you set the *size of the chunk* or the *number of chunks*, the partitioner is activated inside PyWren and it is responsible to split the objects into smaller chunks, eventually running one function activation for each generated chunk. If *size of the chunk* and *number of chunks* are not set, chunk is an entire object, so one function activation is executed for each individual object. For example consider the following function:
-
+This mode is activated when you write the parameter **obj** into the function arguments. The input to the partitioner may be either a list of buckets, a list of buckets with object prefix, or a list of data objects. If you set the *size of the chunk* or the *number of chunks*, the partitioner is activated inside PyWren and it is responsible to split the objects into smaller chunks, eventually running one function activation for each generated chunk. If *size of the chunk* and *number of chunks* are not set, chunk is an entire object, so one function activation is executed for each individual object.
 
 The *obj* parameter is a python class from where you can access all the information related to the object (or chunk) that the function is processing. For example, consider the following function that shows all the available attributes in *obj*:
 
@@ -24,26 +23,26 @@ def my_map_function(obj):
 
 As stated above, the allowed inputs of the function can be:
 
-- Input data is a bucket or a list of buckets. See a complete example in [map_reduce_cos_bucket.py](../examples/map_reduce_cos_bucket.py):
+- Input data is a bucket or a list of buckets. See an example in [map_reduce_cos_bucket.py](../examples/map_reduce_cos_bucket.py):
 ```python
 iterdata = 'cos://bucket1'
 ```
 
-- Input data is a bucket(s) with object prefix. See a complete example in [map_cos_prefix.py](../examples/map_cos_prefix.py):
+- Input data is a bucket(s) with object prefix. See an example in [map_cos_prefix.py](../examples/map_cos_prefix.py):
 ```python
 iterdata = ['cos://bucket1/images/', 'cos://bucket1/videos/']
 ```
 Notice that you must write the end slash (/) to inform partitioner you are providing an object prefix.
 
-- Input data is a list of object keys. See a complete example in [map_reduce_cos_key.py](../examples/map_reduce_cos_key.py):
+- Input data is a list of object keys. See an example in [map_reduce_cos_key.py](../examples/map_reduce_cos_key.py):
 ```python
 iterdata = ['cos://bucket1/object1', 'cos://bucket1/object2', 'cos://bucket1/object3']
 ```
 
-Notice that *iterdata* must be only one of the previous 3 types. Intermingled types are not allowed. For example, you cannot set in the same *iterdata* list a bucket and some object keys:
+Notice that *iterdata* must be only one of the previous 3 types. Intermingled types are not allowed. For example, you cannot set in the same *iterdata* a bucket and some object keys:
 
 ```python
-iterdata = ['cos://bucket1', 'cos://bucket1/object2', 'cos://bucket1/object3']
+iterdata = ['cos://bucket1', 'cos://bucket1/object2', 'cos://bucket1/object3'] # Not allowed
 ```
 
 Once iterdata is defined, you can execute PyWren as usual, either using *map()* or **map_reduce()* calls. If you need to split the files in smaller chunks, you can set (optionally) the *chunk_size* or *chunk_n* parameters.
@@ -92,8 +91,7 @@ See a complete example in [map_reduce_url.py](../examples/map_reduce_url.py).
 
 
 ## Reducer granularity
-By default there will be one reducer for all the object chunks. If you need one reducer for each object, you must set the parameter
-`reducer_one_per_object=True` into the *map()* or *map_reduce()* methods.
+When using the `map_reduce()` API call with `chunk_size` or `chunk_n`, by default there will be only one reducer for all the object chunks from all the objects. Alternatively, you can spawn one reducer for each object by setting the parameter `reducer_one_per_object=True`.
 
 ```python
 pw.map_reduce(my_map_function, bucket_name, my_reduce_function,
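As a side note, the chunking behaviour described in this doc section can be modelled with a small standalone sketch. This is not PyWren's real partitioner, just an illustration of the documented rule; the object names and sizes are invented:

```python
# Model of the documented chunking behavior: one function activation per
# chunk when chunk_size is set, one activation per whole object otherwise.
# NOTE: illustrative sketch only, not pywren_ibm_cloud's actual partitioner.

def partition(objects, chunk_size=None):
    """objects: dict mapping object key -> size in bytes.
    Returns a list of (key, (start, end)) tasks, one per activation."""
    tasks = []
    for key, size in objects.items():
        if chunk_size is None:
            tasks.append((key, (0, size)))         # whole object, one activation
        else:
            start = 0
            while start < size:
                end = min(start + chunk_size, size)
                tasks.append((key, (start, end)))  # one activation per chunk
                start = end
    return tasks

objects = {'cos://bucket1/object1': 100, 'cos://bucket1/object2': 250}
print(len(partition(objects)))                  # 2 activations, one per object
print(len(partition(objects, chunk_size=100)))  # 1 + 3 = 4 activations
```

With `chunk_size` unset, each object becomes a single activation; with `chunk_size=100`, the 250-byte object is split into three byte ranges.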

pywren_ibm_cloud/compute/backends/knative/config.py

Lines changed: 0 additions & 2 deletions
@@ -99,8 +99,6 @@
         value: .
       - name: pathToDockerFile
         value: runtime/knative/Dockerfile
-      - name: imageTag
-        value: latest
     serviceAccount: pywren-build-pipeline
 """
 

pywren_ibm_cloud/compute/backends/knative/knative.py

Lines changed: 34 additions & 30 deletions
@@ -163,10 +163,18 @@ def _create_build_resources(self):
         task_def = yaml.safe_load(kconfig.task_def)
         task_name = task_def['metadata']['name']
 
-        git_url_param = {'name': 'url', 'value': kconfig.GIT_URL_DEFAULT}
-        git_rev_param = {'name': 'revision', 'value': kconfig.GIT_REV_DEFAULT}
-        params = [git_url_param, git_rev_param]
+        if 'git_url' in self.knative_config:
+            git_url_param = {'name': 'url', 'value': self.knative_config['git_url']}
+        else:
+            git_url_param = {'name': 'url', 'value': kconfig.GIT_URL_DEFAULT}
+
+        if 'git_rev' in self.knative_config:
+            git_rev_param = {'name': 'revision', 'value': self.knative_config['git_rev']}
+        else:
+            revision = 'master' if 'SNAPSHOT' in __version__ else __version__
+            git_rev_param = {'name': 'revision', 'value': revision}
 
+        params = [git_url_param, git_rev_param]
         git_res['spec']['params'] = params
 
         try:
@@ -216,16 +224,22 @@ def _build_docker_image_from_git(self, docker_image_name):
         Builds the docker image and pushes it to the docker container registry
         """
         logger.debug("Building default docker image from git")
+
+        revision = 'latest' if 'SNAPSHOT' in __version__ else __version__
+
+        if self.knative_config['docker_repo'] == 'docker.io' and revision != 'latest':
+            resp = requests.get('https://index.docker.io/v1/repositories/{}/tags/{}'
+                                .format(docker_image_name, revision))
+            if resp.status_code == 200:
+                logger.debug('Docker image docker.io/{}:{} already created in Dockerhub. '
+                             'Skipping build process.'.format(docker_image_name, revision))
+                return
+
         task_run = yaml.safe_load(kconfig.task_run)
         image_url = {'name': 'imageUrl', 'value': '/'.join([self.knative_config['docker_repo'], docker_image_name])}
         task_run['spec']['inputs']['params'].append(image_url)
-        #image_tag = {'name': 'imageTag', 'value': __version__}
-        #task_run['spec']['inputs']['params'].append(image_tag)
-
-        resp = requests.get('https://index.docker.io/v1/repositories/{}/tags/latest'.format(docker_image_name))
-        if resp.status_code == 200:
-            logger.debug('Docker image already created in Dockerhub. Skipping build process.')
-            return
+        image_tag = {'name': 'imageTag', 'value': revision}
+        task_run['spec']['inputs']['params'].append(image_tag)
 
         self._create_account_resources()
         self._create_build_resources()
@@ -251,6 +265,7 @@ def _build_docker_image_from_git(self, docker_image_name):
             body=task_run
         )
 
+        logger.debug("Building image...")
        pod_name = None
        w = watch.Watch()
        for event in w.stream(self.api.list_namespaced_custom_object, namespace=self.namespace,
@@ -290,15 +305,15 @@ def _create_service(self, docker_image_name, runtime_memory, timeout):
         logger.debug("Creating PyWren runtime service resource in k8s")
         svc_res = yaml.safe_load(kconfig.service_res)
 
+        revision = 'latest' if 'SNAPSHOT' in __version__ else __version__
+        # TODO: Take into account revision in service name
         service_name = self._format_service_name(docker_image_name, runtime_memory)
         svc_res['metadata']['name'] = service_name
         svc_res['metadata']['namespace'] = self.namespace
 
         svc_res['spec']['template']['spec']['timeoutSeconds'] = timeout
-
         docker_image = '/'.join([self.knative_config['docker_repo'], docker_image_name])
-        svc_res['spec']['template']['spec']['container']['image'] = docker_image
-
+        svc_res['spec']['template']['spec']['container']['image'] = '{}:{}'.format(docker_image, revision)
         svc_res['spec']['template']['spec']['container']['resources']['limits']['memory'] = '{}Mi'.format(runtime_memory)
 
         try:
@@ -353,8 +368,9 @@ def create_runtime(self, docker_image_name, memory, timeout=kconfig.RUNTIME_TIME
         """
         default_runtime_img_name = self._get_default_runtime_image_name()
         if docker_image_name in ['default', default_runtime_img_name]:
-            # We only build default image. rest of images must already exist
+            # We only build the default image. rest of images must already exist
             # in the docker registry.
+            docker_image_name = default_runtime_img_name
             self._build_docker_image_from_git(default_runtime_img_name)
 
         service_url = self._create_service(docker_image_name, memory, timeout)
@@ -399,28 +415,16 @@ def delete_runtime(self, docker_image_name, memory):
                 plural="services",
                 body=client.V1DeleteOptions()
             )
-        except Exception as e:
-            if json.loads(e.body)['code'] == 404:
-                log_msg = 'Knative service: resource "{}" Not Found'.format(service_name)
-                logger.debug(log_msg)
+        except Exception:
+            pass
 
     def delete_all_runtimes(self):
         """
         Deletes all runtimes deployed in knative
         """
         runtimes = self.list_runtimes()
-        for image_name, memory in runtimes:
-            service_name = self._format_service_name(image_name, memory)
-            log_msg = 'Deleting runtime: {}'.format(service_name)
-            logger.debug(log_msg)
-            self.api.delete_namespaced_custom_object(
-                group="serving.knative.dev",
-                version="v1alpha1",
-                name=service_name,
-                namespace=self.namespace,
-                plural="services",
-                body=client.V1DeleteOptions()
-            )
+        for docker_image_name, memory in runtimes:
+            self.delete_runtime(docker_image_name, memory)
 
     def list_runtimes(self, docker_image_name='all'):
         """
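The heart of this commit is the rule that derives the git revision and the docker tag from the installed PyWren version (it appears in `_create_build_resources`, `_build_docker_image_from_git` and `_create_service` above). Isolated as standalone functions, with invented version strings, the rule is:

```python
# Sketch of the version -> revision/tag mapping this commit introduces.
# SNAPSHOT (development) versions track moving targets; released versions
# pin both the git revision and the docker image tag to the version number.

def git_revision(version):
    # Revision checked out when building the runtime image from git
    return 'master' if 'SNAPSHOT' in version else version

def docker_tag(version):
    # Tag used to push and look up the runtime image in the registry
    return 'latest' if 'SNAPSHOT' in version else version

print(git_revision('1.5.2'), docker_tag('1.5.2'))                    # 1.5.2 1.5.2
print(git_revision('1.6.0-SNAPSHOT'), docker_tag('1.6.0-SNAPSHOT'))  # master latest
```

This is also why the build can now be skipped: for a pinned tag, a 200 response from the Dockerhub tags endpoint means that exact runtime version has already been pushed, while `latest` is always rebuilt.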

pywren_ibm_cloud/executor.py

Lines changed: 2 additions & 2 deletions
@@ -275,7 +275,7 @@ def map_reduce(self, map_function, map_iterdata, reduce_function, extra_params=N
         self._state = ExecutorState.running
 
         if reducer_wait_local:
-            self.monitor(futures=map_futures)
+            self.wait(fs=map_futures)
 
         reduce_job_id = 'R{}'.format(job_id)
 
@@ -548,7 +548,7 @@ def clean(self, local_execution=True, delete_all=False):
                      'STORE_RESULT': False}
         old_stdout = sys.stdout
         sys.stdout = open(os.devnull, 'w')
-        self.executor.call_async(clean_os_bucket, [storage_bucket, storage_prerix], extra_env=extra_env)
+        self.call_async(clean_os_bucket, [storage_bucket, storage_prerix], extra_env=extra_env)
         sys.stdout = old_stdout
 
         self._state = ExecutorState.finished

pywren_ibm_cloud/runtime/function_handler/handler.py

Lines changed: 6 additions & 4 deletions
@@ -23,7 +23,7 @@
 import logging
 import traceback
 import subprocess
-from multiprocessing import Value
+from multiprocessing import Pipe
 from distutils.util import strtobool
 from pywren_ibm_cloud import version
 from pywren_ibm_cloud.utils import sizeof_fmt
@@ -136,8 +136,8 @@ def function_handler(event):
     setup_time = time.time()
     response_status['setup_time'] = round(setup_time - start_time, 8)
 
-    jr_success_flag = Value('i', 0)
-    tr = JobRunner(jobrunner_config, jr_success_flag)
+    handler_conn, jobrunner_conn = Pipe()
+    tr = JobRunner(jobrunner_config, jobrunner_conn)
     tr.daemon = True
     logger.debug('Starting JobRunner process')
     tr.start()
@@ -152,7 +152,9 @@ def function_handler(event):
               'seconds and was killed'.format(execution_timeout))
        raise Exception('OUTATIME', msg)
 
-    if jr_success_flag.value == 0:
+    try:
+        handler_conn.recv()
+    except EOFError:
        logger.error('No completion message received from JobRunner process')
        logger.debug('Assuming memory overflow...')
        # Only 1 message is returned by jobrunner when it finishes.
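The motivation for swapping the shared `Value` flag for a `Pipe` is failure detection: when the child process dies without reporting (for example, killed by the OOM killer), the parent's `recv()` raises `EOFError` once every write end is closed, whereas an unset flag cannot distinguish "not finished yet" from "dead". A minimal standalone sketch of the pattern (not the actual handler code; names and messages are illustrative):

```python
import os
from multiprocessing import Pipe, Process

def worker(conn, crash):
    if crash:
        os._exit(1)          # die abruptly without reporting (simulates an OOM kill)
    conn.send('Finished')    # the single completion message

def run_and_check(crash):
    handler_conn, jobrunner_conn = Pipe()
    p = Process(target=worker, args=(jobrunner_conn, crash))
    p.start()
    p.join()
    jobrunner_conn.close()   # close the parent's copy so recv() can observe EOF
    try:
        return handler_conn.recv()       # 'Finished' on normal completion
    except EOFError:
        return 'no completion message'   # child died before sending anything

if __name__ == '__main__':
    print(run_and_check(crash=False))    # Finished
    print(run_and_check(crash=True))     # no completion message
```

Note that the parent must close its own copy of the child's end of the pipe, otherwise `recv()` would block forever instead of raising `EOFError` after the child dies.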

pywren_ibm_cloud/runtime/function_handler/jobrunner.py

Lines changed: 3 additions & 3 deletions
@@ -55,11 +55,11 @@ def __del__(self):
 
 class JobRunner(Process):
 
-    def __init__(self, jr_config, jr_success_flag):
+    def __init__(self, jr_config, jobrunner_conn):
         super().__init__()
         start_time = time.time()
         self.jr_config = jr_config
-        self.success_flag = jr_success_flag
+        self.jobrunner_conn = jobrunner_conn
 
         log_level = self.jr_config['log_level']
         cloud_logging_config(log_level)
@@ -312,5 +312,5 @@ def run(self):
         self.internal_storage.put_data(self.output_key, pickled_output)
         output_upload_timestamp_t2 = time.time()
         self.stats.write("output_upload_time", round(output_upload_timestamp_t2 - output_upload_timestamp_t1, 8))
-        self.success_flag.value = 1
+        self.jobrunner_conn.send("Finished")
         logger.info("Finished")

pywren_ibm_cloud/storage/storage.py

Lines changed: 1 addition & 1 deletion
@@ -217,7 +217,7 @@ def put_runtime_meta(self, key, runtime_meta):
 
         if not is_remote_cluster():
             filename_local_path = os.path.join(CACHE_DIR, *path)
-            logger.debug("Saving runtime metadata into local cache: {}".format(filename_local_path))
+            logger.debug("Storing runtime metadata into local cache: {}".format(filename_local_path))
 
             if not os.path.exists(os.path.dirname(filename_local_path)):
                 os.makedirs(os.path.dirname(filename_local_path))
