3 changes: 3 additions & 0 deletions .gitmodules
@@ -1,3 +1,6 @@
[submodule "real-time-wiki-covid-tracker"]
path = real-time-wiki-covid-tracker
url = https://github.com/digitalTranshumant/real-time-wiki-covid-tracker
[submodule "cdsc_reddit"]
path = reddit/cdsc_reddit
url = code:cdsc_reddit
8 changes: 8 additions & 0 deletions reddit/Makefile
@@ -0,0 +1,8 @@
SHELL:=/bin/bash
comment_search_tasks.sh:gen_search_tasks.py
python3 gen_search_tasks.py /gscratch/comdata/output/reddit_comments_by_subreddit.parquet comment_search_tasks.sh

submit_backfill_jobs:comment_search_tasks.sh
./run_comment_search.sh

.PHONY: submit_backfill_jobs
1 change: 1 addition & 0 deletions reddit/cdsc_reddit
Submodule cdsc_reddit added at 4ced65
24 changes: 24 additions & 0 deletions reddit/checkpoint_parallelsql.sbatch
@@ -0,0 +1,24 @@
#!/bin/bash
## parallel_sql_job.sh
#SBATCH --job-name=find_covid_comments
## Allocation Definition
#SBATCH --account=comdata-ckpt
#SBATCH --partition=ckpt
## Resources
## Nodes. This should always be 1 for parallel-sql.
#SBATCH --nodes=1
## Walltime (12 hours)
#SBATCH --time=12:00:00
## Memory per node
#SBATCH --mem=32G
#SBATCH --cpus-per-task=4
#SBATCH --ntasks=1
#SBATCH -D /gscratch/comdata/users/nathante/COVID-19_Digital_Observatory/reddit
source ./bin/activate
module load parallel_sql
echo $(which perl)
#Put here commands to load other modules (e.g. matlab etc.)
#Below command means that parallel_sql will get tasks from the database
#and run them on the node (in parallel). So a 16 core node will have
#16 tasks running at one time.
parallel-sql --sql -a parallel --exit-on-term --jobs 4
200 changes: 200 additions & 0 deletions reddit/comment_search_tasks.sh

Large diffs are not rendered by default.
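comment_search_tasks.sh is not rendered here, but gen_search_tasks.py (below) shows how it is generated: one line per partition file of reddit_comments_by_subreddit.parquet, each of the form python3 search_comments.py <partition-file>.parquet, where <partition-file> stands in for the actual per-subreddit part file names not shown in this diff.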

10 changes: 10 additions & 0 deletions reddit/concat_dataset.py
@@ -0,0 +1,10 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("filtered_comments.parquet")
df = df.repartition(1)
df = df.sortWithinPartitions(["subreddit","CreatedAt","id"])
df.write.parquet("covid-19_reddit_comments.parquet",mode='overwrite')
df.write.json("covid-19_reddit_comments.json",mode='overwrite')
8 changes: 8 additions & 0 deletions reddit/export_comments.py
@@ -0,0 +1,8 @@
import pandas as pd
import pyarrow

df = pd.read_parquet("/gscratch/comdata/users/nathante/COVID-19_Digital_Observatory/reddit/covid-19_reddit_comments.parquet")

df.to_feather("/gscratch/comdata/users/nathante/COVID-19_Digital_Observatory/reddit/covid-19_reddit_comments_18-11-20.feather")
df.to_json("/gscratch/comdata/users/nathante/COVID-19_Digital_Observatory/reddit/covid-19_reddit_comments_18-11-20.json",orient='records',lines=True)
df.to_csv("/gscratch/comdata/users/nathante/COVID-19_Digital_Observatory/reddit/covid-19_reddit_comments_18-11-20.csv",index=False)
17 changes: 17 additions & 0 deletions reddit/gen_comment_search_tasks.py
@@ -0,0 +1,17 @@
import fire
from pathlib import Path


def gen_tasks("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet", outfile):
path = Path(in_parquet)
partitions = path.glob("*.parquet")
partitions = map(lambda p: p.stem, partitions)
base_task = "python3 search_comments.py {0}"

lines = map(base_task.format, partitions)

with open(outfile,'w') as of:
of.writelines(map(lambda l: l + '\n',lines))

if __name__ == "__main__":
fire.Fire(gen_tasks)
16 changes: 16 additions & 0 deletions reddit/gen_search_tasks.py
@@ -0,0 +1,16 @@
import fire
from pathlib import Path

def gen_tasks(in_parquet="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet", outfile="comment_search_tasks.sh"):
path = Path(in_parquet)
partitions = path.glob("*.parquet")
partitions = map(lambda p: p.parts[-1], partitions)
base_task = "python3 search_comments.py {0}"

lines = map(base_task.format, partitions)

with open(outfile,'w') as of:
of.writelines(map(lambda l: l + '\n',lines))

if __name__ == "__main__":
fire.Fire(gen_tasks)
7 changes: 7 additions & 0 deletions reddit/run_comment_search.sh
@@ -0,0 +1,7 @@
#!/bin/bash
module load parallel_sql
source ./bin/activate
psu --del --Y
cat comment_search_tasks.sh | psu --load

for job in $(seq 1 50); do sbatch checkpoint_parallelsql.sbatch; done;
55 changes: 55 additions & 0 deletions reddit/search_comments.py
@@ -0,0 +1,55 @@
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pandas as pd
import fire
import ahocorasick
from datetime import datetime, timedelta
from pathlib import Path

PARQUET_PATH = Path('/gscratch/comdata/output/reddit_comments_by_subreddit.parquet')
OUTPUT_PATH = Path('/gscratch/comdata/users/nathante/COVID-19_Digital_Observatory/reddit/filtered_comments.parquet')
def load_keywords(keywords):
keywords = pd.read_csv(keywords)

keywords = set(keywords.label)
keywords = map(str.lower, keywords)
trie = ahocorasick.Automaton()
list(map(lambda s: trie.add_word(s,s),keywords))
trie.make_automaton()
return(trie)

# use the aho corasick algorithm to do the string matching
def match_comment_kwlist(body,trie,min_length=5):
stems = trie.iter(body.lower())
stems = map(lambda s: s[1], stems)
stems = filter(lambda s: len(s) >= min_length, stems)
return list(stems)

def filter_comments(partition=None, keywords="../keywords/output/csv/2020-10-19_wikidata_item_labels.csv", from_date=datetime(2019,10,1)):

if partition is None:
partition_path = next(PARQUET_PATH.iterdir())
partition = partition_path.stem
else:
partition_path = PARQUET_PATH / partition

trie = load_keywords(keywords)

pq_dataset = ds.dataset(partition_path)

batches = pq_dataset.to_batches(filter=(ds.field("CreatedAt")>=from_date))

for batch in batches:
df = batch.to_pandas()
if df.shape[0] > 0:
matches = df.body.apply(lambda b: match_comment_kwlist(b, trie, min_length=5))
has_match = matches.apply(lambda l: len(l) > 0)
df = df.loc[has_match]
df['keyword_matches'] = matches[has_match]
if df.shape[0] > 0:
df.to_parquet(OUTPUT_PATH / f'{partition}',index=False,engine='pyarrow',flavor='spark')


if __name__ == "__main__":
fire.Fire(filter_comments)
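The core of search_comments.py is the pyahocorasick trie built in load_keywords() and applied in match_comment_kwlist(). A minimal sketch of that matching step, using a few hypothetical keywords in place of the wikidata item-label CSV:

import ahocorasick

# hypothetical keywords; the real list comes from the wikidata item-label CSV
trie = ahocorasick.Automaton()
for kw in ["coronavirus", "covid-19", "sars-cov-2"]:
    trie.add_word(kw, kw)
trie.make_automaton()

body = "New thread about COVID-19 testing in my county"
# trie.iter() yields (end_index, matched_value) pairs over the lowercased body;
# matches shorter than min_length (5 by default) are dropped, as in match_comment_kwlist()
matches = [value for _, value in trie.iter(body.lower()) if len(value) >= 5]
print(matches)  # ['covid-19']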
2 changes: 2 additions & 0 deletions reddit/search_dumps.py
@@ -0,0 +1,2 @@
import pyarrow