
Commit a83135b

Repartition without an explicit number (inherit the partition count from the spark.sql.shuffle.partitions setting)
Optimized max score calculation, avoiding distinct()
1 parent 2d2c556 commit a83135b


4 files changed: +42 -42 lines changed

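Note on the repartition change: when repartition() is called with only a partition column and no explicit count, Spark hash-partitions by that column and takes the number of partitions from the spark.sql.shuffle.partitions setting, so the hard-coded num_partitions argument becomes unnecessary. A minimal sketch of the pattern this commit switches to (the SparkSession config and toy DataFrame below are illustrative assumptions, not the project's actual inputs):

from pyspark.sql import SparkSession

# Illustrative session; '200' stands in for whatever the cluster configuration sets.
spark = SparkSession.builder \
    .config('spark.sql.shuffle.partitions', '200') \
    .getOrCreate()

# Hypothetical toy DataFrame with the same 'paper' column the scripts partition on.
df = spark.createDataFrame([('p1', 3), ('p2', 7), ('p3', 7)], ['paper', 'count'])

# No explicit numPartitions: the count is inherited from spark.sql.shuffle.partitions.
df = df.repartition('paper')
print(df.rdd.getNumPartitions())  # prints 200 under this configuration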

AttRank.py

Lines changed: 12 additions & 12 deletions
@@ -116,7 +116,7 @@
 input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file)
 elif mode == 'distributed':
 print ("Reading input from HDFS...")
-input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file).repartition(num_partitions, 'paper')
+input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file).repartition('paper')
 
 # Get number of nodes (one node-record per line)
 num_nodes = float(input_data.count())
@@ -136,7 +136,7 @@
 print ("Current Year: ", current_year)
 print ("Convergence Error: ", max_error)
 print ("Number of nodes: ", num_nodes)
-print ("Number of partitions: ", num_partitions)
+#print ("Number of partitions: ", num_partitions)
 print ("Checkpoint mode: " + str(checkpoint_mode))
 print ("Checkpoint dir: " + checkpoint_dir)
 print ("# ------------------------------------ #\n")
@@ -165,17 +165,17 @@
 .select('paper', 'cited_papers', F.expr('size(cited_papers)-2').alias('cited_paper_size'), 'pub_year')\
 .select('paper', F.expr('slice(cited_papers, 1, cited_paper_size)').alias('cited_papers'), 'pub_year')\
 .select('paper', F.array_join('cited_papers', '|').alias('cited_papers'), 'pub_year')\
-.select('paper', F.split('cited_papers', ',').alias('cited_papers'), 'pub_year').repartition(num_partitions, 'paper').cache()
+.select('paper', F.split('cited_papers', ',').alias('cited_papers'), 'pub_year').repartition('paper').cache()
 
 # Create a DataFrame with nodes filtered based on whether they cite others or not
-outlinks_actual = outlinks.filter(outlinks['cited_papers'][0] != '0').repartition(num_partitions, 'paper').cache()
+outlinks_actual = outlinks.filter(outlinks['cited_papers'][0] != '0').repartition('paper').cache()
 
 # Continue intialisation message
 print(".", end = '')
 sys.stdout.flush()
 
 # Collect the dangling nodes from the data - cache it since it will be reused
-dangling_nodes = outlinks.filter(outlinks.cited_papers[0] == '0').select('paper').repartition(num_partitions, 'paper').cache()
+dangling_nodes = outlinks.filter(outlinks.cited_papers[0] == '0').select('paper').repartition('paper').cache()
 
 
 # Continue intialisation message
@@ -186,12 +186,12 @@
 # --> Create a DataFrame with the time-based exponential scores <--
 # 1. Get paper-publication year pairs.
 paper_years = input_data.select('paper', F.col('pub_year').alias('year')).withColumn('year_fixed', F.when( (F.col('year').cast(IntegerType()) < 1000) | (F.col('year').cast(IntegerType()) > int(current_year)) | (F.col('year') == "\\N"), 0).otherwise(F.col('year')))
-paper_years = paper_years.select('paper', F.col('year_fixed').alias('year')).repartition(num_partitions, 'paper').cache()
+paper_years = paper_years.select('paper', F.col('year_fixed').alias('year')).repartition('paper').cache()
 # 2. Get paper-exponential score-based pairs
 paper_exp = paper_years.withColumn('exp_score', F.lit(F.exp(exponential * (current_year+1-paper_years.year) )) ).drop('year')
 # 3. Normalize exponential scores so they add to one
 exp_score_sum = paper_exp.agg({'exp_score':'sum'}).collect()[0][0]
-paper_exp = paper_exp.select('paper', (paper_exp.exp_score/float(exp_score_sum)).alias('exp_score')).repartition(num_partitions, 'paper').cache()
+paper_exp = paper_exp.select('paper', (paper_exp.exp_score/float(exp_score_sum)).alias('exp_score')).repartition('paper').cache()
 
 # Continue Initialisation message
 print(".", end = '')
@@ -214,13 +214,13 @@
 # 2. Get total number of citations made in the specified year range
 total_citations_in_range = paper_citations.agg({'citations_in_range':'sum'}).collect()[0][0]
 # 3. Calculate preferential attachment probabilities - cache them since they will be reused
-paper_attention = paper_citations.select('paper', (F.col('citations_in_range') / total_citations_in_range).alias('attention')).repartition(num_partitions, 'paper').cache()
+paper_attention = paper_citations.select('paper', (F.col('citations_in_range') / total_citations_in_range).alias('attention')).repartition('paper').cache()
 # Continue Initialisation message
 print(".", end = '')
 sys.stdout.flush()
 ###########################################################
 # --> Get paper exponential scores and attention scores in a single 'materialized' table <--
-vector_scores = paper_attention.join(paper_exp, 'paper', 'right_outer').fillna(0.0, ['attention']).repartition(num_partitions, 'paper').cache()
+vector_scores = paper_attention.join(paper_exp, 'paper', 'right_outer').fillna(0.0, ['attention']).repartition('paper').cache()
 # vector_scores.count()
 # Continue Initialisation message
 print(".", end = '')
@@ -266,7 +266,7 @@
 .groupBy('paper')\
 .agg(F.sum('transferred_score').alias('transferred_score_sum'))\
 .join(vector_scores, 'paper', 'right_outer')\
-.repartition(num_partitions, 'paper')\
+.repartition('paper')\
 .fillna(0.0, ['transferred_score_sum'])\
 .select('paper', (alpha*(F.col('transferred_score_sum')+dangling_sum) + beta*F.col('attention') + gamma*F.col('exp_score') ).alias('score'))\
 .join(previous_scores, 'paper')\
@@ -312,7 +312,7 @@
 print ("\n# ------------------------------------ #\n")
 print("Finished score calculations. Preparing classes and normalized scores!")
 
-scores = scores.repartition(num_partitions, 'paper').cache()
+scores = scores.repartition('paper').cache()
 max_score = scores.agg({'score': 'max'}).collect()[0]['max(score)']
 
 # Define the top ranges in number of papers
@@ -332,7 +332,7 @@
 # Time calculations
 start_time = time.time()
 # Calculate a running count window of scores, in order to filter out papers w/ scores lower than that of the top 20%
-distinct_scores = scores.select(F.col('score')).repartition(num_partitions, 'score').groupBy('score').count()\
+distinct_scores = scores.select(F.col('score')).repartition('score').groupBy('score').count()\
 .withColumn('cumulative', F.sum('count').over(Window.orderBy(F.col('score').desc())))
 distinct_scores_count = distinct_scores.count()
 print ("Calculated distinct scores num (" + str(distinct_scores_count) + "), time: {} seconds ---".format(time.time() - start_time))

CC.py

Lines changed: 13 additions & 13 deletions
@@ -99,13 +99,13 @@
 elif mode == 'distributed':
 print("\n\nReading input from hdfs\n\n")
 # Use spark session with schema instead of spark context and text file (this should spead up reading the file)
-input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file).repartition(num_partitions, "paper")
+input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file).repartition("paper")
 #####################################################################################################
 # Time initialization
 initialisation_time = time.time()
 # Print out info messages about the program's parameters
 print ("Mode is: " + mode)
-print ("Num Partitions: " + str(num_partitions))
+#print ("Num Partitions: " + str(num_partitions))
 print ("Limit year: " + str(limit_year))
 print ("\n\n")
 # Initialise SPARK Data
@@ -116,51 +116,51 @@
 .select('paper', 'cited_papers', F.expr('size(cited_papers)-2').alias("cited_paper_size"), 'pub_year')\
 .select('paper', F.expr("slice(cited_papers, 1, cited_paper_size)").alias('cited_papers'), 'pub_year')\
 .select('paper', F.array_join('cited_papers', '|').alias('cited_papers'), 'pub_year')\
-.select('paper', F.split('cited_papers', ',').alias('cited_papers'), 'pub_year').repartition(num_partitions, 'pub_year').cache()
+.select('paper', F.split('cited_papers', ',').alias('cited_papers'), 'pub_year').repartition('pub_year').cache()
 
 # Create a dataframe with nodes filtered based on whether they cite others or not. Here we keep those that make citations (i.e., remove dangling nodes)
 print ("Planning removal of dangling nodes...")
 outlinks_actual = outlinks.filter(outlinks['cited_papers'][0] != '0')\
-.select('paper', F.explode(F.col('cited_papers')).alias('cited_paper') , F.col('pub_year')).repartition(num_partitions, "paper").cache()
+.select('paper', F.explode(F.col('cited_papers')).alias('cited_paper') , F.col('pub_year')).repartition("paper").cache()
 
 # If offset year is given, we need to perform some filtering of citations based on pub year. Proceed by normally calculating the 3-year based CC
 if limit_year:
 # We now need to filter out those records where citing year - cited year > limit_year
 # a. join again with years, based on cited paper year - create a clone of the initial dataframe, because otherwise there will be an error due to similar column names
 print ("Gathering years of cited papers...")
-cited_paper_years = outlinks.select('paper', F.col('pub_year').alias('cited_paper_year')).withColumnRenamed('paper', 'cited_paper').repartition(num_partitions, 'cited_paper')
+cited_paper_years = outlinks.select('paper', F.col('pub_year').alias('cited_paper_year')).withColumnRenamed('paper', 'cited_paper').repartition('cited_paper')
 # Since here outlinks_actual is joined on cited paper, we need to repartition it
-valid_citations = outlinks_actual.repartition(num_partitions, 'cited_paper').join(cited_paper_years, outlinks_actual.cited_paper == cited_paper_years.cited_paper)\
+valid_citations = outlinks_actual.repartition('cited_paper').join(cited_paper_years, outlinks_actual.cited_paper == cited_paper_years.cited_paper)\
 .select(outlinks_actual.paper,
 cited_paper_years.cited_paper,
 outlinks_actual.pub_year.alias('citing_paper_year'),
 cited_paper_years.cited_paper_year)\
-.repartition(num_partitions, 'paper')
+.repartition('paper')
 
 # b. Filter out those where citing paper year > cited paper year + 3
 print ("Filtering out citations based on pub year difference...")
-valid_citations = valid_citations.filter(valid_citations['citing_paper_year']-valid_citations['cited_paper_year'] <= limit_year).repartition(num_partitions, 'paper').cache()
+valid_citations = valid_citations.filter(valid_citations['citing_paper_year']-valid_citations['cited_paper_year'] <= limit_year).repartition('paper').cache()
 # Do nothing if no limit year was specified. For uniformity reasons we set the valid citations variable to point to outlinks_actual
 else:
 valid_citations = outlinks_actual
 
 # Group by cited_paper and get counts
 print("Preparing count of citations...")
-valid_citations = valid_citations.repartition(num_partitions, 'cited_paper').groupBy('cited_paper').count().repartition(num_partitions, 'cited_paper')
+valid_citations = valid_citations.repartition('cited_paper').groupBy('cited_paper').count().repartition('cited_paper')
 
 # Add papers which aren't cited
 print("Planning addition of dangling nodes...")
 # Join with papers that aren't cited
 valid_citations = valid_citations.join(outlinks.select('paper'), outlinks.paper == valid_citations.cited_paper, 'right_outer')\
 .select('paper', 'count')\
-.fillna(0).repartition(num_partitions, 'paper').cache()
+.fillna(0).repartition('paper').cache()
 
 print ("\n# ------------------------------------ #\n")
 print("Finished planning calculations. Proceeding to calculation of scores and classes...\n")
 
 # Time it
 start_time = time.time()
-max_score = valid_citations.select('count').repartition(num_partitions).distinct().agg({'count': 'max'}).collect()[0]['max(count)']
+max_score = valid_citations.agg(F.max('count')).collect()[0]['max(count)']
 print ("Got max score:" + str(max_score) + " - Took {} seconds".format(time.time() - start_time) + " to get here from initial file read (this is the first transformation)")
 
 # Time it
@@ -186,7 +186,7 @@
 # Time it
 start_time = time.time()
 # Calculate a running count window of scores, in order to filter out papers w/ scores lower than that of the top 20%
-distinct_scores = valid_citations.select(F.col('count').alias('cc')).repartition(num_partitions, 'cc').groupBy('cc').count()\
+distinct_scores = valid_citations.select(F.col('count').alias('cc')).repartition('cc').groupBy('cc').count()\
 .withColumn('cumulative', F.sum('count').over(Window.orderBy(F.col('cc').desc())))
 distinct_scores_count = distinct_scores.count()
 print ("Calculated distinct scores num (" + str(distinct_scores_count) + "), time: {} seconds ---".format(time.time() - start_time))
@@ -238,7 +238,7 @@
 .withColumn('normalized_' + column_name, F.lit(F.col(column_name)/float(max_score)))\
 .withColumn('three_point_class', F.lit('C'))
 valid_citations = valid_citations.withColumn('three_point_class', F.when(F.col(column_name) >= top_1_score, F.lit('B')).otherwise(F.col('three_point_class')) )
-valid_citations = valid_citations.withColumn('three_point_class', F.when(F.col(column_name) >= top_001_score, F.lit('A')).otherwise(F.col('three_point_class')) )
+valid_citations = valid_citations.withColumn('three_point_class', F.when(F.col(column_name) >= top_001_score, F.lit('A')).otherwise(F.col('three_point_class')) )
 valid_citations = valid_citations.select(F.regexp_replace('paper', 'comma_char', ',').alias('doi'), column_name, 'normalized_' + column_name, 'three_point_class')
 
 # Add six point class to score dataframe
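The max_score change above drops the select('count').repartition(num_partitions).distinct() chain in favour of a single agg(F.max('count')). Since the maximum of a column is unaffected by duplicate values, the extra repartition and distinct shuffles add cost without changing the result. A rough, self-contained sketch of the two variants (toy DataFrame for illustration only, not the project's data):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
# Hypothetical citation counts, standing in for the script's valid_citations DataFrame.
valid_citations = spark.createDataFrame([('p1', 3), ('p2', 7), ('p3', 7)], ['paper', 'count'])

# Old approach: deduplicate the values (an extra shuffle) before taking the max.
max_old = valid_citations.select('count').distinct().agg({'count': 'max'}).collect()[0]['max(count)']

# New approach: one aggregation over the column; duplicates do not affect the max.
max_new = valid_citations.agg(F.max('count')).collect()[0]['max(count)']

assert max_old == max_new == 7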

PageRank.py

Lines changed: 8 additions & 8 deletions
@@ -118,7 +118,7 @@
 input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file).cache()
 elif mode == 'distributed':
 print ("Reading input from HDFS...")
-input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file).repartition(num_partitions, 'paper').cache()
+input_data = spark.read.schema(graph_file_schema).option('delimiter', '\t').csv(input_file).repartition('paper').cache()
 
 
 # Get number of nodes (one node-record per line)
@@ -163,17 +163,17 @@
 .select('paper', 'cited_papers', F.expr('size(cited_papers)-2').alias('cited_paper_size'), 'pub_year')\
 .select('paper', F.expr('slice(cited_papers, 1, cited_paper_size)').alias('cited_papers'), 'pub_year')\
 .select('paper', F.array_join('cited_papers', '|').alias('cited_papers'), 'pub_year')\
-.select('paper', F.split('cited_papers', ',').alias('cited_papers'), 'pub_year').repartition(num_partitions, 'paper').cache()
+.select('paper', F.split('cited_papers', ',').alias('cited_papers'), 'pub_year').repartition('paper').cache()
 
 # Create a DataFrame with nodes filtered based on whether they cite others or not
-outlinks_actual = outlinks.filter(outlinks['cited_papers'][0] != '0').repartition(num_partitions, 'paper').cache()
+outlinks_actual = outlinks.filter(outlinks['cited_papers'][0] != '0').repartition('paper').cache()
 
 # Continue intialisation message
 print(".", end = '')
 sys.stdout.flush()
 
 # Collect the dangling nodes from the data - cache it since it will be reused
-dangling_nodes = outlinks.filter(outlinks.cited_papers[0] == '0').select('paper').repartition(num_partitions, 'paper').cache()
+dangling_nodes = outlinks.filter(outlinks.cited_papers[0] == '0').select('paper').repartition('paper').cache()
 
 # Continue intialisation message
 print(".", end = '')
@@ -230,7 +230,7 @@
 .agg(F.sum('transferred_score').alias('transferred_score_sum'))\
 .select('paper', (alpha * (F.col('transferred_score_sum')+dangling_sum) + (1-alpha)*random_jump_prob).alias('score'))\
 .join(previous_scores, 'paper', 'right_outer')\
-.repartition(num_partitions, 'paper')\
+.repartition('paper')\
 .fillna(uncited_node_score, ['score'])\
 .withColumn('score_diff', F.abs( F.col('score') - F.col('previous_score') ) )
 # We should keep the newly calculated scores in memory for further use.
@@ -259,7 +259,7 @@
 
 # -------------------------------- #
 # 3. Calculate max error
-error = scores.select('score_diff').distinct().agg({'score_diff': 'max'}).collect()[0][0]
+error = scores.agg(F.max('score_diff')).collect()[0][0]
 
 # -------------------------------- #
 # 4. Do required re-initialisations and update variables
@@ -277,7 +277,7 @@
 print ("\n# ------------------------------------ #\n")
 print("Finished score calculations. Preparing classes and normalized scores!")
 
-scores = scores.repartition(num_partitions, 'paper').cache()
+scores = scores.repartition('paper').cache()
 max_score = scores.agg({'score': 'max'}).collect()[0]['max(score)']
 
 # Define the top ranges in number of papers
@@ -298,7 +298,7 @@
 # Time calculations
 start_time = time.time()
 # Calculate a running count window of scores, in order to filter out papers w/ scores lower than that of the top 20%
-distinct_scores = scores.select(F.col('score')).repartition(num_partitions, 'score').groupBy('score').count()\
+distinct_scores = scores.select(F.col('score')).repartition('score').groupBy('score').count()\
 .withColumn('cumulative', F.sum('count').over(Window.orderBy(F.col('score').desc())))
 distinct_scores_count = distinct_scores.count()
 print ("Calculated distinct scores num (" + str(distinct_scores_count) + "), time: {} seconds ---".format(time.time() - start_time))
