Commit 4b3e261

Merge pull request #1 from athenarc/topic-classes-and-fwci
Add topic-based impact classes & FWCI and 3-year FWCI calculations
2 parents 6e5948b + a1f81ab commit 4b3e261

2 files changed: +312 -0 lines changed
README.md

Lines changed: 36 additions & 0 deletions
@@ -128,6 +128,42 @@ Run the script as:

> spark2-submit --executor-memory 7G --executor-cores 4 --driver-memory 7G AttRank.py hdfs:///user/<user_name>/<path_to_graph_file_on_hdfs> <alpha> <beta> <gamma> <exponential_rho> <current_year> <start_year_for_attention> <convergence_error> <checkpoint_directory> 7680 <checkpoint_mode: dfs or local>

## TopicClassesAndFWCI.py

This script calculates topic-based impact classes and Field-Weighted Citation Impact (FWCI) metrics for scientific publications. Unlike the citation graph-based ranking scripts above, this script processes pre-computed ranking scores alongside publication-to-concept mappings to produce field-normalized metrics.

The script performs the following computations:
1. **Field-Weighted Citation Impact (FWCI)**: Normalizes citation counts by computing the ratio of a publication's actual citations to the expected citations, where the expected value is the average citation count of publications with the same field (concept), publication type, and year. Two variants are computed: one from total citations (FWCI) and one from 3-year citations (3y-FWCI).
2. **Topic-based Impact Classes**: For each research concept/topic, calculates percentile-based impact classes (C1-C5) separately for each metric (PageRank, AttRank, Citation Count, 3-year Citation Count). This provides field-normalized rankings that account for citation practices in different research areas.
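
The FWCI ratio described in item 1 can be illustrated with a minimal plain-Python sketch. This is not the script's Spark code: the function name `compute_fwci`, the dictionary layout, and the sample rows are hypothetical and chosen only for illustration. It assumes, as the script does, that a group whose average citation count is zero yields no FWCI value.

```python
from collections import defaultdict


def compute_fwci(papers):
    """Return {(id, concept): fwci or None} for a list of paper dicts.

    Each dict is assumed to have the keys: id, concept, type, year, cc.
    """
    # Sum and count of citations per (concept, type, year) group.
    totals = defaultdict(lambda: [0.0, 0])
    for p in papers:
        if p["cc"] is None:
            continue
        group = totals[(p["concept"], p["type"], p["year"])]
        group[0] += p["cc"]
        group[1] += 1

    result = {}
    for p in papers:
        total, n = totals.get((p["concept"], p["type"], p["year"]), (0.0, 0))
        expected = total / n if n else 0.0
        if p["cc"] is None or expected <= 0:
            # No meaningful expectation for this group: leave FWCI undefined.
            result[(p["id"], p["concept"])] = None
        else:
            result[(p["id"], p["concept"])] = p["cc"] / expected
    return result


# Hypothetical sample data: three papers in one group, one in an all-zero group.
papers = [
    {"id": "a", "concept": "chemistry", "type": "publication", "year": "2020", "cc": 10},
    {"id": "b", "concept": "chemistry", "type": "publication", "year": "2020", "cc": 30},
    {"id": "c", "concept": "chemistry", "type": "publication", "year": "2020", "cc": 20},
    {"id": "d", "concept": "physics", "type": "publication", "year": "2021", "cc": 0},
]
fwci = compute_fwci(papers)
```

In this sample the chemistry group has an expected citation count of 20, so paper `a` gets an FWCI of 0.5 and paper `b` gets 1.5, while paper `d` has no FWCI because its group's average is zero.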

### Input Files

The script requires two tab-separated input files:

1. **Scores file** (with header): `openaire_id`, `pid`, `type`, `year`, `pagerank`, `attrank`, `cc`, `3y-cc`
2. **Concepts file** (no header): DOI/PID, concept identifier, confidence score
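
To make the two layouts concrete, here is a small sketch that parses one row of each file with Python's standard `csv` module and maps the concept row to its OpenAIRE ID through the `pid` column. The identifiers, the `publication` type value, and all numbers are made-up sample data, and the script itself reads these files with Spark rather than `csv`.

```python
import csv
import io

# Hypothetical sample rows; real identifiers and values will differ.
scores_tsv = (
    "openaire_id\tpid\ttype\tyear\tpagerank\tattrank\tcc\t3y-cc\n"
    "oa_0001\t10.1234/example.1\tpublication\t2020\t1.5e-09\t2.0e-09\t12\t5\n"
)
concepts_tsv = "10.1234/example.1\tconcept_A\t0.8\n"

# The scores file has a header row, so DictReader can use it for field names.
score_rows = list(csv.DictReader(io.StringIO(scores_tsv), delimiter="\t"))
# The concepts file has no header: columns are DOI/PID, concept, confidence.
concept_rows = list(csv.reader(io.StringIO(concepts_tsv), delimiter="\t"))

# Map each concept row to an OpenAIRE ID by matching its DOI against `pid`.
pid_to_openaire = {row["pid"]: row["openaire_id"] for row in score_rows}
mapped = [
    (pid_to_openaire[doi], concept, float(confidence))
    for doi, concept, confidence in concept_rows
    if doi in pid_to_openaire
]
```

Concept rows whose DOI does not appear in the scores file are dropped here, which mirrors the inner join the script uses for the same mapping.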

### Output Files

The script writes its output files under the specified output directory and prints the per-concept threshold values to standard output:

1. **Topic-based classes** (written with a header under `topics/`): `identifier`, `concept`, `pagerank_class`, `attrank_class`, `3y-cc_class`, `cc_class` (C1-C5 for each metric)
2. **FWCI metrics** (written under `bip-db/`): `FWCI.txt.gz`, `3-year_FWCI.txt.gz`, `FWCI_openaire_ids.txt.gz`, `3-year_FWCI_openaire_ids.txt.gz`
3. **OpenAIRE ID to concept mapping** (written to the path given by `--openaire-concepts-output`): OpenAIRE ID, concept identifier, confidence score

Run the script as:
> spark2-submit --executor-memory 7G --executor-cores 4 --driver-memory 7G TopicClassesAndFWCI.py --scores-file <path_to_scores_file> --concepts-file <path_to_concepts_file> --openaire-concepts-output <path_to_openaire_concepts_output> --output-dir <path_to_output_directory>

### Impact Classes

For each research concept and metric, papers are assigned to one of five classes:
- **C1**: Top 0.01% (most impactful)
- **C2**: Top 0.1%
- **C3**: Top 1%
- **C4**: Top 10%
- **C5**: Remaining 90%

Each paper receives the highest class whose threshold it meets, so C2 contains the papers in the top 0.1% that are not already in C1, and so on. These classes are calculated separately per concept, allowing for fair comparison of papers within the same research field.
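
The following plain-Python sketch illustrates one way to derive the class thresholds for a single concept and metric. It follows the approach in `TopicClassesAndFWCI.py` (an offset of at least one paper per class, and thresholds read from distinct scores with cumulative counts), but it is a simplified illustration rather than the script's Spark code. The names `FRACTIONS`, `class_thresholds`, and `assign_class` and the sample scores are hypothetical.

```python
import math
from collections import Counter

# Class label and the fraction of papers it covers, strictest first.
FRACTIONS = [("C1", 0.0001), ("C2", 0.001), ("C3", 0.01), ("C4", 0.1)]


def class_thresholds(scores):
    """Return {label: minimum score for that class, or None} for one concept."""
    n = len(scores)
    counts = Counter(scores)
    thresholds = {}
    for label, frac in FRACTIONS:
        # Number of papers the class may hold, never fewer than one.
        offset = max(1, math.floor(n * frac))
        cumulative = 0
        threshold = None
        # Walk distinct scores from highest to lowest while the running
        # count of papers still fits within the offset.
        for value in sorted(counts, reverse=True):
            cumulative += counts[value]
            if cumulative > offset:
                break
            threshold = value
        thresholds[label] = threshold
    return thresholds


def assign_class(score, thresholds):
    """Return the highest class whose threshold the score meets, else C5."""
    for label, _ in FRACTIONS:
        t = thresholds[label]
        if t is not None and score >= t:
            return label
    return "C5"


# Hypothetical concept with 100 papers scored 1..100.
scores = list(range(1, 101))
thresholds = class_thresholds(scores)
best = assign_class(100, thresholds)
```

Because every offset is at least one, a small concept like this sample still places its single top-scoring paper in C1, while the top-10% threshold lands on the tenth-highest score.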

## Moving scripts to other clusters

To run these scripts on another cluster, some lines of code may need to be changed and additional spark-specific parameters may need to be passed in the command line.

TopicClassesAndFWCI.py

Lines changed: 276 additions & 0 deletions
@@ -0,0 +1,276 @@
"""
BIP Topics - Impact Class and FWCI Calculation

This script:
1. Loads publication scores (pagerank, attrank, citations) and concept mappings
2. Computes Field-Weighted Citation Impact (FWCI) metrics
3. Calculates topic-based impact class thresholds per concept
4. Assigns 5-point impact classes (C1-C5) for each metric
5. Writes output files: topic-based classes and FWCI metrics
"""

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, DoubleType
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import logging
import argparse

# ============================================================================
# SETUP
# ============================================================================

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)

# Parse command line arguments
parser = argparse.ArgumentParser(description='Calculate BIP topic-based impact classes and FWCI metrics')
parser.add_argument('--scores-file',
                    default='/tmp/schatz/bip_metadata/output/doi_to_scores.csv',
                    help='Input file with publication scores (openaire_id, pid, type, year, pagerank, attrank, cc, 3y-cc)')
parser.add_argument('--concepts-file',
                    default='/tmp/schatz/bip_metadata/doi_to_concept_id_score.csv',
                    help='Input file with DOI to concept mappings')
parser.add_argument('--openaire-concepts-output',
                    default='/tmp/schatz/bip_metadata/openaire_id_to_concept_id_score.csv',
                    help='Output file for OpenAIRE ID to concept mappings')
parser.add_argument('--output-dir',
                    default='/tmp/schatz/bip_metadata/output/',
                    help='Output directory for all generated files')

args = parser.parse_args()

# File paths from arguments
scores_file = args.scores_file
concepts_file = args.concepts_file
openaire_concepts_output_file = args.openaire_concepts_output
output_dir = args.output_dir if args.output_dir.endswith('/') else args.output_dir + '/'

# Initialize Spark session
spark = SparkSession.builder.appName('BIP-topics').getOrCreate()
log4j = spark._jvm.org.apache.log4j
log4j.LogManager.getRootLogger().setLevel(log4j.Level.WARN)

# ============================================================================
# DATA LOADING AND PREPARATION
# ============================================================================

# Read scores (openaire_id, pid, type, year, pagerank, attrank, cc, 3y-cc)
scores_raw = spark.read.options(header='True', inferSchema='True', delimiter='\t').csv(scores_file)
scores_raw = scores_raw.select(
    F.col('openaire_id').cast(StringType()),
    F.col('pid').cast(StringType()),
    F.col('type').cast(StringType()),
    F.col('year').cast(StringType()),
    F.col('pagerank').cast(DoubleType()),
    F.col('attrank').cast(DoubleType()),
    F.col('cc').cast(DoubleType()),
    F.col('3y-cc').cast(DoubleType())
)

# Mapping between openaire_id and DOI (pid)
openaire_to_pid = scores_raw.select('openaire_id', 'pid').cache()

# Aggregate metrics to openaire_id level (assumption: all records for a given openaire_id have the same metrics)
scores = scores_raw.groupBy('openaire_id').agg(
    F.max('pagerank').alias('pagerank'),
    F.max('attrank').alias('attrank'),
    F.max('cc').alias('cc'),
    F.max('3y-cc').alias('3y-cc'),
    F.first('type').alias('type'),
    F.first('year').alias('year')
)

# Read DOI -> concept mapping and map to openaire_id, keeping max confidence per concept
concepts_doi = spark.read.options(header='False', delimiter='\t').csv(concepts_file)
concepts_doi = concepts_doi.toDF('doi', 'concept', 'confidence')
concepts_doi = concepts_doi.withColumn('confidence', F.col('confidence').cast(DoubleType()))

# Join concepts on DOI (pid) to get openaire_id
concepts_with_open = concepts_doi.join(openaire_to_pid, concepts_doi.doi == openaire_to_pid.pid, 'inner')

# Keep unique concepts with max confidence per openaire_id (input file already filtered to >= 0.3)
concepts = concepts_with_open.groupBy('openaire_id', 'concept').agg(F.max('confidence').alias('confidence'))

# Keep up to top 3 concepts per openaire_id by confidence
top3_window = Window.partitionBy('openaire_id').orderBy(F.col('confidence').desc())
concepts = concepts.withColumn('rn', F.row_number().over(top3_window)).filter(F.col('rn') <= 3).drop('rn')

# Persist an output file for openaire_id -> concept (max confidence)
concepts.select('openaire_id', 'concept', 'confidence')\
    .orderBy('openaire_id', 'confidence')\
    .write.mode('overwrite').options(header='False', delimiter='\t').csv(openaire_concepts_output_file)

# Build working dataframe at openaire_id level and keep same downstream column name 'id'
d = concepts.join(scores, 'openaire_id').repartition(64, "openaire_id").select(
    F.col('openaire_id').alias('id'), 'concept', 'pagerank', 'attrank', 'cc', '3y-cc', 'type', 'year'
).cache()

# ============================================================================
# COMPUTE FIELD-WEIGHTED CITATION IMPACT (FWCI)
# ============================================================================

# Compute Field-Weighted Citation Impact (FWCI) for total citations
expected_citations = d.filter(F.col('cc').isNotNull()) \
    .groupBy('concept', 'type', 'year') \
    .agg(F.avg('cc').alias('expected_citations')) \
    .filter(F.col('expected_citations') > 0)  # Avoid division by zero

# Compute Field-Weighted Citation Impact for 3-year citations (3y-FWCI)
expected_citations_3y = d.filter(F.col('3y-cc').isNotNull()) \
    .groupBy('concept', 'type', 'year') \
    .agg(F.avg('3y-cc').alias('expected_citations_3y')) \
    .filter(F.col('expected_citations_3y') > 0)  # Avoid division by zero

# Join with original data and compute both FWCI metrics
d_with_fwci = d.join(expected_citations, ['concept', 'type', 'year'], 'left') \
    .join(expected_citations_3y, ['concept', 'type', 'year'], 'left') \
    .withColumn('fwci',
                F.when(F.col('expected_citations').isNull() | (F.col('expected_citations') == 0),
                       F.lit(None))
                .otherwise(F.col('cc') / F.col('expected_citations'))) \
    .withColumn('3y-fwci',
                F.when(F.col('expected_citations_3y').isNull() | (F.col('expected_citations_3y') == 0),
                       F.lit(None))
                .otherwise(F.col('3y-cc') / F.col('expected_citations_3y')))

# Replace d with the enhanced version
d.unpersist()
d = d_with_fwci.cache()

# ============================================================================
# CALCULATE IMPACT CLASS THRESHOLDS PER CONCEPT
# ============================================================================

# Header for the threshold table printed below (same column order as limits_df)
print("concept_id\tpagerank_top001\tpagerank_top01\tpagerank_top1\tpagerank_top10\tattrank_top001\tattrank_top01\tattrank_top1\tattrank_top10\t3y-cc_top001\t3y-cc_top01\t3y-cc_top1\t3y-cc_top10\tcc_top001\tcc_top01\tcc_top1\tcc_top10")

# Define metrics to calculate thresholds and classes for
metrics = ["pagerank", "attrank", "3y-cc", "cc"]

# Get the count of papers per concept
concept_counts = d.groupBy('concept').agg(F.count('*').alias('num_nodes'))

# Calculate offset positions (minimum 1)
concept_counts = concept_counts.withColumn('top_001_offset',
    F.when(F.floor(F.col('num_nodes') * 0.0001) == 0, 1).otherwise(F.floor(F.col('num_nodes') * 0.0001)))
concept_counts = concept_counts.withColumn('top_01_offset',
    F.when(F.floor(F.col('num_nodes') * 0.001) == 0, 1).otherwise(F.floor(F.col('num_nodes') * 0.001)))
concept_counts = concept_counts.withColumn('top_1_offset',
    F.when(F.floor(F.col('num_nodes') * 0.01) == 0, 1).otherwise(F.floor(F.col('num_nodes') * 0.01)))
concept_counts = concept_counts.withColumn('top_10_offset',
    F.when(F.floor(F.col('num_nodes') * 0.1) == 0, 1).otherwise(F.floor(F.col('num_nodes') * 0.1)))

# Join offsets to main dataframe
d = d.join(concept_counts, 'concept', 'left')

# For each metric, find thresholds using distinct scores with cumulative counts (like old approach)
thresholds_list = []
for metric in metrics:
    # Get distinct scores with counts per concept
    distinct_scores = d.groupBy('concept', metric).agg(F.count('*').alias('count'))

    # Add cumulative count per concept (ordered by metric descending)
    window_spec = Window.partitionBy('concept').orderBy(F.col(metric).desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)
    distinct_scores = distinct_scores.withColumn('cumulative', F.sum('count').over(window_spec))

    # Join with offsets
    distinct_scores = distinct_scores.join(concept_counts.select('concept', 'top_001_offset', 'top_01_offset', 'top_1_offset', 'top_10_offset'), 'concept', 'left')

    # Find minimum score where cumulative <= offset for each threshold
    thresholds = distinct_scores.groupBy('concept').agg(
        F.min(F.when(F.col('cumulative') <= F.col('top_001_offset'), F.col(metric))).alias('{}_top001'.format(metric)),
        F.min(F.when(F.col('cumulative') <= F.col('top_01_offset'), F.col(metric))).alias('{}_top01'.format(metric)),
        F.min(F.when(F.col('cumulative') <= F.col('top_1_offset'), F.col(metric))).alias('{}_top1'.format(metric)),
        F.min(F.when(F.col('cumulative') <= F.col('top_10_offset'), F.col(metric))).alias('{}_top10'.format(metric))
    )

    thresholds_list.append(thresholds)

# Merge all threshold dataframes
thresholds_df = thresholds_list[0]
for thresholds in thresholds_list[1:]:
    thresholds_df = thresholds_df.join(thresholds, 'concept', 'outer')

# Join all thresholds back to main dataframe
d = d.join(thresholds_df, 'concept', 'left')

# ============================================================================
# ASSIGN IMPACT CLASSES BASED ON THRESHOLDS
# ============================================================================

# Assign classes for all metrics. Conditions are checked from the strictest
# threshold down; a null metric or threshold never matches, so such rows fall through to C5.
for metric in metrics:
    d = d.withColumn(
        '{}_five_point_class'.format(metric),
        F.when(F.col(metric) >= F.col('{}_top001'.format(metric)), F.lit('C1'))
         .when(F.col(metric) >= F.col('{}_top01'.format(metric)), F.lit('C2'))
         .when(F.col(metric) >= F.col('{}_top1'.format(metric)), F.lit('C3'))
         .when(F.col(metric) >= F.col('{}_top10'.format(metric)), F.lit('C4'))
         .otherwise(F.lit('C5'))
    )

# Print limits for all concepts
limits_df = d.select('concept',
                     'pagerank_top001', 'pagerank_top01', 'pagerank_top1', 'pagerank_top10',
                     'attrank_top001', 'attrank_top01', 'attrank_top1', 'attrank_top10',
                     '3y-cc_top001', '3y-cc_top01', '3y-cc_top1', '3y-cc_top10',
                     'cc_top001', 'cc_top01', 'cc_top1', 'cc_top10').distinct().orderBy('concept')

for row in limits_df.collect():
    print('\t'.join(map(str, row)))

# ============================================================================
# WRITE OUTPUT FILES
# ============================================================================

# Note: output_dir always ends with '/', so subpaths are appended without a leading slash.

# Write FWCI for OpenAIRE IDs
logger.info("Writing FWCI for OpenAIRE IDs")
d.select(
    F.col("id").alias("openaire_id"),
    F.col("concept").alias("concept"),
    F.col("fwci").alias("fwci")
).write.options(header='False', delimiter='\t', compression='gzip', nullValue='').mode('overwrite').csv(output_dir + "bip-db/" + "FWCI_openaire_ids.txt.gz")

# Write 3y-FWCI for OpenAIRE IDs
logger.info("Writing 3y-FWCI for OpenAIRE IDs")
d.select(
    F.col("id").alias("openaire_id"),
    F.col("concept").alias("concept"),
    F.col("3y-fwci").alias("3y-fwci")
).write.options(header='False', delimiter='\t', compression='gzip', nullValue='').mode('overwrite').csv(output_dir + "bip-db/" + "3-year_FWCI_openaire_ids.txt.gz")

# Prepare data with PIDs for DOI-based outputs (inner join to keep only papers with PIDs)
d_with_pid = d.join(openaire_to_pid, d.id == openaire_to_pid.openaire_id, 'inner') \
    .drop('id') \
    .withColumnRenamed('pid', 'id')

logger.info("Total rows with valid PIDs: %d", d_with_pid.count())

# Write topic-based classes output
logger.info("Writing topic-based classes output")
d_with_pid.select(
    F.col("id").alias("identifier"),
    F.col("concept").alias("concept"),
    F.col("pagerank_five_point_class").alias("pagerank_class"),
    F.col("attrank_five_point_class").alias("attrank_class"),
    F.col("3y-cc_five_point_class").alias("3y-cc_class"),
    F.col("cc_five_point_class").alias("cc_class")
).write.options(header='True', delimiter='\t').mode('overwrite').csv(output_dir + "topics/")

# Write FWCI for PIDs (DOIs)
logger.info("Writing FWCI for PIDs")
d_with_pid.select(
    F.col("id").alias("identifier"),
    F.col("concept").alias("concept"),
    F.col("fwci").alias("fwci")
).write.options(header='False', delimiter='\t', compression='gzip', nullValue='').mode('overwrite').csv(output_dir + "bip-db/" + "FWCI.txt.gz")

# Write 3y-FWCI for PIDs (DOIs)
logger.info("Writing 3y-FWCI for PIDs")
d_with_pid.select(
    F.col("id").alias("identifier"),
    F.col("concept").alias("concept"),
    F.col("3y-fwci").alias("3y-fwci")
).write.options(header='False', delimiter='\t', compression='gzip', nullValue='').mode('overwrite').csv(output_dir + "bip-db/" + "3-year_FWCI.txt.gz")
