Skip to content

Commit 3bde5a4

Browse files
authored
Upgrade required version of vineyard. (#2588)
1 parent 14b9569 commit 3bde5a4

File tree

9 files changed

+27
-9
lines changed

9 files changed

+27
-9
lines changed

.github/workflows/platform-ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ jobs:
7171
conda install -n test --quiet --yes -c conda-forge python=$PYTHON skein conda-pack
7272
fi
7373
if [ -n "$WITH_VINEYARD" ]; then
74-
pip install vineyard==0.2.7 -i https://pypi.org/simple
74+
pip install vineyard -i https://pypi.org/simple
7575
7676
mkdir -p /tmp/etcd-download-test
7777
export ETCD_VER=v3.4.13

mars/dataframe/datasource/from_vineyard.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from ... import opcodes as OperandDef
1919
from ...core import OutputType
2020
from ...core.context import get_context
21-
from ...serialization.serializables import StringField
21+
from ...serialization.serializables import Int32Field, StringField
2222
from ...tensor.datasource.from_vineyard import resolve_vineyard_socket
2323
from ...utils import calc_nsplits, has_unknown_shape
2424
from ..operands import DataFrameOperand, DataFrameOperandMixin
@@ -44,6 +44,9 @@ class DataFrameFromVineyard(DataFrameOperand, DataFrameOperandMixin):
4444
# ObjectID in vineyard
4545
object_id = StringField("object_id")
4646

47+
# a dummy attr to make sure ops have different keys
48+
operator_index = Int32Field("operator_index")
49+
4750
def __init__(self, vineyard_socket=None, object_id=None, **kw):
4851
super().__init__(
4952
vineyard_socket=vineyard_socket,
@@ -92,6 +95,7 @@ def tile(cls, op):
9295
for index, worker in enumerate(workers):
9396
chunk_op = op.copy().reset_key()
9497
chunk_op.expect_worker = worker
98+
chunk_op.operator_index = index
9599
out_chunk = chunk_op.new_chunk(
96100
[],
97101
dtypes=dtypes,

mars/dataframe/datastore/tests/test_datastore_execution.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ def test_vineyard_execution(setup):
237237
"check_index_value": False,
238238
}
239239

240-
with vineyard.deploy.local.start_vineyardd() as (_, vineyard_socket):
240+
with vineyard.deploy.local.start_vineyardd() as (_, vineyard_socket, _):
241241
raw = pd.DataFrame({"a": np.arange(0, 55), "b": np.arange(55, 110)})
242242
a = md.DataFrame(raw, chunk_size=15)
243243
a.execute() # n.b.: pre-execute

mars/dataframe/datastore/to_vineyard.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from ... import opcodes as OperandDef
1919
from ...core import OutputType
2020
from ...core.operand.base import SchedulingHint
21-
from ...serialization.serializables import StringField
21+
from ...serialization.serializables import FieldTypes, StringField, TupleField
2222
from ...tensor.datastore.to_vineyard import resolve_vineyard_socket
2323
from ..operands import DataFrameOperand, DataFrameOperandMixin
2424
from ..utils import parse_index
@@ -37,6 +37,9 @@ class DataFrameToVineyardChunk(DataFrameOperand, DataFrameOperandMixin):
3737
# vineyard ipc socket
3838
vineyard_socket = StringField("vineyard_socket")
3939

40+
# a dummy attr to make sure ops have different keys
41+
operator_index = TupleField("operator_index", FieldTypes.int32)
42+
4043
def __init__(self, vineyard_socket=None, dtypes=None, **kw):
4144
super().__init__(
4245
vineyard_socket=vineyard_socket,
@@ -75,6 +78,7 @@ def tile(cls, op):
7578
for idx, chunk in enumerate(op.inputs[0].chunks):
7679
chunk_op = op.copy().reset_key()
7780
chunk_op.scheduling_hint = scheduling_hint
81+
chunk_op.operator_index = chunk.index
7882
out_chunk = chunk_op.new_chunk(
7983
[chunk],
8084
shape=(1, 1),

mars/storage/vineyard.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ def __init__(self, vineyard_size: int, vineyard_socket: str = None):
127127
async def setup(cls, **kwargs) -> Tuple[Dict, Dict]:
128128
loop = asyncio.get_running_loop()
129129
etcd_endpoints = kwargs.pop("etcd_endpoints", "127.0.0.1:2379")
130+
etcd_prefix = kwargs.pop("etcd_prefix", "vineyard")
130131
vineyard_size = kwargs.pop("vineyard_size", "1Gi")
131132
vineyard_socket = kwargs.pop("vineyard_socket", None)
132133
vineyardd_path = kwargs.pop("vineyardd_path", None)
@@ -142,6 +143,7 @@ async def setup(cls, **kwargs) -> Tuple[Dict, Dict]:
142143
else:
143144
vineyard_store = vineyard.deploy.local.start_vineyardd(
144145
etcd_endpoints,
146+
etcd_prefix,
145147
vineyardd_path,
146148
vineyard_size,
147149
vineyard_socket,

mars/tensor/datasource/from_vineyard.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import numpy as np
1717

1818
from ... import opcodes as OperandDef
19-
from ...serialization.serializables import StringField
19+
from ...serialization.serializables import Int32Field, StringField
2020
from ...storage.base import StorageLevel
2121
from ...utils import calc_nsplits, has_unknown_shape
2222
from ...core.context import get_context
@@ -50,6 +50,9 @@ class TensorFromVineyard(TensorNoInput):
5050
# ObjectID in vineyard
5151
object_id = StringField("object_id")
5252

53+
# a dummy attr to make sure ops have different keys
54+
operator_index = Int32Field("operator_index")
55+
5356
def __init__(self, vineyard_socket=None, object_id=None, **kw):
5457
super().__init__(vineyard_socket=vineyard_socket, object_id=object_id, **kw)
5558

@@ -62,6 +65,7 @@ def tile(cls, op):
6265
for index, worker in enumerate(workers):
6366
chunk_op = op.copy().reset_key()
6467
chunk_op.expect_worker = worker
68+
chunk_op.operator_index = index
6569
out_chunk = chunk_op.new_chunk(
6670
[], dtype=np.dtype(object), shape=(1,), index=(index,)
6771
)
@@ -183,7 +187,7 @@ def fromvineyard(tensor, vineyard_socket=None):
183187
dtype=np.dtype("byte"),
184188
gpu=False,
185189
)
186-
meta = metaop(shape=(np.nan,), chunk_size=(1,))
190+
meta = metaop(shape=(np.nan,), chunk_size=(np.nan,))
187191
op = TensorFromVineyardChunk(
188192
vineyard_socket=vineyard_socket, object_id=object_id, gpu=False
189193
)

mars/tensor/datastore/tests/test_datastore_execution.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ def test_vineyard_execution(setup):
223223
"check_shape": False,
224224
}
225225

226-
with vineyard.deploy.local.start_vineyardd() as (_, vineyard_socket):
226+
with vineyard.deploy.local.start_vineyardd() as (_, vineyard_socket, _):
227227
a = tensor(raw, chunk_size=15)
228228
a.execute() # n.b.: pre-execute
229229

mars/tensor/datastore/to_vineyard.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
from ... import opcodes as OperandDef
1919
from ...core.operand.base import SchedulingHint
20-
from ...serialization.serializables import KeyField, StringField
20+
from ...serialization.serializables import FieldTypes, KeyField, StringField, TupleField
2121
from ...storage.base import StorageLevel
2222
from ..datasource import tensor as astensor
2323
from .core import TensorDataStore
@@ -52,6 +52,9 @@ class TensorVineyardDataStoreChunk(TensorDataStore):
5252
# vineyard ipc socket
5353
vineyard_socket = StringField("vineyard_socket")
5454

55+
# a dummy attr to make sure ops have different keys
56+
operator_index = TupleField("operator_index", FieldTypes.int32)
57+
5558
def __init__(self, vineyard_socket=None, **kw):
5659
super().__init__(vineyard_socket=vineyard_socket, **kw)
5760

@@ -71,6 +74,7 @@ def tile(cls, op):
7174
for idx, chunk in enumerate(op.inputs[0].chunks):
7275
chunk_op = op.copy().reset_key()
7376
chunk_op.scheduling_hint = scheduling_hint
77+
chunk_op.operator_index = chunk.index
7478
out_chunk = chunk_op.new_chunk(
7579
[chunk], dtype=np.dtype("O"), shape=(1,), index=(idx,)
7680
)

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ kubernetes =
7474
ray =
7575
ray<1.4
7676
vineyard =
77-
vineyard==0.2.7; sys.platform != "win32"
77+
vineyard>=0.3; sys.platform != "win32"
7878

7979
[tool:pytest]
8080
markers =

0 commit comments

Comments
 (0)