1- import sys
1+ import argparse
2+ import json
3+ import math
24import praw
3- import time
45import threading
5- import json
6+ import time
7+
8+ from kafka import KafkaProducer
69
710redditClient = None
811
class CommentsFetcher (threading.Thread):
    """Daemon-style worker that streams comments from one subreddit and
    emits records for comments mentioning a tracked ticker or company."""

    # Class-level defaults; __init__ shadows these with instance attributes.
    die = False        # cooperative shutdown flag — presumably polled by run(); confirm in full file
    sr_obj = None      # praw Subreddit handle, bound in __init__
    companies = {}     # ticker -> company-name mapping

    def __init__ (self, subreddit, companies, exit_on_fail=False, producer=None, topic=None):
        """Bind the subreddit stream and remember the output configuration.

        Args:
            subreddit: subreddit name to stream comments from.
            companies: dict mapping ticker symbol -> company name.
            exit_on_fail: whether the thread should stop on a fetch failure.
            producer: optional KafkaProducer; when None, matches are printed.
            topic: Kafka topic name, required when producer is given.
        """
        threading.Thread.__init__(self)
        self.name = 'fetch_comments_{0}'.format(subreddit)
        self.companies = companies
        self.exit_on_fail = exit_on_fail
        self.producer = producer
        self.topic = topic
        # BUG FIX: the original wrapped this call in `with threading.RLock():`
        # using a lock created locally on the spot — no other thread can ever
        # contend on a freshly constructed lock, so it synchronized nothing.
        # Dropped; behavior is unchanged.
        self.sr_obj = redditClient.subreddit(subreddit)
@@ -36,29 +39,52 @@ def join(self):
3639 super ().join ()
3740
3841 def fetchComments (self ):
39- search_strings = self .companies + self .tickers
4042 for comment in self .sr_obj .stream .comments (skip_existing = True , pause_after = 5 ):
41- for company in companies :
42- if company in comment .body :
43- print (comment .body )
44-
43+ for ticker in self .companies :
44+ if ticker in comment .body or self .companies [ticker ] in comment .body :
45+ comment_obj = { "ticker" : ticker , "text" : comment .body , "timestamp" : math .ceil (time .time_ns ()/ 1000000 ) }
46+ self .output (comment_obj )
47+ break
48+
49+ def output (self , comment ):
50+ if self .producer is None :
51+ print (comment )
52+ else :
53+ if self .topic is None :
54+ raise ValueError ("topic not supplied" )
55+ key = "{0}_{1}" .format (comment ["ticker" ],comment ["timestamp" ])
56+ try :
57+ key_bytes = bytes (key , encoding = 'utf-8' )
58+ value = json .dumps (comment_obj )
59+ value_bytes = bytes (value , encoding = 'utf-8' )
60+ self .producer .send (self .topic , key = key_bytes , value = value_bytes )
61+ except Exception as e :
62+ print ("Error {0} occurred while publishing message with key {1}" .format (e , key ))
4563
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Stream reddit comments to stdout or kafka topic')
    parser.add_argument('-t', '--topic', metavar='<topic_name>', help='Kafka topic name')
    parser.add_argument('-H', '--host', metavar='<hostname_port>', default='localhost:9092',
                        help='Hostname:port of bootstrap server')
    args = parser.parse_args()

    # Reddit API credentials; creds.json must supply client_id, client_secret,
    # password, user_agent, and username. Files are closed via `with`
    # (the original leaked the handles).
    with open("creds.json", "r") as f:
        creds = json.load(f)
    redditClient = praw.Reddit(client_id=creds['client_id'],
                               client_secret=creds['client_secret'],
                               password=creds['password'],
                               user_agent=creds['user_agent'],
                               username=creds['username'])

    # "subreddits" is a comma-separated list; companies.json maps ticker -> name.
    with open("subreddits", "r") as f:
        subreddits = [sr.strip() for sr in f.read().split(',')]
    with open("companies.json", "r") as f:
        companies = json.load(f)

    producer = None
    if args.topic is not None:
        producer = KafkaProducer(bootstrap_servers=[args.host], api_version=(0, 10))

    # start fetch thread for every subreddit
    fetch_threads = []
    for sr in subreddits:
        # BUG FIX: the original called CommentsFetcher(sr, companies, producer,
        # args.topic) positionally, which bound the producer to exit_on_fail
        # and the topic string to producer — so publishing never worked and
        # exit_on_fail was truthy whenever Kafka was enabled. Bind by keyword.
        th = CommentsFetcher(sr, companies, producer=producer, topic=args.topic)
        th.start()
        fetch_threads.append(th)
6490
0 commit comments