Skip to content

Commit 6f93bbe

Browse files
timholypabloferz
andauthored
Improve inference for paths leading to similar (#37163)
* Infer better eachindex broadcasting * Fix a misuse of show_datatype * Improve inference in vcat(A::BitMatrix...) Because the tuple-length is unknown and because inference gives up easily in the face of missing type parameters, the generator expressions in the previous implementation were poorly inferred. * Use Vector{String} in Cmd field type * Introduce ntupleany and use mapany in more places This also makes mapany safe for iterators without `length` * Add types to some comprehensions and lists * Add some type-asserts and argtypes * AbstractString->String in Distributed.ProcessGroup * Update base/Enums.jl * Update base/abstractarray.jl Co-authored-by: Pablo Zubieta <[email protected]>
1 parent 03b4b01 commit 6f93bbe

File tree

5 files changed

+14
-13
lines changed

5 files changed

+14
-13
lines changed

src/Distributed.jl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,
1313
using Base: Process, Semaphore, JLOptions, buffer_writes, @sync_add,
1414
VERSION_STRING, binding_module, atexit, julia_exename,
1515
julia_cmd, AsyncGenerator, acquire, release, invokelatest,
16-
shell_escape_posixly, uv_error, something, notnothing, isbuffered
16+
shell_escape_posixly, uv_error, something, notnothing, isbuffered,
17+
mapany
1718
using Base.Threads: Event
1819

1920
using Serialization, Sockets

src/cluster.jl

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -654,10 +654,10 @@ function create_worker(manager, wconfig)
654654
end
655655
end
656656

657-
all_locs = map(x -> isa(x, Worker) ?
658-
(something(x.config.connect_at, ()), x.id) :
659-
((), x.id, true),
660-
join_list)
657+
all_locs = mapany(x -> isa(x, Worker) ?
658+
(something(x.config.connect_at, ()), x.id) :
659+
((), x.id, true),
660+
join_list)
661661
send_connection_hdr(w, true)
662662
enable_threaded_blas = something(wconfig.enable_threaded_blas, false)
663663
join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, enable_threaded_blas, isclusterlazy())
@@ -765,7 +765,7 @@ let next_pid = 2 # 1 is reserved for the client (always)
765765
end
766766

767767
mutable struct ProcessGroup
768-
name::AbstractString
768+
name::String
769769
workers::Array{Any,1}
770770
refs::Dict{RRID,Any} # global references
771771
topology::Symbol
@@ -1024,8 +1024,8 @@ end
10241024
function _rmprocs(pids, waitfor)
10251025
lock(worker_lock)
10261026
try
1027-
rmprocset = []
1028-
for p in vcat(pids...)
1027+
rmprocset = Union{LocalProcess, Worker}[]
1028+
for p in pids
10291029
if p == 1
10301030
@warn "rmprocs: process 1 not removed"
10311031
else

src/macros.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ function preduce(reducer, f, R)
275275
schedule(t)
276276
push!(w_exec, t)
277277
end
278-
reduce(reducer, [fetch(t) for t in w_exec])
278+
reduce(reducer, Any[fetch(t) for t in w_exec])
279279
end
280280

281281
function pfor(f, R)

src/managers.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
504504
end
505505

506506
function connect_w2w(pid::Int, config::WorkerConfig)
507-
(rhost, rport) = notnothing(config.connect_at)::Tuple{AbstractString, Int}
507+
(rhost, rport) = notnothing(config.connect_at)::Tuple{String, Int}
508508
config.host = rhost
509509
config.port = rport
510510
(s, bind_addr) = connect_to_worker(rhost, rport)

src/pmap.jl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ extract_exception(e) = isa(e, RemoteException) ? e.captured.ex : e
201201

202202
function process_batch_errors!(p, f, results, on_error, retry_delays, retry_check)
203203
# Handle all the ones in error in another pmap, with batch size set to 1
204-
reprocess = []
204+
reprocess = Tuple{Int,BatchProcessingError}[]
205205
for (idx, v) in enumerate(results)
206206
if isa(v, BatchProcessingError)
207207
push!(reprocess, (idx,v))
@@ -210,14 +210,14 @@ function process_batch_errors!(p, f, results, on_error, retry_delays, retry_chec
210210

211211
if length(reprocess) > 0
212212
errors = [x[2] for x in reprocess]
213-
exceptions = [x.ex for x in errors]
213+
exceptions = Any[x.ex for x in errors]
214214
state = iterate(retry_delays)
215215
state !== nothing && (state = state[2])
216216
error_processed = let state=state
217217
if (length(retry_delays)::Int > 0) &&
218218
(retry_check === nothing || all([retry_check(state,ex)[2] for ex in exceptions]))
219219
# BatchProcessingError.data is a tuple of original args
220-
pmap(x->f(x...), p, [x.data for x in errors];
220+
pmap(x->f(x...), p, Any[x.data for x in errors];
221221
on_error = on_error, retry_delays = collect(retry_delays)[2:end::Int], retry_check = retry_check)
222222
elseif on_error !== nothing
223223
map(on_error, exceptions)

0 commit comments

Comments
 (0)