-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathreducer.py
More file actions
executable file
·65 lines (44 loc) · 1.74 KB
/
reducer.py
File metadata and controls
executable file
·65 lines (44 loc) · 1.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Module: reducer.py
Created: 2018-12-07
Description:
reduce a set of incoming (key, value) pairs to only the most recent value
(as determined by the last_modified_date field, the last field in each
incoming record)
Usage:
test with
cat usaspending_test.csv | python mapper.py | sort | python reducer.py
run with
cat my_usaspending_file.csv | python mapper.py | sort | python reducer.py
"""
import os
import sys
from itertools import groupby
# ----------------------------- #
# Module Constants #
# ----------------------------- #
# Absolute path of the directory containing this script, with symlinks
# resolved by os.path.realpath.
HERE = os.path.split(os.path.realpath(__file__))[0]
# ----------------------------- #
# key-value emitter #
# ----------------------------- #
def _iter_key_values(lines):
    """Yield ``[key, value]`` lists parsed from tab-separated input lines.

    Each line is stripped of surrounding whitespace and split on the FIRST
    tab only, so a value that itself contains tabs stays in one piece.
    Lines that are empty after stripping (e.g. a trailing newline at the end
    of the stream) carry no record and are skipped.

    A non-blank line with no tab yields a one-element list; callers that
    index the value will raise ``IndexError`` for it.
    """
    for line in lines:
        row = line.strip()
        if not row:
            continue
        yield row.split('\t', 1)


def reduce():
    """Print, for each key read from stdin, the value with the latest date.

    Input is read from ``sys.stdin`` as ``key<TAB>value`` lines, where the
    value is a comma-separated record whose last field is used as the
    ordering field (the module docstring calls it last_modified_date).

    Rows are grouped with ``itertools.groupby``, which only merges
    *consecutive* rows sharing a key, so the input must already be sorted (or
    at least clustered) by key, as done by the ``sort`` step in the usage
    pipeline. Several keys may arrive in a single run; one line is printed
    per group, in input order.

    The ordering field is compared as a plain string, so "most recent" is
    the lexicographically greatest value.
    NOTE(review): this is only chronological if the dates are in a sortable
    text format such as ISO-8601 -- confirm against the mapper's output.
    When several rows tie on that field, the first one encountered wins
    (the behavior of the built-in ``max``).

    Raises:
        IndexError: if a non-blank input row contains no tab separator.
    """
    # chunk the parsed [key, value] rows by their key
    grouped_iter = groupby(_iter_key_values(sys.stdin),
                           key=lambda kvp: kvp[0])

    for _, keyvals in grouped_iter:
        # each element of keyvals is [key, value]; the key is redundant with
        # the group key, so only the value is inspected and emitted.
        # rsplit with maxsplit=1 isolates the last comma-separated field
        # without splitting the whole record.
        latest = max(keyvals, key=lambda keyval: keyval[1].rsplit(',', 1)[-1])
        print(latest[1])
if __name__ == "__main__":
    # Script entry point: run the reducer over stdin when executed directly
    # (not when this module is imported).
    reduce()