-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpar_baseline.jl
More file actions
137 lines (120 loc) · 3.31 KB
/
par_baseline.jl
File metadata and controls
137 lines (120 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
using ArgParse
# Maps a process id to the RemoteRef that holds that process's copy of the
# precedent ngrams; filled in by the "Distribute precedent doc..." step below.
global ref_list = Dict{Integer,RemoteRef}()
# Parsed command-line options; assigned from parse_commandline() further down
# and read by option name (e.g. settings["verbose"]).
global settings
# Declare the command-line interface of this script and parse the arguments.
#
# Returns the result of `parse_args`; the rest of the script indexes it by
# option name ("precedent", "verbose", "ignore-results", "ngrams", "FILES").
function parse_commandline()
    parser = ArgParseSettings()

    @add_arg_table parser begin
        # Declared with nargs = 1; the script reads the value back with first(...).
        "--precedent", "-p"
            help = "set the 'precedent' file upon which all other ngrams will be based"
            nargs = 1
        # Flag: enables the progress output produced by msg/maybe_timed.
        "--verbose", "-v"
            help = "show progress updates"
            action = :store_true
        # Flag: suppresses the final key/value dump.
        "--ignore-results"
            help = "don't show key/value result"
            action = :store_true
        # Also nargs = 1, with a one-element default; read back with first(...).
        "--ngrams", "-n"
            help = "the N in ngrams (e.g. '3' to create up to 3-grams)"
            nargs = 1
            arg_type = Integer
            default = Integer[3]
        # Positional arguments: the inputs handed to the map step.
        "FILES"
            help = "files or directories to include in baseline"
            required = true
            nargs = '*'
    end

    parsed = parse_args(parser)
    return parsed
end
# Parse the command line once at startup; msg, maybe_timed and every step
# below read their options from this value.
settings = parse_commandline()
# Print the given pieces on one line, but only when the "verbose" setting is on.
# The arguments are forwarded unchanged to println.
function msg(m...)
    settings["verbose"] || return nothing
    println(m...)
end
# Run `fn`, announcing it with `msg(m...)` and, in verbose mode, timing it
# with tic()/toc().
#
# Arguments:
#   fn   - zero-argument function to run (usually supplied as a `do` block)
#   m... - pieces of the progress message, forwarded unchanged to `msg`
#
# Returns the value of toc() when verbose and `nothing` otherwise, exactly as
# before; the result of `fn` itself is discarded.
#
# The timer is stopped in a `finally` clause so that an exception thrown by
# `fn` still reports the elapsed time and does not leave an unmatched tic()
# behind. The exception then propagates to the caller unchanged.
function maybe_timed(fn::Function, m...)
    msg(m...)
    # Read the flag once so tic() and toc() are always paired.
    verbose = settings["verbose"]
    elapsed = nothing
    if verbose
        tic()
    end
    try
        fn()
    finally
        if verbose
            elapsed = toc()
        end
    end
    return elapsed
end
# global precedent_path = joinpath(dirname(@__FILE__), "test", "bom.txt")
# The precedent document is mandatory: stop immediately when -p was not given,
# otherwise remember the supplied path for the loading step.
if isempty(settings["precedent"])
    error("precedent document is required (use '-p')")
end
global precedent_path = first(settings["precedent"])
# Make sure there are at least two processes in total by adding one worker
# when the script was started with a single process.
nprocs() >= 2 || addprocs(1)
@everywhere using TextGrams
@everywhere using MutableStrings
# "Parallel each": drain `producer` by handing every item it yields to a process.
#
# One feeder task is started per process id in 1:nprocs(), skipping the calling
# process unless it is the only process. Each feeder repeatedly takes the next
# item from the shared `producer`, starts `fn(item, args...)` on its process
# with `remotecall`, and waits for that call before taking another item.
# The enclosing `@sync` block finishes only after every feeder has finished.
#
# Arguments:
#   fn       - function invoked remotely as fn(item, args...)
#   producer - Task whose produced values are the work items
#   args...  - extra arguments appended to every remote call
function peach(fn::Function, producer::Task, args...)
    total = nprocs()
    @sync begin
        for pid = 1:total
            # Leave the calling process out whenever other processes exist.
            if pid == myid() && total != 1
                continue
            end
            @async begin
                for item in producer
                    pending = remotecall(pid, fn, item, args...)
                    wait(pending)
                end
            end
        end
    end
end
msg("Loading precedent doc ", precedent_path, "...")
# Build the precedent ngrams a single time, on process 1. @timed also hands
# back the two measurements reported below as "time" and "memory".
precedent_ngrams, t, m = @timed remotecall_fetch(
    1,
    ngramsOfTextFile,
    precedent_path,
    None,
    first(settings["ngrams"])
)
# Byte count scaled down twice by 1024 for the "MB" figure in the messages.
m_mb = integer(m / 1024 / 1024)
msg("time: ", t, ", memory: ", m_mb, " MB, keys: ", length(precedent_ngrams))
maybe_timed("Distribute precedent doc...") do
    # Give every process id from 2 up to nprocs() its own RemoteRef holding the
    # precedent ngrams, and remember that ref in ref_list under the process id.
    for pid in 2:nprocs()
        msg(" Sending ", m_mb, " MB to ", pid - 1, "/", nworkers(), " workers")
        slot = RemoteRef(pid)
        ref_list[pid] = slot
        put!(slot, precedent_ngrams)
    end
end
maybe_timed("Extracting $(first(settings["ngrams"]))-grams (map)...") do
    inputs = settings["FILES"]
    # Each item yielded by fileProducer is processed on some process by peach;
    # ref_list and settings travel along as extra arguments of the remote call.
    peach(@task(fileProducer(inputs)), ref_list, settings) do file, ref_list, settings
        verbose = settings["verbose"]
        if verbose
            @printf("%45s processing...\n", basename(file))
        end
        # Look up the ref registered for the process running this call.
        baseline = fetch(ref_list[myid()])
        file_ngrams = Ngrams(file, first(settings["ngrams"]))
        # NOTE(review): presumably folds this file's counts into the baseline
        # copy in place - confirm against TextGrams.leftJoinAdd!.
        leftJoinAdd!(baseline, file_ngrams)
        if verbose
            @printf("%45s Done: added %s ngrams\n", basename(file), length(file_ngrams))
        end
    end
end
# Accumulator for the combined result of all per-process copies.
final_ngrams = Ngrams()
maybe_timed("Combining ngrams (reduce)...") do
    # Fetch the ngrams held in each registered ref and merge them into the
    # accumulator, reporting each one's size in verbose mode.
    for (pid, ref) in ref_list
        partial = fetch(ref)
        msg(pid, " -> ", length(partial))
        unionAdd!(final_ngrams, partial)
    end
end
maybe_timed("Removing duplicate counts...") do
    # Number of precedent copies to subtract: one fewer than the worker count,
    # because exactly *one* copy of the precedent counts must remain.
    extra_copies = nworkers() - 1
    for (k, v) in precedent_ngrams
        combined = final_ngrams[k]
        surplus = v * extra_copies
        # The combined count must be at least as large as what is removed.
        combined >= surplus || error("should be less")
        final_ngrams[k] = combined - surplus
    end
end
# Write every entry of the final result as "key<TAB>value", one per line,
# unless --ignore-results was given.
if !settings["ignore-results"]
    for (gram, total) in final_ngrams
        println(gram, "\t", total)
    end
end