Skip to content

Commit a1e0726

Browse files
authored
Support distributed File.Stream (#12578)
1 parent 90c8327 commit a1e0726

File tree

9 files changed

+348
-306
lines changed

9 files changed

+348
-306
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,9 +281,9 @@ test_stdlib: compile
281281
@ echo "==> elixir (ex_unit)"
282282
$(Q) exec epmd & exit
283283
$(Q) if [ "$(OS)" = "Windows_NT" ]; then \
284-
cd lib/elixir && cmd //C call ../../bin/elixir.bat -r "test/elixir/test_helper.exs" -pr "test/elixir/**/$(TEST_FILES)"; \
284+
cd lib/elixir && cmd //C call ../../bin/elixir.bat --sname primary -r "test/elixir/test_helper.exs" -pr "test/elixir/**/$(TEST_FILES)"; \
285285
else \
286-
cd lib/elixir && ../../bin/elixir -r "test/elixir/test_helper.exs" -pr "test/elixir/**/$(TEST_FILES)"; \
286+
cd lib/elixir && ../../bin/elixir --sname primary -r "test/elixir/test_helper.exs" -pr "test/elixir/**/$(TEST_FILES)"; \
287287
fi
288288

289289
#==> Dialyzer tasks

lib/elixir/lib/file.ex

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1682,6 +1682,11 @@ defmodule File do
16821682
using the `:line` option, CRLF line breaks (`"\r\n"`) are normalized
16831683
to LF (`"\n"`).
16841684
1685+
Similar to other file operations, a stream can be created in one node
1686+
and forwarded to another node. Once the stream is opened in another node,
1687+
a request will be sent to the creator node to spawn a process for file
1688+
streaming.
1689+
16851690
Operating the stream can fail on open for the same reasons as
16861691
`File.open!/2`. Note that the file is automatically opened each time streaming
16871692
begins. There is no need to pass `:read` and `:write` modes, as those are
@@ -1692,12 +1697,13 @@ defmodule File do
16921697
Since Elixir controls when the streamed file is opened, the underlying
16931698
device cannot be shared and as such it is convenient to open the file
16941699
in raw mode for performance reasons. Therefore, Elixir **will** open
1695-
streams in `:raw` mode with the `:read_ahead` option unless an encoding
1696-
is specified. This means any data streamed into the file must be
1697-
converted to `t:iodata/0` type. If you pass, for example, `[encoding: :utf8]`
1698-
or `[encoding: {:utf16, :little}]` in the modes parameter,
1699-
the underlying stream will use `IO.write/2` and the `String.Chars` protocol
1700-
to convert the data. See `IO.binwrite/2` and `IO.write/2`.
1700+
streams in `:raw` mode with the `:read_ahead` option if the stream is
1701+
opened in the same node as it was created and no encoding has been specified.
1702+
This means any data streamed into the file must be converted to `t:iodata/0`
1703+
type. If you pass, for example, `[encoding: :utf8]` or
1704+
`[encoding: {:utf16, :little}]` in the modes parameter, the underlying stream
1705+
will use `IO.write/2` and the `String.Chars` protocol to convert the data.
1706+
See `IO.binwrite/2` and `IO.write/2`.
17011707
17021708
One may also consider passing the `:delayed_write` option if the stream
17031709
is meant to be written to under a tight loop.
@@ -1707,8 +1713,8 @@ defmodule File do
17071713
If you pass `:trim_bom` in the modes parameter, the stream will
17081714
trim UTF-8, UTF-16 and UTF-32 byte order marks when reading from file.
17091715
1710-
Note that this function does not try to discover the file encoding basing
1711-
on BOM.
1716+
Note that this function does not try to discover the file encoding
1717+
based on BOM.
17121718
17131719
## Examples
17141720

lib/elixir/lib/file/stream.ex

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ defmodule File.Stream do
88
* `modes` - the file modes
99
* `raw` - a boolean indicating if bin functions should be used
1010
* `line_or_bytes` - if reading should read lines or a given number of bytes
11+
* `node` - the node the file belongs to
1112
1213
"""
1314

14-
defstruct path: nil, modes: [], line_or_bytes: :line, raw: true
15+
defstruct path: nil, modes: [], line_or_bytes: :line, raw: true, node: nil
1516

1617
@type t :: %__MODULE__{}
1718

@@ -32,19 +33,29 @@ defmodule File.Stream do
3233
modes
3334
end
3435

35-
%File.Stream{path: path, modes: modes, raw: raw, line_or_bytes: line_or_bytes}
36+
%File.Stream{path: path, modes: modes, raw: raw, line_or_bytes: line_or_bytes, node: node()}
37+
end
38+
39+
@doc false
40+
def __open__(%File.Stream{path: path, node: node}, modes) when node == node() do
41+
:file.open(path, modes)
42+
end
43+
44+
@doc false
45+
def __open__(%File.Stream{path: path, node: node}, modes) do
46+
:erpc.call(node, :file_io_server, :start, [self(), path, List.delete(modes, :raw)])
3647
end
3748

3849
defimpl Collectable do
39-
def into(%{path: path, modes: modes, raw: raw} = stream) do
50+
def into(%{modes: modes, raw: raw} = stream) do
4051
modes = for mode <- modes, mode not in [:read], do: mode
4152

42-
case :file.open(path, [:write | modes]) do
53+
case File.Stream.__open__(stream, [:write | modes]) do
4354
{:ok, device} ->
4455
{:ok, into(device, stream, raw)}
4556

4657
{:error, reason} ->
47-
raise File.Error, reason: reason, action: "stream", path: path
58+
raise File.Error, reason: reason, action: "stream", path: stream.path
4859
end
4960
end
5061

@@ -73,14 +84,14 @@ defmodule File.Stream do
7384
defimpl Enumerable do
7485
@read_ahead_size 64 * 1024
7586

76-
def reduce(%{path: path, modes: modes, line_or_bytes: line_or_bytes, raw: raw}, acc, fun) do
87+
def reduce(%{modes: modes, line_or_bytes: line_or_bytes, raw: raw} = stream, acc, fun) do
7788
start_fun = fn ->
78-
case :file.open(path, read_modes(modes)) do
89+
case File.Stream.__open__(stream, read_modes(modes)) do
7990
{:ok, device} ->
8091
if :trim_bom in modes, do: trim_bom(device, raw) |> elem(0), else: device
8192

8293
{:error, reason} ->
83-
raise File.Error, reason: reason, action: "stream", path: path
94+
raise File.Error, reason: reason, action: "stream", path: stream.path
8495
end
8596
end
8697

@@ -93,27 +104,20 @@ defmodule File.Stream do
93104
Stream.resource(start_fun, next_fun, &:file.close/1).(acc, fun)
94105
end
95106

96-
def count(%{path: path, modes: modes, line_or_bytes: :line} = stream) do
107+
def count(%{modes: modes, line_or_bytes: :line, path: path} = stream) do
97108
pattern = :binary.compile_pattern("\n")
98109
counter = &count_lines(&1, path, pattern, read_function(stream), 0)
99-
100-
case File.open(path, read_modes(modes), counter) do
101-
{:ok, count} ->
102-
{:ok, count}
103-
104-
{:error, reason} ->
105-
raise File.Error, reason: reason, action: "stream", path: path
106-
end
110+
{:ok, open!(stream, modes, counter)}
107111
end
108112

109-
def count(%{path: path, line_or_bytes: bytes, raw: true, modes: modes}) do
110-
case File.stat(path) do
113+
def count(%{path: path, line_or_bytes: bytes, raw: true, modes: modes, node: node} = stream) do
114+
case :erpc.call(node, File, :stat, [path]) do
111115
{:ok, %{size: 0}} ->
112116
{:error, __MODULE__}
113117

114118
{:ok, %{size: size}} ->
115119
remainder = if rem(size, bytes) == 0, do: 0, else: 1
116-
{:ok, div(size, bytes) + remainder - count_raw_bom(path, modes)}
120+
{:ok, div(size, bytes) + remainder - count_raw_bom(stream, modes)}
117121

118122
{:error, reason} ->
119123
raise File.Error, reason: reason, action: "stream", path: path
@@ -132,9 +136,23 @@ defmodule File.Stream do
132136
{:error, __MODULE__}
133137
end
134138

135-
defp count_raw_bom(path, modes) do
139+
defp open!(stream, modes, fun) do
140+
case File.Stream.__open__(stream, read_modes(modes)) do
141+
{:ok, device} ->
142+
try do
143+
fun.(device)
144+
after
145+
:file.close(device)
146+
end
147+
148+
{:error, reason} ->
149+
raise File.Error, reason: reason, action: "stream", path: stream.path
150+
end
151+
end
152+
153+
defp count_raw_bom(stream, modes) do
136154
if :trim_bom in modes do
137-
File.open!(path, read_modes(modes), &(&1 |> trim_bom(true) |> elem(1)))
155+
open!(stream, read_modes(modes), &(&1 |> trim_bom(true) |> elem(1)))
138156
else
139157
0
140158
end

0 commit comments

Comments
 (0)