-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrecord_processor.py
More file actions
executable file
·55 lines (47 loc) · 1.89 KB
/
record_processor.py
File metadata and controls
executable file
·55 lines (47 loc) · 1.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import base64
import csv
import io
import json
from nypl_py_utils.classes.avro_client import AvroDecoder
from nypl_py_utils.functions.log_helper import create_log
class RecordProcessor:
    """Decode Avro-encoded Firehose records and re-emit them as JSON or CSV.

    Each incoming record's ``data`` is base64-decoded, decoded with the Avro
    schema located at ``schema_url``, formatted as a JSON string or a
    pipe-delimited CSV row, and base64-encoded again for the response.
    """

    def __init__(self, schema_url):
        self.logger = create_log("record_processor")
        self.avro_decoder = AvroDecoder(schema_url)
        # Not read by the methods of this class; presumably consumed by
        # callers — verify before removing.
        self.schema = self.avro_decoder.get_json_schema(schema_url)

    def process_record(self, record, output_format):
        """Decode one Firehose record and re-encode it in ``output_format``.

        :param record: mapping with a ``"recordId"`` and a base64-encoded,
            Avro-serialized ``"data"`` payload.
        :param output_format: ``"csv"`` for a pipe-delimited row (used only
            when the decoded record is a dict); anything else yields JSON.
        :returns: dict with ``"recordId"`` copied from the input, ``"result"``
            set to ``"Ok"``, and ``"data"`` holding the formatted string
            base64-encoded as text.
        """
        binary_record_data = base64.b64decode(record["data"])
        decoded_record_data = self.avro_decoder.decode_record(binary_record_data)
        result_string = self._format_result_string(
            output_format, decoded_record_data
        )
        return {
            "recordId": record["recordId"],
            "result": "Ok",
            # Base64 text of the JSON/CSV rendering of the decoded record.
            "data": result_string,
        }

    def _format_result_string(self, output_format, data):
        """Format ``data`` as CSV or JSON and return it base64-encoded as str.

        CSV is produced only when ``output_format == "csv"`` and ``data`` is a
        dict; every other combination falls back to ``json.dumps``.
        """
        if output_format == "csv" and isinstance(data, dict):
            formatted = self._transform_dictionary_to_csv_string(data)
        else:
            formatted = json.dumps(data)
        # base64 operates on bytes, so round-trip through UTF-8 to get the
        # base64 payload back as a str.
        return base64.b64encode(formatted.encode("utf-8")).decode("utf-8")

    def _transform_dictionary_to_csv_string(self, data):
        """Render ``data`` as a single pipe-delimited row ending in ``"\\n"``.

        Values are written in the dict's key order without quoting; the
        ``|`` delimiter and other characters the csv module considers special
        (including newlines) are escaped with a backslash. ``None`` values
        are written as empty fields.

        Carriage returns are removed from string values *before* writing.
        Stripping them afterwards would delete an escaped ``\\r`` but keep its
        backslash, corrupting the row (e.g. ``"x\\r\\ny"`` would split into
        two records for a reader).
        """
        cleaned = {
            key: value.replace("\r", "") if isinstance(value, str) else value
            for key, value in data.items()
        }
        output = io.StringIO()
        writer = csv.DictWriter(
            f=output,
            fieldnames=cleaned.keys(),
            delimiter="|",
            quoting=csv.QUOTE_NONE,
            escapechar="\\",
            # Emit a bare "\n" directly instead of "\r\n" plus post-processing.
            lineterminator="\n",
        )
        writer.writerow(cleaned)
        return output.getvalue()