Skip to content

Commit 266ca7c

Browse files
author
Jorge Ejarque
committed
adding minimal workflow
1 parent e79ee67 commit 266ca7c

File tree

4 files changed

+227
-0
lines changed

4 files changed

+227
-0
lines changed

minimal_workflow/wordcount/README

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
This is the Readme for:
2+
WordCount
3+
4+
[Name]: Word Count
5+
[Contact Person]: [email protected]
6+
[Access Level]: public
7+
[License Agreement]: Apache2
8+
[Platform]: COMPSs
9+
10+
[Body]
11+
== Description ==
12+
Wordcount is an application that counts the number of words in a given set of files.
13+
To allow parallelism, every file is processed separately and the partial counts are merged afterwards.
14+
15+
Wordcount_block is an application that counts the number of words in a given file. It allows parallelism by splitting the file into blocks of words that are counted separately.
16+
17+
== Execution instructions ==
18+
Usage:
19+
runcompss src/wordcount.py <datasetPath>
20+
21+
or
22+
23+
runcompss src/wordcount_blocks.py <datasetPath> <output_file_path> <block_size>
24+
25+
where:
26+
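* - datasetPath: Absolute path of the input to parse: the dataset directory for wordcount.py (e.g. /home/compss/tutorial_apps/python/wordcount/data/), or a single file for wordcount_blocks.py
* - output_file_path: Path of the file where wordcount_blocks.py writes the resulting word-count dictionary
* - block_size: Integer size threshold used by wordcount_blocks.py to split the file into blocks of words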
27+
runcompss
28+

minimal_workflow/wordcount/spack.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
spack:
2+
specs:
3+
- compss
4+
concretization: together
5+
config:
6+
install_tree: /opt/software
7+
view: /opt/view
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
#!/usr/bin/python
2+
#
3+
# Copyright 2002-2019 Barcelona Supercomputing Center (www.bsc.es)
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
# -*- coding: utf-8 -*-
19+
20+
import sys
21+
import os
22+
import time
23+
24+
from pycompss.api.task import task
25+
from pycompss.api.parameter import *
26+
27+
28+
@task(file_path=FILE_IN, returns=list)
29+
def read_file(file_path):
    """Load a text file and split it into whitespace-separated words.

    :param file_path: file's path
    :return: list with every word of the file, in reading order
    """
    with open(file_path, 'r') as handle:
        words = [token for line in handle for token in line.split()]
    return words
39+
40+
41+
@task(returns=dict)
42+
def wordCount(data):
    """Build a word-frequency dictionary from a list of words.

    :param data: a list of words
    :return: a dictionary where key=word and value=#appearances
    """
    frequencies = {}
    for word in data:
        # Missing words start from zero, so one statement covers both cases.
        frequencies[word] = frequencies.get(word, 0) + 1
    return frequencies
54+
55+
56+
@task(returns=dict, priority=True)
57+
def merge_two_dicts(dic1, dic2):
    """Add the values of one dictionary into another.

    :param dic1: first dictionary; it is updated in place and returned
    :param dic2: second dictionary
    :return: dic1 after dic1+=dic2
    """
    for key, amount in dic2.items():
        if key not in dic1:
            # First time this key is seen: take the value as is.
            dic1[key] = amount
            continue
        dic1[key] += amount
    return dic1
69+
70+
71+
if __name__ == "__main__":
    from pprint import pprint
    from pycompss.api.api import compss_wait_on

    # Fail early with a usage message instead of a bare IndexError.
    if len(sys.argv) < 2:
        sys.exit("Usage: runcompss src/wordcount.py <datasetPath>")

    # Start counting time...
    start_time = time.time()

    # Get the dataset path
    pathDataset = sys.argv[1]

    # Construct a list with the file's paths from the dataset.
    # Only regular files are kept: os.listdir also returns sub-directories,
    # which read_file cannot open.
    paths = []
    for fileName in os.listdir(pathDataset):
        candidate = os.path.join(pathDataset, fileName)
        if os.path.isfile(candidate):
            paths.append(candidate)

    # Read each file's content and execute a wordcount on each of them
    partialResult = []
    for p in paths:
        data = read_file(p)
        partialResult.append(wordCount(data))

    # Accumulate the partial results to get the final result.
    result = {}
    for partial in partialResult:
        result = merge_two_dicts(result, partial)

    # Wait for result
    result = compss_wait_on(result)

    elapsed_time = time.time() - start_time

    # Print the results and elapsed time
    print("Word appearances:")
    pprint(result)
    print("Elapsed Time (s): " + str(elapsed_time))
    print("Words: " + str(sum(result.values())))
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
#!/usr/bin/python
2+
#
3+
# Copyright 2002-2019 Barcelona Supercomputing Center (www.bsc.es)
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
# -*- coding: utf-8 -*-
19+
20+
'''Wordcount Block divide'''
21+
import sys
22+
import os
23+
import pickle
24+
import time
25+
from pycompss.api.task import task
26+
from pycompss.api.parameter import *
27+
from pycompss.api.api import compss_wait_on
28+
29+
30+
def read_word(file_object):
    """Yield the whitespace-separated words of a file, one at a time.

    :param file_object: iterable of text lines (e.g. an open file)
    :return: generator over the words, in reading order
    """
    tokens = (token for line in file_object for token in line.split())
    for token in tokens:
        yield token
34+
35+
36+
def read_word_by_word(fp, sizeBlock):
    """Lazy function (generator) to read a file piece by piece in
    chunks of size approx sizeBlock.

    :param fp: path of the file to read
    :param sizeBlock: threshold compared against sys.getsizeof() of the
                      list of words collected so far
    :return: generator of lists of words
    """
    block = []
    # The context manager closes the file once the words are exhausted (or
    # the generator is closed); the handle was previously left open.
    with open(fp) as data:
        for word in read_word(data):
            block.append(word)
            # NOTE: sys.getsizeof is shallow, it measures the list object
            # itself and not the strings it holds.
            if sys.getsizeof(block) > sizeBlock:
                yield block
                block = []
    # Emit the trailing, partially filled block (if any).
    if block:
        yield block
48+
49+
50+
@task(returns=dict)
51+
def wordCount(data):
    """Count how many times each word appears in a block of words.

    :param data: iterable of words
    :return: a dictionary where key=word and value=#appearances
    """
    counts = {}
    for token in data:
        # Unknown words start from zero, so no explicit branch is needed.
        counts[token] = counts.get(token, 0) + 1
    return counts
59+
60+
61+
@task(dic1=INOUT)
62+
def merge_two_dicts(dic1, dic2):
    """Accumulate the values of dic2 into dic1, in place.

    Nothing is returned: the result is left in dic1.

    :param dic1: dictionary that receives the accumulated values
    :param dic2: dictionary whose values are added to dic1
    """
    for key, amount in dic2.items():
        if key not in dic1:
            # First time this key is seen: take the value as is.
            dic1[key] = amount
            continue
        dic1[key] += amount
68+
69+
if __name__ == "__main__":
    # Fail early with a usage message instead of a bare IndexError.
    if len(sys.argv) < 4:
        sys.exit("Usage: runcompss src/wordcount_blocks.py "
                 "<datasetPath> <output_file_path> <block_size>")

    pathFile = sys.argv[1]
    resultFile = sys.argv[2]
    sizeBlock = int(sys.argv[3])

    start = time.time()
    result = {}
    # One wordCount call per block; each partial count is accumulated
    # into `result` by merge_two_dicts.
    for block in read_word_by_word(pathFile, sizeBlock):
        presult = wordCount(block)
        merge_two_dicts(result, presult)
    result = compss_wait_on(result)

    elapsed = time.time() - start
    print("Elapsed Time: " + str(elapsed))

    # write() accepts a single argument; passing the file handle as a second
    # one raised TypeError. The context manager also guarantees the file is
    # closed even if the write fails.
    with open(resultFile, 'w') as ff:
        ff.write(str(result))

0 commit comments

Comments
 (0)