Skip to content

Commit f1aa2ba

Browse files
Merge pull request #5 from gbloisi-openaire/PR644
Changes to run on cloud environments
2 parents 2934d32 + a83135b commit f1aa2ba

File tree

4 files changed

+59
-50
lines changed

4 files changed

+59
-50
lines changed

AttRank.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@
6161
# Set the mode by default as local.
6262
# If data is read from hdfs we switch to cluster
6363
mode = 'local'
64-
if input_file.startswith('hdfs://'):
64+
# Detect execution mode from input path.
65+
# Any URI-scheme path (hdfs://, s3a://, gs://, etc.) is treated as distributed.
66+
if '://' in input_file:
6567
mode = 'distributed'
6668
############################################
6769
print("Mode is: " + mode)
@@ -114,7 +116,7 @@
114116
input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file)
115117
elif mode == 'distributed':
116118
print ("Reading input from HDFS...")
117-
input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file).repartition(num_partitions, 'paper')
119+
input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file).repartition('paper')
118120

119121
# Get number of nodes (one node-record per line)
120122
num_nodes = float(input_data.count())
@@ -134,7 +136,7 @@
134136
print ("Current Year: ", current_year)
135137
print ("Convergence Error: ", max_error)
136138
print ("Number of nodes: ", num_nodes)
137-
print ("Number of partitions: ", num_partitions)
139+
#print ("Number of partitions: ", num_partitions)
138140
print ("Checkpoint mode: " + str(checkpoint_mode))
139141
print ("Checkpoint dir: " + checkpoint_dir)
140142
print ("# ------------------------------------ #\n")
@@ -163,17 +165,17 @@
163165
.select('paper', 'cited_papers', F.expr('size(cited_papers)-2').alias('cited_paper_size'), 'pub_year')\
164166
.select('paper', F.expr('slice(cited_papers, 1, cited_paper_size)').alias('cited_papers'), 'pub_year')\
165167
.select('paper', F.array_join('cited_papers', '|').alias('cited_papers'), 'pub_year')\
166-
.select('paper', F.split('cited_papers', ',').alias('cited_papers'), 'pub_year').repartition(num_partitions, 'paper').cache()
168+
.select('paper', F.split('cited_papers', ',').alias('cited_papers'), 'pub_year').repartition('paper').cache()
167169

168170
# Create a DataFrame with nodes filtered based on whether they cite others or not
169-
outlinks_actual = outlinks.filter(outlinks['cited_papers'][0] != '0').repartition(num_partitions, 'paper').cache()
171+
outlinks_actual = outlinks.filter(outlinks['cited_papers'][0] != '0').repartition('paper').cache()
170172

171173
# Continue initialisation message
172174
print(".", end = '')
173175
sys.stdout.flush()
174176

175177
# Collect the dangling nodes from the data - cache it since it will be reused
176-
dangling_nodes = outlinks.filter(outlinks.cited_papers[0] == '0').select('paper').repartition(num_partitions, 'paper').cache()
178+
dangling_nodes = outlinks.filter(outlinks.cited_papers[0] == '0').select('paper').repartition('paper').cache()
177179

178180

179181
# Continue initialisation message
@@ -184,12 +186,12 @@
184186
# --> Create a DataFrame with the time-based exponential scores <--
185187
# 1. Get paper-publication year pairs.
186188
paper_years = input_data.select('paper', F.col('pub_year').alias('year')).withColumn('year_fixed', F.when( (F.col('year').cast(IntegerType()) < 1000) | (F.col('year').cast(IntegerType()) > int(current_year)) | (F.col('year') == "\\N"), 0).otherwise(F.col('year')))
187-
paper_years = paper_years.select('paper', F.col('year_fixed').alias('year')).repartition(num_partitions, 'paper').cache()
189+
paper_years = paper_years.select('paper', F.col('year_fixed').alias('year')).repartition('paper').cache()
188190
# 2. Get paper-exponential score-based pairs
189191
paper_exp = paper_years.withColumn('exp_score', F.lit(F.exp(exponential * (current_year+1-paper_years.year) )) ).drop('year')
190192
# 3. Normalize exponential scores so they add to one
191193
exp_score_sum = paper_exp.agg({'exp_score':'sum'}).collect()[0][0]
192-
paper_exp = paper_exp.select('paper', (paper_exp.exp_score/float(exp_score_sum)).alias('exp_score')).repartition(num_partitions, 'paper').cache()
194+
paper_exp = paper_exp.select('paper', (paper_exp.exp_score/float(exp_score_sum)).alias('exp_score')).repartition('paper').cache()
193195

194196
# Continue Initialisation message
195197
print(".", end = '')
@@ -212,13 +214,13 @@
212214
# 2. Get total number of citations made in the specified year range
213215
total_citations_in_range = paper_citations.agg({'citations_in_range':'sum'}).collect()[0][0]
214216
# 3. Calculate preferential attachment probabilities - cache them since they will be reused
215-
paper_attention = paper_citations.select('paper', (F.col('citations_in_range') / total_citations_in_range).alias('attention')).repartition(num_partitions, 'paper').cache()
217+
paper_attention = paper_citations.select('paper', (F.col('citations_in_range') / total_citations_in_range).alias('attention')).repartition('paper').cache()
216218
# Continue Initialisation message
217219
print(".", end = '')
218220
sys.stdout.flush()
219221
###########################################################
220222
# --> Get paper exponential scores and attention scores in a single 'materialized' table <--
221-
vector_scores = paper_attention.join(paper_exp, 'paper', 'right_outer').fillna(0.0, ['attention']).repartition(num_partitions, 'paper').cache()
223+
vector_scores = paper_attention.join(paper_exp, 'paper', 'right_outer').fillna(0.0, ['attention']).repartition('paper').cache()
222224
# vector_scores.count()
223225
# Continue Initialisation message
224226
print(".", end = '')
@@ -264,7 +266,7 @@
264266
.groupBy('paper')\
265267
.agg(F.sum('transferred_score').alias('transferred_score_sum'))\
266268
.join(vector_scores, 'paper', 'right_outer')\
267-
.repartition(num_partitions, 'paper')\
269+
.repartition('paper')\
268270
.fillna(0.0, ['transferred_score_sum'])\
269271
.select('paper', (alpha*(F.col('transferred_score_sum')+dangling_sum) + beta*F.col('attention') + gamma*F.col('exp_score') ).alias('score'))\
270272
.join(previous_scores, 'paper')\
@@ -310,7 +312,7 @@
310312
print ("\n# ------------------------------------ #\n")
311313
print("Finished score calculations. Preparing classes and normalized scores!")
312314

313-
scores = scores.repartition(num_partitions, 'paper').cache()
315+
scores = scores.repartition('paper').cache()
314316
max_score = scores.agg({'score': 'max'}).collect()[0]['max(score)']
315317

316318
# Define the top ranges in number of papers
@@ -322,15 +324,15 @@
322324
# ------------------------------------------------------------------------------------------------------ #
323325
# This code is included for small testing datasets. The percentages required may be < 1 for small datasets
324326
top_001_offset = 1 if top_001_offset <= 1 else top_001_offset
325-
top_01_offset = 1 if top_001_offset <= 1 else top_01_offset
327+
top_01_offset = 1 if top_01_offset <= 1 else top_01_offset
326328
top_1_offset = 1 if top_1_offset <= 1 else top_1_offset
327329
top_10_offset = 1 if top_10_offset <= 1 else top_10_offset
328330
# top_20_offset = 1 if top_20_offset <= 1 else top_20_offset
329331
# ------------------------------------------------------------------------------------------------------ #
330332
# Time calculations
331333
start_time = time.time()
332334
# Calculate a running count window of scores, in order to filter out papers w/ scores lower than that of the top 20%
333-
distinct_scores = scores.select(F.col('score')).repartition(num_partitions, 'score').groupBy('score').count()\
335+
distinct_scores = scores.select(F.col('score')).repartition('score').groupBy('score').count()\
334336
.withColumn('cumulative', F.sum('count').over(Window.orderBy(F.col('score').desc())))
335337
distinct_scores_count = distinct_scores.count()
336338
print ("Calculated distinct scores num (" + str(distinct_scores_count) + "), time: {} seconds ---".format(time.time() - start_time))

CC.py

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@
5959
# Set the mode by default as local.
6060
# If data is read from hdfs we switch to cluster
6161
mode = 'local'
62-
if input_file.startswith('hdfs://'):
62+
63+
# Detect execution mode from input path.
64+
# Any URI-scheme path (hdfs://, s3a://, gs://, etc.) is treated as distributed.
65+
if '://' in input_file:
6366
mode = 'distributed'
6467
#####################################################################################################
6568
# Create spark session & context - these are entry points to spark
@@ -96,13 +99,13 @@
9699
elif mode == 'distributed':
97100
print("\n\nReading input from hdfs\n\n")
98101
# Use spark session with schema instead of spark context and text file (this should speed up reading the file)
99-
input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file).repartition(num_partitions, "paper")
102+
input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file).repartition("paper")
100103
#####################################################################################################
101104
# Time initialization
102105
initialisation_time = time.time()
103106
# Print out info messages about the program's parameters
104107
print ("Mode is: " + mode)
105-
print ("Num Partitions: " + str(num_partitions))
108+
#print ("Num Partitions: " + str(num_partitions))
106109
print ("Limit year: " + str(limit_year))
107110
print ("\n\n")
108111
# Initialise SPARK Data
@@ -113,51 +116,51 @@
113116
.select('paper', 'cited_papers', F.expr('size(cited_papers)-2').alias("cited_paper_size"), 'pub_year')\
114117
.select('paper', F.expr("slice(cited_papers, 1, cited_paper_size)").alias('cited_papers'), 'pub_year')\
115118
.select('paper', F.array_join('cited_papers', '|').alias('cited_papers'), 'pub_year')\
116-
.select('paper', F.split('cited_papers', ',').alias('cited_papers'), 'pub_year').repartition(num_partitions, 'pub_year').cache()
119+
.select('paper', F.split('cited_papers', ',').alias('cited_papers'), 'pub_year').repartition('pub_year').cache()
117120

118121
# Create a dataframe with nodes filtered based on whether they cite others or not. Here we keep those that make citations (i.e., remove dangling nodes)
119122
print ("Planning removal of dangling nodes...")
120123
outlinks_actual = outlinks.filter(outlinks['cited_papers'][0] != '0')\
121-
.select('paper', F.explode(F.col('cited_papers')).alias('cited_paper') , F.col('pub_year')).repartition(num_partitions, "paper").cache()
124+
.select('paper', F.explode(F.col('cited_papers')).alias('cited_paper') , F.col('pub_year')).repartition("paper").cache()
122125

123126
# If offset year is given, we need to perform some filtering of citations based on pub year. Proceed by normally calculating the 3-year based CC
124127
if limit_year:
125128
# We now need to filter out those records where citing year - cited year > limit_year
126129
# a. join again with years, based on cited paper year - create a clone of the initial dataframe, because otherwise there will be an error due to similar column names
127130
print ("Gathering years of cited papers...")
128-
cited_paper_years = outlinks.select('paper', F.col('pub_year').alias('cited_paper_year')).withColumnRenamed('paper', 'cited_paper').repartition(num_partitions, 'cited_paper')
131+
cited_paper_years = outlinks.select('paper', F.col('pub_year').alias('cited_paper_year')).withColumnRenamed('paper', 'cited_paper').repartition('cited_paper')
129132
# Since here outlinks_actual is joined on cited paper, we need to repartition it
130-
valid_citations = outlinks_actual.repartition(num_partitions, 'cited_paper').join(cited_paper_years, outlinks_actual.cited_paper == cited_paper_years.cited_paper)\
133+
valid_citations = outlinks_actual.repartition('cited_paper').join(cited_paper_years, outlinks_actual.cited_paper == cited_paper_years.cited_paper)\
131134
.select(outlinks_actual.paper,
132135
cited_paper_years.cited_paper,
133136
outlinks_actual.pub_year.alias('citing_paper_year'),
134137
cited_paper_years.cited_paper_year)\
135-
.repartition(num_partitions, 'paper')
138+
.repartition('paper')
136139

137140
# b. Filter out those where citing paper year > cited paper year + 3
138141
print ("Filtering out citations based on pub year difference...")
139-
valid_citations = valid_citations.filter(valid_citations['citing_paper_year']-valid_citations['cited_paper_year'] <= limit_year).repartition(num_partitions, 'paper').cache()
142+
valid_citations = valid_citations.filter(valid_citations['citing_paper_year']-valid_citations['cited_paper_year'] <= limit_year).repartition('paper').cache()
140143
# Do nothing if no limit year was specified. For uniformity reasons we set the valid citations variable to point to outlinks_actual
141144
else:
142145
valid_citations = outlinks_actual
143146

144147
# Group by cited_paper and get counts
145148
print("Preparing count of citations...")
146-
valid_citations = valid_citations.repartition(num_partitions, 'cited_paper').groupBy('cited_paper').count().repartition(num_partitions, 'cited_paper')
149+
valid_citations = valid_citations.repartition('cited_paper').groupBy('cited_paper').count().repartition('cited_paper')
147150

148151
# Add papers which aren't cited
149152
print("Planning addition of dangling nodes...")
150153
# Join with papers that aren't cited
151154
valid_citations = valid_citations.join(outlinks.select('paper'), outlinks.paper == valid_citations.cited_paper, 'right_outer')\
152155
.select('paper', 'count')\
153-
.fillna(0).repartition(num_partitions, 'paper').cache()
156+
.fillna(0).repartition('paper').cache()
154157

155158
print ("\n# ------------------------------------ #\n")
156159
print("Finished planning calculations. Proceeding to calculation of scores and classes...\n")
157160

158161
# Time it
159162
start_time = time.time()
160-
max_score = valid_citations.select('count').repartition(num_partitions).distinct().agg({'count': 'max'}).collect()[0]['max(count)']
163+
max_score = valid_citations.agg(F.max('count')).collect()[0]['max(count)']
161164
print ("Got max score:" + str(max_score) + " - Took {} seconds".format(time.time() - start_time) + " to get here from initial file read (this is the first transformation)")
162165

163166
# Time it
@@ -174,7 +177,7 @@
174177
# ------------------------------------------------------------------------------------------------------ #
175178
# This code is included for small testing datasets. The percentages required may be < 1 for small datasets
176179
top_001_offset = 1 if top_001_offset <= 1 else top_001_offset
177-
top_01_offset = 1 if top_001_offset <= 1 else top_01_offset
180+
top_01_offset = 1 if top_01_offset <= 1 else top_01_offset
178181
top_1_offset = 1 if top_1_offset <= 1 else top_1_offset
179182
top_10_offset = 1 if top_10_offset <= 1 else top_10_offset
180183
# top_20_offset = 1 if top_20_offset <= 1 else top_20_offset
@@ -183,7 +186,7 @@
183186
# Time it
184187
start_time = time.time()
185188
# Calculate a running count window of scores, in order to filter out papers w/ scores lower than that of the top 20%
186-
distinct_scores = valid_citations.select(F.col('count').alias('cc')).repartition(num_partitions, 'cc').groupBy('cc').count()\
189+
distinct_scores = valid_citations.select(F.col('count').alias('cc')).repartition('cc').groupBy('cc').count()\
187190
.withColumn('cumulative', F.sum('count').over(Window.orderBy(F.col('cc').desc())))
188191
distinct_scores_count = distinct_scores.count()
189192
print ("Calculated distinct scores num (" + str(distinct_scores_count) + "), time: {} seconds ---".format(time.time() - start_time))
@@ -235,7 +238,7 @@
235238
.withColumn('normalized_' + column_name, F.lit(F.col(column_name)/float(max_score)))\
236239
.withColumn('three_point_class', F.lit('C'))
237240
valid_citations = valid_citations.withColumn('three_point_class', F.when(F.col(column_name) >= top_1_score, F.lit('B')).otherwise(F.col('three_point_class')) )
238-
valid_citations = valid_citations.withColumn('three_point_class', F.when(F.col(column_name) >= top_001_score, F.lit('A')).otherwise(F.col('three_point_class')) )
241+
valid_citations = valid_citations.withColumn('three_point_class', F.when(F.col(column_name) >= top_001_score, F.lit('A')).otherwise(F.col('three_point_class')) )
239242
valid_citations = valid_citations.select(F.regexp_replace('paper', 'comma_char', ',').alias('doi'), column_name, 'normalized_' + column_name, 'three_point_class')
240243

241244
# Add six point class to score dataframe

PageRank.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@
5757
# Set the mode by default as local.
5858
# If data is read from hdfs we switch to cluster
5959
mode = 'local'
60-
if input_file.startswith('hdfs://'):
60+
# Detect execution mode from input path.
61+
# Any URI-scheme path (hdfs://, s3a://, gs://, etc.) is treated as distributed.
62+
if '://' in input_file:
6163
mode = 'distributed'
6264

6365
# Set the mode by default as dfs.
@@ -116,7 +118,7 @@
116118
input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file).cache()
117119
elif mode == 'distributed':
118120
print ("Reading input from HDFS...")
119-
input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file).repartition(num_partitions, 'paper').cache()
121+
input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file).repartition('paper').cache()
120122

121123

122124
# Get number of nodes (one node-record per line)
@@ -161,17 +163,17 @@
161163
.select('paper', 'cited_papers', F.expr('size(cited_papers)-2').alias('cited_paper_size'), 'pub_year')\
162164
.select('paper', F.expr('slice(cited_papers, 1, cited_paper_size)').alias('cited_papers'), 'pub_year')\
163165
.select('paper', F.array_join('cited_papers', '|').alias('cited_papers'), 'pub_year')\
164-
.select('paper', F.split('cited_papers', ',').alias('cited_papers'), 'pub_year').repartition(num_partitions, 'paper').cache()
166+
.select('paper', F.split('cited_papers', ',').alias('cited_papers'), 'pub_year').repartition('paper').cache()
165167

166168
# Create a DataFrame with nodes filtered based on whether they cite others or not
167-
outlinks_actual = outlinks.filter(outlinks['cited_papers'][0] != '0').repartition(num_partitions, 'paper').cache()
169+
outlinks_actual = outlinks.filter(outlinks['cited_papers'][0] != '0').repartition('paper').cache()
168170

169171
# Continue initialisation message
170172
print(".", end = '')
171173
sys.stdout.flush()
172174

173175
# Collect the dangling nodes from the data - cache it since it will be reused
174-
dangling_nodes = outlinks.filter(outlinks.cited_papers[0] == '0').select('paper').repartition(num_partitions, 'paper').cache()
176+
dangling_nodes = outlinks.filter(outlinks.cited_papers[0] == '0').select('paper').repartition('paper').cache()
175177

176178
# Continue initialisation message
177179
print(".", end = '')
@@ -228,7 +230,7 @@
228230
.agg(F.sum('transferred_score').alias('transferred_score_sum'))\
229231
.select('paper', (alpha * (F.col('transferred_score_sum')+dangling_sum) + (1-alpha)*random_jump_prob).alias('score'))\
230232
.join(previous_scores, 'paper', 'right_outer')\
231-
.repartition(num_partitions, 'paper')\
233+
.repartition('paper')\
232234
.fillna(uncited_node_score, ['score'])\
233235
.withColumn('score_diff', F.abs( F.col('score') - F.col('previous_score') ) )
234236
# We should keep the newly calculated scores in memory for further use.
@@ -257,7 +259,7 @@
257259

258260
# -------------------------------- #
259261
# 3. Calculate max error
260-
error = scores.select('score_diff').distinct().agg({'score_diff': 'max'}).collect()[0][0]
262+
error = scores.agg(F.max('score_diff')).collect()[0][0]
261263

262264
# -------------------------------- #
263265
# 4. Do required re-initialisations and update variables
@@ -275,7 +277,7 @@
275277
print ("\n# ------------------------------------ #\n")
276278
print("Finished score calculations. Preparing classes and normalized scores!")
277279

278-
scores = scores.repartition(num_partitions, 'paper').cache()
280+
scores = scores.repartition('paper').cache()
279281
max_score = scores.agg({'score': 'max'}).collect()[0]['max(score)']
280282

281283
# Define the top ranges in number of papers
@@ -288,15 +290,15 @@
288290
# ------------------------------------------------------------------------------------------------------ #
289291
# This code is included for small testing datasets. The percentages required may be < 1 for small datasets
290292
top_001_offset = 1 if top_001_offset <= 1 else top_001_offset
291-
top_01_offset = 1 if top_001_offset <= 1 else top_01_offset
293+
top_01_offset = 1 if top_01_offset <= 1 else top_01_offset
292294
top_1_offset = 1 if top_1_offset <= 1 else top_1_offset
293295
top_10_offset = 1 if top_10_offset <= 1 else top_10_offset
294296
# top_20_offset = 1 if top_20_offset <= 1 else top_20_offset
295297
# ------------------------------------------------------------------------------------------------------ #
296298
# Time calculations
297299
start_time = time.time()
298300
# Calculate a running count window of scores, in order to filter out papers w/ scores lower than that of the top 20%
299-
distinct_scores = scores.select(F.col('score')).repartition(num_partitions, 'score').groupBy('score').count()\
301+
distinct_scores = scores.select(F.col('score')).repartition('score').groupBy('score').count()\
300302
.withColumn('cumulative', F.sum('count').over(Window.orderBy(F.col('score').desc())))
301303
distinct_scores_count = distinct_scores.count()
302304
print ("Calculated distinct scores num (" + str(distinct_scores_count) + "), time: {} seconds ---".format(time.time() - start_time))

0 commit comments

Comments
 (0)