Skip to content

Commit a167355

Browse files
committed
Remove Explorer.Remote
Data can be explicitly transfered with dump_ipc/load_ipc instead.
1 parent 6f84e15 commit a167355

File tree

23 files changed

+170
-1209
lines changed

23 files changed

+170
-1209
lines changed

lib/explorer/application.ex

Lines changed: 0 additions & 13 deletions
This file was deleted.

lib/explorer/backend/data_frame.ex

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -166,18 +166,12 @@ defmodule Explorer.Backend.DataFrame do
166166

167167
@callback lazy() :: module()
168168
@callback lazy(df) :: df
169-
@callback compute(df) :: df
169+
@callback collect(df) :: df
170170
@callback from_tabular(Table.Reader.t(), io_dtypes) :: df
171171
@callback from_series([{binary(), Series.t()}]) :: df
172172
@callback to_rows(df, atom_keys? :: boolean()) :: [map()]
173173
@callback to_rows_stream(df, atom_keys? :: boolean(), chunk_size :: integer()) :: Enumerable.t()
174174

175-
# Ownership
176-
177-
@callback owner_reference(df) :: reference() | nil
178-
@callback owner_import(term()) :: io_result(df)
179-
@callback owner_export(df) :: io_result(term())
180-
181175
# Introspection
182176

183177
@callback n_rows(df) :: integer()

lib/explorer/backend/lazy_series.ex

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -254,9 +254,6 @@ defmodule Explorer.Backend.LazySeries do
254254
# different lazy series from different nodes, and
255255
# Explorer.Shared.apply_series was written such that a LazySeries
256256
# always wins. This means LazySeries can have references, but
257-
# operations will always run on the node with the LazySeries,
258-
# so they never have to be imported/exported.
259-
def owner_reference(s), do: s.data.resource
260257

261258
@impl true
262259
def cast(%Series{} = s, dtype) do
@@ -1276,8 +1273,6 @@ defmodule Explorer.Backend.LazySeries do
12761273
frequencies: 1,
12771274
qcut: 8,
12781275
mask: 2,
1279-
owner_import: 1,
1280-
owner_export: 1,
12811276
to_iovec: 1,
12821277
to_list: 1,
12831278
index_of: 2

lib/explorer/backend/query_frame.ex

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,13 @@ defmodule Explorer.Backend.QueryFrame do
3131
names: df.names,
3232
dtypes: df.dtypes,
3333
backend: module,
34-
resource: module.owner_reference(df)
34+
resource: nil
3535
},
3636
df.names,
3737
df.dtypes
3838
)
3939
end
4040

41-
# We don't implement owner reference here because no
42-
# cross node operations happen at the lazy frame level.
43-
# Instead, we store the resource and we delegate them
44-
# to the underlying lazy series.
45-
@impl Backend.DataFrame
46-
def owner_reference(_), do: nil
4741

4842
@impl Backend.DataFrame
4943
def lazy, do: __MODULE__

lib/explorer/backend/series.ex

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,6 @@ defmodule Explorer.Backend.Series do
3535
@callback strptime(s, String.t()) :: s
3636
@callback strftime(s, String.t()) :: s
3737

38-
# Ownership
39-
40-
@callback owner_reference(s) :: reference() | nil
41-
@callback owner_import(term()) :: io_result(s)
42-
@callback owner_export(s) :: io_result(term())
43-
4438
# Introspection
4539

4640
@callback size(s) :: non_neg_integer() | lazy_s()

lib/explorer/data_frame.ex

Lines changed: 25 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ defmodule Explorer.DataFrame do
204204

205205

206206
@enforce_keys [:data, :groups, :names, :dtypes]
207-
defstruct [:data, :groups, :names, :dtypes, :remote]
207+
defstruct [:data, :groups, :names, :dtypes]
208208

209209
@typedoc """
210210
Represents a column name as atom or string.
@@ -594,7 +594,6 @@ defmodule Explorer.DataFrame do
594594
595595
* `:lazy` - force the results into the lazy version of the current backend.
596596
597-
* `:node` - The Erlang node to allocate the data frame on.
598597
599598
* `:encoding` - Encoding to use when reading the file. For now, the only possible values are `utf8` and `utf8-lossy`.
600599
The utf8-lossy option means that invalid utf8 values are replaced with � characters. (default: `"utf8"`)
@@ -607,7 +606,7 @@ defmodule Explorer.DataFrame do
607606
@spec from_csv(filename :: String.t() | fs_entry(), opts :: Keyword.t()) ::
608607
{:ok, DataFrame.t()} | {:error, Exception.t()}
609608
def from_csv(filename, opts \\ []) do
610-
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy, :node])
609+
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy])
611610

612611
opts =
613612
Keyword.validate!(opts,
@@ -791,7 +790,7 @@ defmodule Explorer.DataFrame do
791790
@spec load_csv(contents :: String.t(), opts :: Keyword.t()) ::
792791
{:ok, DataFrame.t()} | {:error, Exception.t()}
793792
def load_csv(contents, opts \\ []) do
794-
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy, :node])
793+
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy])
795794

796795
opts =
797796
Keyword.validate!(opts,
@@ -867,13 +866,12 @@ defmodule Explorer.DataFrame do
867866
868867
* `:lazy` - force the results into the lazy version of the current backend.
869868
870-
* `:node` - The Erlang node to allocate the data frame on.
871869
"""
872870
@doc type: :io
873871
@spec from_parquet(filename :: String.t() | fs_entry(), opts :: Keyword.t()) ::
874872
{:ok, DataFrame.t()} | {:error, Exception.t()}
875873
def from_parquet(filename, opts \\ []) do
876-
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy, :node])
874+
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy])
877875

878876
opts =
879877
Keyword.validate!(opts,
@@ -1065,7 +1063,7 @@ defmodule Explorer.DataFrame do
10651063
@spec load_parquet(contents :: binary(), opts :: Keyword.t()) ::
10661064
{:ok, DataFrame.t()} | {:error, Exception.t()}
10671065
def load_parquet(contents, opts \\ []) do
1068-
opts = Keyword.validate!(opts, [:backend, :lazy, :node])
1066+
opts = Keyword.validate!(opts, [:backend, :lazy])
10691067
backend = backend_from_options!(opts)
10701068
Shared.apply_init(backend, :load_parquet, [contents], opts)
10711069
end
@@ -1100,13 +1098,12 @@ defmodule Explorer.DataFrame do
11001098
11011099
* `:lazy` - force the results into the lazy version of the current backend.
11021100
1103-
* `:node` - The Erlang node to allocate the data frame on.
11041101
"""
11051102
@doc type: :io
11061103
@spec from_ipc(filename :: String.t() | fs_entry(), opts :: Keyword.t()) ::
11071104
{:ok, DataFrame.t()} | {:error, Exception.t()}
11081105
def from_ipc(filename, opts \\ []) do
1109-
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy, :node])
1106+
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy])
11101107

11111108
opts =
11121109
Keyword.validate!(opts,
@@ -1336,7 +1333,7 @@ defmodule Explorer.DataFrame do
13361333
@spec load_ipc(contents :: binary(), opts :: Keyword.t()) ::
13371334
{:ok, DataFrame.t()} | {:error, Exception.t()}
13381335
def load_ipc(contents, opts \\ []) do
1339-
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy, :node])
1336+
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy])
13401337

13411338
opts =
13421339
Keyword.validate!(opts,
@@ -1381,7 +1378,6 @@ defmodule Explorer.DataFrame do
13811378
13821379
* `:lazy` - force the results into the lazy version of the current backend.
13831380
1384-
* `:node` - The Erlang node to allocate the data frame on.
13851381
13861382
* `:config` - An optional struct, keyword list or map, normally associated with remote
13871383
file systems. See [IO section](#module-io-operations) for more details. (default: `nil`)
@@ -1391,7 +1387,7 @@ defmodule Explorer.DataFrame do
13911387
@spec from_ipc_stream(filename :: String.t() | fs_entry()) ::
13921388
{:ok, DataFrame.t()} | {:error, Exception.t()}
13931389
def from_ipc_stream(filename, opts \\ []) do
1394-
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy, :node])
1390+
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy])
13951391

13961392
opts = Keyword.validate!(opts, columns: nil, config: nil)
13971393
backend = backend_from_options!(backend_opts)
@@ -1525,7 +1521,7 @@ defmodule Explorer.DataFrame do
15251521
@spec load_ipc_stream(contents :: binary(), opts :: Keyword.t()) ::
15261522
{:ok, DataFrame.t()} | {:error, Exception.t()}
15271523
def load_ipc_stream(contents, opts \\ []) do
1528-
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy, :node])
1524+
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy])
15291525
opts = Keyword.validate!(opts, columns: nil)
15301526

15311527
backend = backend_from_options!(backend_opts)
@@ -1566,7 +1562,6 @@ defmodule Explorer.DataFrame do
15661562
15671563
* `:lazy` - force the results into the lazy version of the current backend.
15681564
1569-
* `:node` - The Erlang node to allocate the data frame on.
15701565
15711566
* `:config` - An optional struct, keyword list or map, normally associated with remote
15721567
file systems. See [IO section](#module-io-operations) for more details. (default: `nil`)
@@ -1576,7 +1571,7 @@ defmodule Explorer.DataFrame do
15761571
@spec from_ndjson(filename :: String.t() | fs_entry(), opts :: Keyword.t()) ::
15771572
{:ok, DataFrame.t()} | {:error, Exception.t()}
15781573
def from_ndjson(filename, opts \\ []) do
1579-
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy, :node])
1574+
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy])
15801575

15811576
opts =
15821577
Keyword.validate!(opts,
@@ -1695,7 +1690,7 @@ defmodule Explorer.DataFrame do
16951690
@spec load_ndjson(contents :: String.t(), opts :: Keyword.t()) ::
16961691
{:ok, DataFrame.t()} | {:error, Exception.t()}
16971692
def load_ndjson(contents, opts \\ []) do
1698-
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy, :node])
1693+
{backend_opts, opts} = Keyword.split(opts, [:backend, :lazy])
16991694

17001695
opts =
17011696
Keyword.validate!(opts,
@@ -1767,47 +1762,26 @@ defmodule Explorer.DataFrame do
17671762
def to_lazy(df), do: Shared.apply_dataframe(df, :lazy)
17681763

17691764
@doc """
1770-
Computes and collects a data frame to the current node.
1765+
Collects a lazy data frame into an eager one, executing the query.
17711766
1772-
If the data frame is already in the current node, it is computed
1773-
but no transfer happens.
1767+
It is the opposite of `lazy/1`. If the data frame is already eager,
1768+
it returns the data frame as-is.
1769+
1770+
Collecting a grouped dataframe should return a grouped dataframe.
17741771
17751772
## Examples
17761773
1777-
series = Explorer.Series.from_list([1, 2, 3], node: :some@node)
1778-
Explorer.Series.collect(series)
1774+
iex> df = Explorer.DataFrame.new(a: [1, 2, 3])
1775+
iex> Explorer.DataFrame.collect(df)
1776+
#Explorer.DataFrame<
1777+
Polars[3 x 1]
1778+
a s64 [1, 2, 3]
1779+
>
17791780
17801781
"""
17811782
@doc type: :conversion
17821783
@spec collect(df :: DataFrame.t()) :: DataFrame.t()
1783-
def collect(df) do
1784-
%DataFrame{data: %impl{}} = df = compute(df)
1785-
1786-
case impl.owner_reference(df) do
1787-
ref when is_reference(ref) and node(ref) != node() ->
1788-
with {:ok, exported} <- :erpc.call(node(ref), impl, :owner_export, [df]),
1789-
{:ok, imported} <- impl.owner_import(exported) do
1790-
imported
1791-
else
1792-
{:error, exception} -> raise exception
1793-
end
1794-
1795-
_ ->
1796-
df
1797-
end
1798-
end
1799-
1800-
@doc """
1801-
Computes the lazy data frame into an eager one, executing the query.
1802-
1803-
It is the opposite of `lazy/1`. If it is not a lazy data frame,
1804-
this is a noop.
1805-
1806-
Collecting a grouped dataframe should return a grouped dataframe.
1807-
"""
1808-
@doc type: :conversion
1809-
@spec compute(df :: DataFrame.t()) :: DataFrame.t()
1810-
def compute(df), do: Shared.apply_dataframe(df, :compute)
1784+
def collect(df), do: Shared.apply_dataframe(df, :collect)
18111785

18121786
@doc """
18131787
Creates a new dataframe.
@@ -6956,7 +6930,7 @@ defmodule Explorer.DataFrame do
69566930
end
69576931

69586932
defp out_df(df, names, dtypes) do
6959-
%{df | names: names, dtypes: dtypes, remote: nil}
6933+
%{df | names: names, dtypes: dtypes}
69606934
end
69616935

69626936
defp pairwise_df(df, opts) do
@@ -6975,24 +6949,11 @@ defmodule Explorer.DataFrame do
69756949
import Inspect.Algebra
69766950

69776951
def inspect(df, opts) do
6978-
remote_ref =
6979-
case df.remote do
6980-
{_local_gc, _remote_pid, remote_ref} -> remote_ref
6981-
_ -> df.data.__struct__.owner_reference(df)
6982-
end
6983-
6984-
remote =
6985-
if is_reference(remote_ref) and node(remote_ref) != node() do
6986-
concat(line(), Atom.to_string(node(remote_ref)))
6987-
else
6988-
empty()
6989-
end
6990-
69916952
force_unfit(
69926953
concat([
69936954
color("#Explorer.DataFrame<", :map, opts),
69946955
nest(
6995-
concat([remote, line(), Shared.apply_dataframe(df, :inspect, [opts])]),
6956+
concat([line(), Shared.apply_dataframe(df, :inspect, [opts])]),
69966957
2
69976958
),
69986959
line(),

0 commit comments

Comments
 (0)