data-stream.py
# Importing modules
import argparse
import atexit
import json
import logging
import time

from kafka import KafkaProducer
from kafka.errors import KafkaError
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Configuring logger
logger_format = '%(asctime)s - %(message)s'
logging.basicConfig(format=logger_format)
logger = logging.getLogger('data-stream')
logger.setLevel(logging.DEBUG)
# Function to set up a shutdown hook, called before shutdown
def shutdown_hook(producer):
    try:
        producer.flush(10)
    except KafkaError as kafka_error:
        logger.warning('Failed to flush pending messages to kafka, caused by: %s', kafka_error)
    finally:
        try:
            producer.close(10)
        except Exception as e:
            logger.warning('Failed to close kafka connection, caused by: %s', e)
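# Note: in kafka-python, both flush() and close() take a timeout in seconds,
# so the hook above waits up to 10 seconds for pending messages to be delivered
# before forcing the connection closed. It is registered via atexit in the
# main block below.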
# Function to process data and send the processed data to the target kafka topic
def process_stream(stream, kafka_producer, target_topic):
    """
    Helper method to compute average prices and send them to the target kafka topic.
    :param stream: the DStream to read data from
    :param kafka_producer: instance of a kafka producer
    :param target_topic: name of the kafka topic to push to
    """
    def pair(data):
        record = json.loads(data)
        # (symbol, (price, count))
        return record.get('Symbol'), (float(record.get('LastTradePrice')), 1)

    def send_to_kafka(rdd):
        results = rdd.collect()
        for r in results:
            data = json.dumps({
                'Symbol': r[0],
                'Timestamp': time.time(),
                'Average': r[1]
            })
            try:
                logger.info('Sending average price %s to kafka', data)
                # kafka-python expects bytes unless a value_serializer is configured
                kafka_producer.send(target_topic, value=data.encode('utf-8'))
            except KafkaError as e:
                logger.warning('Failed to send average price to kafka, caused by: %s', e)

    (stream.map(pair)
           .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
           .map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))
           .foreachRDD(send_to_kafka))
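# Illustrative walk-through (hypothetical values): an input batch containing
#   {"Symbol": "AAPL", "LastTradePrice": "146.80"}
#   {"Symbol": "AAPL", "LastTradePrice": "147.20"}
# is paired into ("AAPL", (146.8, 1)) and ("AAPL", (147.2, 1)), reduced to
# ("AAPL", (294.0, 2)), mapped to the average ("AAPL", 147.0), and finally
# published as {"Symbol": "AAPL", "Timestamp": ..., "Average": 147.0}.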
# 'main method'
if __name__ == '__main__':
    # Setting up command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('source_topic', help='the kafka topic to subscribe from')
    parser.add_argument('target_topic', help='the kafka topic to send messages to')
    parser.add_argument('kafka_broker', help='the kafka broker address')
    parser.add_argument('batch_duration', help='the batch duration in secs')

    # Parsing user input arguments
    args = parser.parse_args()
    source_topic = args.source_topic
    target_topic = args.target_topic
    kafka_broker = args.kafka_broker
    batch_duration = int(args.batch_duration)

    # Creating SparkContext and StreamingContext
    sc = SparkContext('local[2]', 'AveragePrice')
    sc.setLogLevel('INFO')
    ssc = StreamingContext(sc, batch_duration)

    # Instantiating a direct Kafka stream for processing
    directKafkaStream = KafkaUtils.createDirectStream(
        ssc, [source_topic], {'metadata.broker.list': kafka_broker})

    # Extracting the value from each (key, value) message
    stream = directKafkaStream.map(lambda x: x[1])

    # Instantiating a simple Kafka producer
    kafka_producer = KafkaProducer(bootstrap_servers=kafka_broker)

    # Invoking function to send processed data to the target kafka topic
    process_stream(stream, kafka_producer, target_topic)

    # Setting up shutdown hook
    atexit.register(shutdown_hook, kafka_producer)

    ssc.start()
    ssc.awaitTermination()
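# Example invocation, a sketch assuming Spark 2.x with the kafka-0-8 streaming
# connector on the classpath (package coordinates, topic names, and broker
# address below are illustrative, not part of this repo):
#   spark-submit \
#     --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0 \
#     data-stream.py stock-prices average-stock-price localhost:9092 5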