
Commit 39ef477

Merge pull request #213 from JosepSampe/pywren-dev

Docs updated & fixes

2 parents 9f56e60 + 1143073

File tree: 18 files changed (+126 −113 lines)

README.md (1 addition, 1 deletion)

````diff
@@ -98,7 +98,7 @@ def add_seven(x):
 if __name__ == '__main__':
     ibmcf = pywren.ibm_cf_executor()
     ibmcf.call_async(add_seven, 3)
-    print (ibmcf.get_result())
+    print(ibmcf.get_result())
 ```

 ### Functions
````
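The README fix above only removes a stray space before the call's parentheses. To see why the flow still works, here is a minimal local stand-in for the executor; `StubExecutor` is hypothetical and only mimics the `call_async()`/`get_result()` sequence, whereas the real `pywren.ibm_cf_executor()` runs the function remotely on IBM Cloud Functions:

```python
def add_seven(x):
    return x + 7

class StubExecutor:
    """Hypothetical local stand-in for pywren.ibm_cf_executor()."""
    def __init__(self):
        self._result = None

    def call_async(self, func, data):
        # The real executor serializes func/data and invokes a cloud action;
        # here the call simply runs locally.
        self._result = func(data)

    def get_result(self):
        return self._result

ibmcf = StubExecutor()
ibmcf.call_async(add_seven, 3)
print(ibmcf.get_result())  # 10
```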

config/README.md (11 additions, 2 deletions)

````diff
@@ -116,7 +116,7 @@ pw = pywren.ibm_cf_executor(rabbitmq_monitor=True)

 |Group|Key|Default|Mandatory|Additional info|
 |---|---|---|---|---|
-|ibm_cf| endpoint | |yes | IBM Cloud Functions endpoint from [here](https://cloud.ibm.com/docs/openwhisk?topic=cloud-functions-cloudfunctions_regions#cloud-functions-endpoints). Make sure to use https:// prefix |
+|ibm_cf| endpoint | |yes | IBM Cloud Functions endpoint from [here](https://cloud.ibm.com/docs/openwhisk?topic=cloud-functions-cloudfunctions_regions#cloud-functions-endpoints). Make sure to use https:// prefix, for example: https://us-east.functions.cloud.ibm.com |
 |ibm_cf| namespace | |yes | Value of CURRENT NAMESPACE from [here](https://cloud.ibm.com/functions/namespace-settings) |
 |ibm_cf| api_key | | no | **Mandatory** if using Cloud Foundry-based namespace. Value of 'KEY' from [here](https://cloud.ibm.com/functions/namespace-settings)|
 |ibm_cf| namespace_id | |no | **Mandatory** if using IAM-based namespace with IAM API Key. Value of 'GUID' from [here](https://cloud.ibm.com/functions/namespace-settings)|
@@ -137,4 +137,13 @@ pw = pywren.ibm_cf_executor(rabbitmq_monitor=True)

 |Group|Key|Default|Mandatory|Additional info|
 |---|---|---|---|---|
-| rabbitmq |amqp_url | |no | AMQP URL |
+| rabbitmq |amqp_url | |no | AMQP URL from RabbitMQ service. Make sure to use amqp:// prefix |
+
+
+### Summary of configuration keys for Knative:
+
+|Group|Key|Default|Mandatory|Additional info|
+|---|---|---|---|---|
+|knative | endpoint | |no | Istio IngressGateway Endpoint. Make sure to use http:// prefix |
+|knative | docker_user | |yes | Docker hub username |
+|knative | docker_token | |yes | Login to your docker hub account and generate a new access token [here](https://hub.docker.com/settings/security)|
````
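The mandatory-key columns of the tables added above can be checked programmatically before handing a config to an executor. A sketch, assuming a plain dict config; `check_mandatory` and the `MANDATORY` map are illustrative helpers, not part of the PyWren API:

```python
# Mandatory keys mirroring the tables above (per-group; a group that is
# absent from the config is simply skipped, since backends are optional).
MANDATORY = {
    'ibm_cf': ['endpoint', 'namespace'],
    'knative': ['docker_user', 'docker_token'],
}

def check_mandatory(config):
    """Return the list of missing (group, key) pairs."""
    missing = []
    for group, keys in MANDATORY.items():
        if group not in config:
            continue
        for key in keys:
            if not config[group].get(key):
                missing.append((group, key))
    return missing

config = {'ibm_cf': {'endpoint': 'https://us-east.functions.cloud.ibm.com',
                     'namespace': 'my-namespace'}}
print(check_mandatory(config))  # []
```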

config/config_template.yaml (11 additions, 6 deletions)

````diff
@@ -21,17 +21,22 @@ ibm_cf:
    api_key   : <API_KEY>

 ibm_cos:
-   endpoint   : <REGION_ENDPOINT>
-   api_key    : <API_KEY>
+    endpoint   : <REGION_ENDPOINT>
+    api_key    : <API_KEY>
    #access_key : <ACCESS_KEY> # Optional
    #secret_key : <SECRET_KEY> # Optional
-
+
+#rabbitmq:
+    #amqp_url : <RABBIT_AMQP_URL> # amqp://
+
+#knative:
+    # endpoint    : <ISTIO_INGRESS_ENDPOINT>
+    # docker_user : <DOCKER_HUB_USERNAME>
+    # docker_token: <DOCKER_HUB_TOKEN>
+
 #swift:
    #auth_url   : <SWIFT_AUTH_URL>
    #region     : <SWIFT_REGION>
    #user_id    : <SWIFT_USER_ID>
    #project_id : <SWIFT_PROJECT_ID>
    #password   : <SWIFT_PASSWORD>
-
-
-#rabbitmq:
-    #amqp_url : <RABBIT_AMQP_URL> # amqp://
````
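The same settings can also be supplied programmatically as a Python dict through the executor's `config=` parameter (as the previous version of examples/knative.py in this commit did), which is convenient in notebooks and tests. A sketch mirroring the template; the placeholder values are illustrative:

```python
# Dict form of the YAML template above; fill in real values before use.
config = {
    'ibm_cf': {
        'endpoint': '<CF_ENDPOINT>',        # https:// prefix required
        'namespace': '<NAMESPACE>',
        'api_key': '<API_KEY>',
    },
    'ibm_cos': {
        'endpoint': '<REGION_ENDPOINT>',
        'api_key': '<API_KEY>',
    },
    'rabbitmq': {
        'amqp_url': '<RABBIT_AMQP_URL>',    # amqp:// prefix required
    },
    'knative': {
        'endpoint': '<ISTIO_INGRESS_ENDPOINT>',
        'docker_user': '<DOCKER_HUB_USERNAME>',
        'docker_token': '<DOCKER_HUB_TOKEN>',
    },
}
print(sorted(config))
```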

docs/data-processing.md (45 additions, 56 deletions)

````diff
@@ -6,80 +6,71 @@ Additionally, the built-in data-processing logic integrates a **data partitioner**

 ## Processing data from IBM Cloud Object Storage
-The input to the partitioner may be either a list of data objects, a list of URLs or the entire bucket itself. The partitioner is activated inside PyWren and it responsible to split the objects into smaller chunks. It executes one *`my_map_function`* for each object chunk and when all executions are completed, the partitioner executes the *`my_reduce_function`*. The reduce function will wait for all the partial results before processing them.
+This mode is activated when you write the parameter **obj** into the function arguments. The input to the partitioner may be either a list of buckets, a list of buckets with object prefix, or a list of data objects. If you set the *size of the chunk* or the *number of chunks*, the partitioner is activated inside PyWren and it is responsible for splitting the objects into smaller chunks, eventually running one function activation for each generated chunk. If *size of the chunk* and *number of chunks* are not set, the chunk is an entire object, so one function activation is executed for each individual object.

-#### Partitioner get a list of objects
+The *obj* parameter is a Python class from which you can access all the information related to the object (or chunk) that the function is processing. For example, consider the following function, which shows all the available attributes in *obj*:

 ```python
-import pywren_ibm_cloud as pywren
+def my_map_function(obj):
+    print(obj.bucket)
+    print(obj.key)
+    print(obj.data_stream.read())
+    print(obj.part)
+    print(obj.data_byte_range)
+    print(obj.chunk_size)
+```

-iterdata = ['cos://bucket1/object1', 'cos://bucket1/object2', 'cos://bucket1/object3']
+As stated above, the allowed inputs of the function can be:

-def my_map_function(obj):
-    for line in obj.data_stream:
-        # Do some process
-    return partial_intersting_data
+- Input data is a bucket or a list of buckets. See a complete example in [map_reduce_cos_bucket.py](../examples/map_reduce_cos_bucket.py):
+```python
+iterdata = 'cos://bucket1'
+```

-def my_reduce_function(results):
-    for partial_intersting_data in results:
-        # Do some process
-    return final_result
+- Input data is a bucket (or buckets) with an object prefix. See a complete example in [map_cos_prefix.py](../examples/map_cos_prefix.py):
+```python
+iterdata = ['cos://bucket1/images/', 'cos://bucket1/videos/']
+```
+Notice that you must write the trailing slash (/) to inform the partitioner that you are providing an object prefix.

-chunk_size = 4*1024**2 # 4MB
+- Input data is a list of object keys. See a complete example in [map_reduce_cos_key.py](../examples/map_reduce_cos_key.py):
+```python
+iterdata = ['cos://bucket1/object1', 'cos://bucket1/object2', 'cos://bucket1/object3']
+```
+
+Notice that *iterdata* must be only one of the previous 3 types. Intermingled types are not allowed. For example, you cannot put a bucket and some object keys in the same *iterdata* list:

-pw = pywren.ibm_cf_executor()
-pw.map_reduce(my_map_function, iterdata, my_reduce_function, chunk_size=chunk_size)
-result = pw.get_result()
+```python
+iterdata = ['cos://bucket1', 'cos://bucket1/object2', 'cos://bucket1/object3']
 ```

-| method | method signature |
-|---| ---|
-| `pw.map_reduce`(`my_map_function`, `iterdata`, `my_reduce_function`, `chunk_size`)| `iterdata` contains list of objects in the format of `bucket_name/object_name` |
-| `my_map_function`(`obj`) | `obj` is a Python class that contains the *bucket*, *key* and *data_stream* of the object assigned to the activation|
-
-#### Partitioner gets entire bucket
+Once iterdata is defined, you can execute PyWren as usual, either with *map()* or *map_reduce()* calls. If you need to split the files into smaller chunks, you can optionally set the *chunk_size* or *chunk_n* parameters.

-Commonly, a dataset may contains hundreds or thousands of files, so the previous approach where you have to specify each object one by one is not well suited in this case. With this new `map_reduce()` method you can specify, instead, the bucket name which contains all the object of the dataset.
-
 ```python
 import pywren_ibm_cloud as pywren

-bucket_name = 'cos://my_data_bucket'
-
-def my_map_function(obj, ibm_cos):
-    for line in obj.data_stream:
-        # Do some process
-    return partial_intersting_data
-
-def my_reduce_function(results):
-    for partial_intersting_data in results:
-        # Do some process
-    return final_result
-
 chunk_size = 4*1024**2 # 4MB

 pw = pywren.ibm_cf_executor()
-pw.map_reduce(my_map_function, bucket_name, my_reduce_function, chunk_size=chunk_size)
+pw.map_reduce(my_map_function, iterdata, chunk_size=chunk_size)
 result = pw.get_result()
 ```

-* If `chunk_size=None` then partitioner's granularity is a single object.
-
-| method | method signature |
-|---| ---|
-| `pw.map_reduce`(`my_map_function`, `bucket_name`, `my_reduce_function`, `chunk_size`)| `bucket_name` contains the name of the bucket |
-| `my_map_function`(`obj`, `ibm_cos`) | `obj` is a Python class that contains the *bucket*, *key* and *data_stream* of the object assigned to the activation. `ibm_cos` is an optional parameter which provides a `ibm_boto3.Client()`|
-
-
 ## Processing data from public URLs
+This mode is activated when you write the parameter **url** into the function arguments. The input to the partitioner must be a list of object URLs. As with COS data processing, if you set the *size of the chunk* or the *number of chunks*, the partitioner is activated inside PyWren and splits the objects into smaller chunks, as long as the remote storage server allows range requests. If range requests are not allowed, each URL is treated as a single object. For example, consider the following code, which shows all the available attributes in *url*:

 ```python
 import pywren_ibm_cloud as pywren

-iterdata = ['http://myurl/myobject1', 'http://myurl/myobject1']
-
 def my_map_function(url):
+    print(url.path)
+    print(url.data_stream.read())
+    print(url.part)
+    print(url.data_byte_range)
+    print(url.chunk_size)
+
     for line in url.data_stream:
         # Do some process
     return partial_intersting_data
@@ -89,24 +80,22 @@ def my_reduce_function(results):
         # Do some process
     return final_result

-chunk_size = 4*1024**2 # 4MB
+iterdata = ['http://myurl/myobject1', 'http://myurl/myobject1']
+chunk_n = 5

 pw = pywren.ibm_cf_executor()
-pw.map_reduce(my_map_function, iterdata, my_reduce_function, chunk_size=chunk_size)
+pw.map_reduce(my_map_function, iterdata, my_reduce_function, chunk_n=chunk_n)
 result = pw.get_result()
 ```

-| method | method signature |
-|---| ---|
-| `pw.map_reduce`(`my_map_function`, `iterdata`, `my_reduce_function`, `chunk_size`)| `iterdata` contains list of objects in the format of `http://myurl/myobject.data` |
-| `my_map_function`(`url`) | `url` is a Python class that contains the url *path* assigned to the activation (an entry of iterdata) and the *data_stream*|
+See a complete example in [map_reduce_url.py](../examples/map_reduce_url.py).

 ## Reducer granularity
-By default there will be one reducer for all the objects. If you need one reducer for each object, you must set the parameter
-`reducer_one_per_object=True` into the **map_reduce()** method.
+By default there will be one reducer for all the object chunks. If you need one reducer for each object, you must set the parameter
+`reducer_one_per_object=True` in the *map()* or *map_reduce()* methods.

 ```python
 pw.map_reduce(my_map_function, bucket_name, my_reduce_function,
               chunk_size=chunk_size, reducer_one_per_object=True)
 ```
````
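The chunk arithmetic the rewritten docs describe (`chunk_size` for fixed-size chunks, `chunk_n` for a fixed number of chunks, neither set meaning one activation per object) can be sketched as a pure function. This is an illustration of the idea, not PyWren's actual partitioner code; `split_object` is a hypothetical helper:

```python
def split_object(obj_size, chunk_size=None, chunk_n=None):
    """Return a list of (first_byte, last_byte) ranges covering obj_size."""
    if chunk_size is None and chunk_n is None:
        return [(0, obj_size - 1)]            # one activation per object
    if chunk_n is not None:
        chunk_size = -(-obj_size // chunk_n)  # ceil division
    ranges = []
    start = 0
    while start < obj_size:
        end = min(start + chunk_size, obj_size) - 1
        ranges.append((start, end))
        start = end + 1
    return ranges

# A 10MB object with chunk_size=4MB yields two 4MB chunks plus a 2MB tail,
# so three function activations would be spawned for it.
print(split_object(10 * 1024**2, chunk_size=4 * 1024**2))
```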

docs/knative.md (3 additions, 4 deletions)

````diff
@@ -4,14 +4,14 @@ The easiest way to make it working is to create an IBM Kubernetes (IKS) cluster
 - Install Kubernetes v1.15.3
 - Select a **single zone** to place the worker nodes
 - *Master service endpoint*: Public endpoint only
-- You must create a cluster with at least 3 worker nodes, each one with a minimum flavor of 4vCPU and 16GB RAM
+- Your cluster must have 3 or more worker nodes with at least 4 cores and 16GB RAM.
 - No need to encrypt local disk

 Once the cluster is running, follow the instructions of the "Access" tab to configure the *kubectl* client in your local machine. Then, follow one of this two options to install the PyWren environment:

 - Option 1 (IBM IKS):

-    1. In the Dashboard of your cluster, go to the "Add-ons" tab and install knative v0.8.0. It automatically installs Istio v1.2.5 and Tekton v0.3.1.
+    1. In the Dashboard of your cluster, go to the "Add-ons" tab and install knative v0.8.0. It automatically installs Istio v1.3.0 and Tekton v0.3.1.


 - Option 2 (IBM IKS or any other Kubernetes Cluster):
@@ -41,7 +41,7 @@ knative:
     docker_user: my-username
     docker_token: 12e9075f-6cd7-4147-a01e-8e34ffe9196e
 ```
-- **docker_token**: Login to your docker hub account and generate a new docker access token [here](https://hub.docker.com/settings/security)
+- **docker_token**: Login to your docker hub account and generate a new access token [here](https://hub.docker.com/settings/security)

@@ -63,6 +63,5 @@ if __name__ == '__main__':
 #### Check how pods and other resources are created:

 ```
-export KUBECONFIG=/home/... (Same as before in "Access" tab)
 watch kubectl get pod,revision,service,deployment -o wide
 ```
````

examples/knative.py (10 additions, 13 deletions)

````diff
@@ -1,23 +1,20 @@
 """
-Simple PyWren example using one single function invocation
+Simple PyWren example using the map method.
+In this example the map() method will launch one
+map function for each entry in 'iterdata'. Finally
+it will print the results for each invocation with
+pw.get_result()
 """
 import pywren_ibm_cloud as pywren


-#iterdata = [1, 2, 3, 4]
-iterdata = range(10)
-#iterdata = [2, 3, 4]
-
-def my_function(x):
+def my_function(id, x):
+    print("I'm activation number {}".format(id))
     return x + 7

-config = {'pywren': {'runtime': '<>','compute_backend': 'knative', 'storage_bucket': 'pywren-knative', 'storage_prefix': 'pywren.jobs'},
-          #'knative': {'docker_user': 'iamapikey', 'docker_password': '<iamkey>', 'docker_repo': 'uk.icr.io'},
-          'knative': {'docker_user': '<docker-hub user>', 'docker_password': 'docker-hub password', 'docker_repo': 'docker.io'},
-          'ibm_cos': {}}

 if __name__ == '__main__':
-    pw = pywren.ibm_cf_executor(config=config)
-    #pw.call_async(my_function, 3)
+    iterdata = [1, 2, 3, 4]
+    pw = pywren.knative_executor()
     pw.map(my_function, iterdata)
-    print (pw.get_result())
+    print(pw.get_result())
````
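The rewritten example's `my_function(id, x)` signature relies on PyWren feeding each activation its id alongside one `iterdata` entry. A local mimic of that behaviour, assuming `local_map` as an illustrative stand-in for the remote `pw.map()`:

```python
def my_function(id, x):
    print("I'm activation number {}".format(id))
    return x + 7

def local_map(func, iterdata):
    # Stand-in for pw.map(): one call per iterdata entry, with the
    # activation id injected as the first argument.
    return [func(i, x) for i, x in enumerate(iterdata)]

print(local_map(my_function, [1, 2, 3, 4]))  # [8, 9, 10, 11]
```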

examples/map.py (1 addition, 1 deletion)

````diff
@@ -3,7 +3,7 @@
 In this example the map() method will launch one
 map function for each entry in 'iterdata'. Finally
 it will print the results for each invocation with
-pw.get_all_result()
+pw.get_result()
 """
 import pywren_ibm_cloud as pywren
````

pywren_ibm_cloud/compute/backends/knative/knative.py (18 additions, 11 deletions)

````diff
@@ -14,6 +14,7 @@
 import urllib3
 urllib3.disable_warnings()
 logging.getLogger('kubernetes').setLevel(logging.CRITICAL)
+logging.getLogger('urllib3.connectionpool').setLevel(logging.CRITICAL)

 #Monkey patch for issue: https://github.com/kubernetes-client/python/issues/895
 from kubernetes.client.models.v1_container_image import V1ContainerImage
@@ -411,7 +412,7 @@ def list_runtimes(self, docker_image_name='all'):
         runtimes = [[docker_image_name, 256]]
         return runtimes

-    def invoke(self, docker_image_name, memory, payload):
+    def invoke(self, docker_image_name, memory, payload, return_result=False):
         """
         Invoke -- return information about this invocation
         """
@@ -425,12 +426,16 @@ def invoke(self, docker_image_name, memory, payload):
         route = payload.get("service_route", '/')

         try:
+            logger.debug('ExecutorID {} | JobID {} - Starting function invocation {}'
+                         .format(exec_id, job_id, call_id))
             start = time.time()
             parsed_url = urlparse(self.endpoint)
             conn = http.client.HTTPConnection(parsed_url.netloc, timeout=600)
             conn.request("POST", route,
                          body=json.dumps(payload),
                          headers=self.headers)
+            logger.debug('ExecutorID {} | JobID {} - Function invocation {} done. Waiting '
+                         'for a response'.format(exec_id, job_id, call_id))
             resp = conn.getresponse()
             resp_status = resp.status
             resp_data = resp.read().decode("utf-8")
@@ -440,28 +445,30 @@ def invoke(self, docker_image_name, memory, payload):

             if resp_status in [200, 202]:
                 data = json.loads(resp_data)
-                log_msg = ('ExecutorID {} - Function invocation {} done! ({}s) '
-                           .format(exec_id, call_id, resp_time))
+                log_msg = ('ExecutorID {} | JobID {} - Function activation {} finished! ({}s) '
+                           .format(exec_id, job_id, call_id, resp_time))
                 logger.debug(log_msg)
-                return exec_id + job_id + call_id, data
+                if return_result:
+                    return data
+                return data["activationId"]
             elif resp_status == 404:
                 raise Exception("PyWren runtime is not deployed in your k8s cluster")
             else:
-                log_msg = ('ExecutorID {} - Function invocation {} failed: {} {}'
-                           .format(exec_id, call_id, resp_status, resp_data))
+                log_msg = ('ExecutorID {} | JobID {} - Function invocation {} failed: {} {}'
+                           .format(exec_id, job_id, call_id, resp_status, resp_data))
                 logger.debug(log_msg)

         except Exception as e:
             conn.close()
-            log_msg = ('ExecutorID {} - Function invocation {} failed: {}'
-                       .format(exec_id, call_id, str(e)))
+            log_msg = ('ExecutorID {} | JobID {} - Function invocation {} failed: {}'
+                       .format(exec_id, job_id, call_id, str(e)))
             logger.debug(log_msg)

     def invoke_with_result(self, docker_image_name, memory, payload={}):
         """
         Invoke waiting for a result -- return information about this invocation
         """
-        return self.invoke(docker_image_name, memory, payload)
+        return self.invoke(docker_image_name, memory, payload, return_result=True)

     def get_runtime_key(self, docker_image_name, runtime_memory):
         """
@@ -484,11 +491,11 @@ def _generate_runtime_meta(self, docker_image_name, memory):
         payload['service_route'] = "/preinstalls"
         logger.debug("Extracting Python modules list from: {}".format(docker_image_name))
         try:
-            _, runtime_meta = self.invoke_with_result(docker_image_name, memory, payload)
+            runtime_meta = self.invoke_with_result(docker_image_name, memory, payload)
         except Exception as e:
             raise Exception("Unable to invoke 'modules' action {}".format(e))

         if not runtime_meta or 'preinstalls' not in runtime_meta:
-            raise Exception(runtime_meta)
+            raise Exception('Failed getting runtime metadata: {}'.format(runtime_meta))

         return runtime_meta
````
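The `return_result` change above lets one `invoke()` serve both kinds of caller: asynchronous callers get back only the activation id, while `invoke_with_result()` gets the full response payload. A stripped-down sketch of the pattern, with the HTTP call replaced by a canned response (illustrative only, not the backend's real code):

```python
def invoke(payload, return_result=False):
    # Stands in for the HTTP POST to the Knative service; the response
    # shape (an 'activationId' plus result data) mirrors the pattern above.
    data = {'activationId': 'abc123', 'result': payload.get('x', 0) + 7}
    if return_result:
        return data           # synchronous caller: full payload
    return data['activationId']  # asynchronous caller: id only

def invoke_with_result(payload):
    return invoke(payload, return_result=True)

print(invoke({'x': 3}))              # 'abc123'
print(invoke_with_result({'x': 3}))  # {'activationId': 'abc123', 'result': 10}
```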

pywren_ibm_cloud/config.py (1 addition, 1 deletion)

````diff
@@ -30,7 +30,7 @@
 DATA_CLEANER_DEFAULT = False
 MAX_AGG_DATA_SIZE = 4e6
 INVOCATION_RETRY_DEFAULT = True
-RETRY_SLEEPS_DEFAULT = [1, 2, 4, 8]
+RETRY_SLEEPS_DEFAULT = [4, 8, 16, 24]
 RETRIES_DEFAULT = 5

 CONFIG_DIR = os.path.expanduser('~/.pywren')
````
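The new `RETRY_SLEEPS_DEFAULT` backs off more conservatively between retries. One common way such a sleep list is consumed (a sketch of the idea, not PyWren's actual retry loop) is that attempt *k* sleeps `sleeps[k]` and attempts beyond the list reuse the last value:

```python
RETRY_SLEEPS_DEFAULT = [4, 8, 16, 24]
RETRIES_DEFAULT = 5

def sleep_schedule(sleeps, retries):
    """Seconds slept before each retry; the last value is reused once
    the list is exhausted (illustrative helper, not part of PyWren)."""
    return [sleeps[min(i, len(sleeps) - 1)] for i in range(retries)]

print(sleep_schedule(RETRY_SLEEPS_DEFAULT, RETRIES_DEFAULT))  # [4, 8, 16, 24, 24]
```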
