Skip to content

Commit ac5089c

Browse files
committed
working scheduler
1 parent da31660 commit ac5089c

File tree

16 files changed

+145
-44
lines changed

16 files changed

+145
-44
lines changed

cloudquery/sdk/message/sync.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ def __init__(self, record: pa.RecordBatch):
1111

1212

1313
class SyncMigrateTableMessage:
14-
def __init__(self, table: pa.Schema):
15-
self.table = table
14+
def __init__(self, schema: pa.Schema):
15+
self.schema = schema

cloudquery/sdk/scalar/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from .scalar import Scalar, ScalarInvalidTypeError, ScalarFactory
1+
from .scalar import Scalar, ScalarInvalidTypeError
2+
from .scalar_factory import ScalarFactory
23
from .binary import Binary
34
from .bool import Bool
45
from .date32 import Date32

cloudquery/sdk/scalar/binary.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ def __eq__(self, scalar: Scalar) -> bool:
1111
if type(scalar) == Binary:
1212
return self._value == scalar._value and self._valid == scalar._valid
1313
return False
14+
15+
@property
16+
def value(self):
17+
return self._value
1418

1519
def set(self, scalar):
1620
if scalar is None:

cloudquery/sdk/scalar/bool.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@ def __eq__(self, scalar: Scalar) -> bool:
2626
if type(scalar) == Bool:
2727
return self._value == scalar._value and self._valid == scalar._valid
2828
return False
29-
29+
30+
@property
31+
def value(self):
32+
return self._value
33+
3034
def set(self, value: Any):
3135
if value is None:
3236
return

cloudquery/sdk/scalar/date32.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ def __eq__(self, scalar: Scalar) -> bool:
1414
if type(scalar) == Date32:
1515
return self._value == scalar._value and self._valid == scalar._valid
1616
return False
17+
18+
@property
19+
def value(self):
20+
return self._value
1721

1822
def set(self, value: Any):
1923
if value is None:

cloudquery/sdk/scalar/float64.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ def __eq__(self, scalar: Scalar) -> bool:
1111
if type(scalar) == Float64:
1212
return self._value == scalar._value and self._valid == scalar._valid
1313
return False
14+
15+
@property
16+
def value(self):
17+
return self._value
1418

1519
def set(self, value):
1620
if value is None:

cloudquery/sdk/scalar/int64.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@ def __eq__(self, scalar: Scalar) -> bool:
1111
if type(scalar) == Int64:
1212
return self._value == scalar._value and self._valid == scalar._valid
1313
return False
14-
14+
15+
@property
16+
def value(self):
17+
return self._value
18+
1519
def set(self, value):
1620
if value is None:
1721
return
@@ -23,8 +27,8 @@ def set(self, value):
2327
elif type(value) == str:
2428
try:
2529
self._value = int(value)
26-
except ValueError:
27-
raise ScalarInvalidTypeError("Invalid type for Int64 scalar")
30+
except ValueError as e:
31+
raise ScalarInvalidTypeError("Invalid type for Int64 scalar") from e
2832
else:
29-
raise ScalarInvalidTypeError("Invalid type for Int64 scalar")
33+
raise ScalarInvalidTypeError("Invalid type {} for Int64 scalar".format(type(value)))
3034
self._valid = True

cloudquery/sdk/scalar/scalar.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import pyarrow as pa
2-
from .int64 import Int64
31

42
class ScalarInvalidTypeError(Exception):
53
pass
@@ -8,16 +6,9 @@ class Scalar:
86
@property
97
def is_valid(self) -> bool:
108
return self._valid
9+
10+
@property
11+
def value(self):
12+
raise NotImplementedError("Scalar value not implemented")
1113

1214

13-
class ScalarFactory:
14-
def __init__(self):
15-
self._type_map = {
16-
pa.int64: lambda dt: Int64(),
17-
}
18-
19-
def new_scalar(self, dt):
20-
if dt in self._type_map:
21-
return self._type_map[dt]()
22-
else:
23-
raise ScalarInvalidTypeError("Invalid type for scalar")
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
2+
import pyarrow as pa
3+
from .scalar import ScalarInvalidTypeError
4+
from .int64 import Int64
5+
6+
class ScalarFactory:
7+
def __init__(self):
8+
pass
9+
10+
def new_scalar(self, dt):
11+
dt_id = dt.id
12+
if dt_id == pa.types.lib.Type_INT64:
13+
return Int64()
14+
else:
15+
raise ScalarInvalidTypeError("Invalid type {} for scalar".format(dt))
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from .scheduler import Scheduler
2+
from .table_resolver import TableResolver

0 commit comments

Comments
 (0)