Commit 82588d5

feat: Use date and datetime for related kinds (#42)
* feat: Use date and datetime for related kinds
* downgrade boto
* make it 3.9 compatible
1 parent 198b0e7 commit 82588d5

File tree

10 files changed: +81 -32 lines changed


cloud2sql/collect_plugins.py

Lines changed: 4 additions & 4 deletions
@@ -76,21 +76,21 @@ def collectors(raw_config: Json, feedback: CoreFeedback) -> Dict[str, BaseCollec
 
 def configure(path_to_config: Optional[str]) -> Json:
     # at least one key should be present
-    def require(keys: List[str], obj: Json, msg: str):
+    def require(keys: List[str], obj: Json, msg: str) -> None:
         if not (set(keys) & obj.keys()):
             raise ValueError(msg)
 
     config = {}
     if path_to_config:
         with open(path_to_config) as f:
-            config = yaml.safe_load(f)  # type: ignore
+            config = yaml.safe_load(f)
 
     if "sources" not in config:
         raise ValueError("No sources configured")
     if "destinations" not in config:
         raise ValueError("No destinations configured")
 
-    def validate_arrow_config(config: Json):
+    def validate_arrow_config(config: Json) -> None:
         require(["format"], config, "No format configured for arrow destination")
         if not config["format"] in ["parquet", "csv"]:
             raise ValueError("Format must be either parquet or csv")
@@ -202,7 +202,7 @@ def collect_to_file(
     # ingest the data
     writer = ArrowWriter(model, output_config)
     node: BaseResource
-    for node in sorted(collector.graph.nodes, key=lambda n: n.kind):
+    for node in sorted(collector.graph.nodes, key=lambda n: n.kind):  # type: ignore
         exported = prepare_node(node, collector)
         writer.insert_node(exported)
         ne_current += 1
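
Aside from the new return annotations, the nested require helper is the core of the config validation here: it raises unless at least one of the listed keys is present in the object. A minimal standalone sketch of the pattern (the config literal is invented for illustration):

from typing import Any, Dict, List

Json = Dict[str, Any]

def require(keys: List[str], obj: Json, msg: str) -> None:
    # at least one of the given keys must be present in obj
    if not (set(keys) & obj.keys()):
        raise ValueError(msg)

# hypothetical config for this sketch
config: Json = {"sources": {"aws": {}}, "destinations": {"arrow": {"format": "csv"}}}
require(["sources"], config, "No sources configured")  # passes
require(["format"], config["destinations"]["arrow"], "No format configured for arrow destination")  # passes
try:
    require(["uri", "path"], config["destinations"]["arrow"], "No output location configured")
except ValueError as e:
    print(e)  # neither key is present -> "No output location configured"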

cloud2sql/show_progress.py

Lines changed: 1 addition & 1 deletion
@@ -66,4 +66,4 @@ def walk_node(nid: str, node: Dict[str, Any], rt: Optional[RichTree] = None) ->
             walk_node(nid, child, sub)
         return sub
 
-    return walk_node(progress.sub_tree.root, progress.sub_tree.to_dict(with_data=True))
+    return walk_node(progress.sub_tree.root, progress.sub_tree.to_dict(with_data=True))  # type: ignore

cloud2sql/sql.py

Lines changed: 56 additions & 7 deletions
@@ -1,16 +1,30 @@
 import logging
 from abc import ABC, abstractmethod
-from typing import List, Any, Type, Tuple, Dict, Iterator
+from datetime import datetime, date
+from typing import List, Any, Type, Tuple, Dict, Iterator, Optional
 
 from resotoclient.models import Kind, Model
 from resotolib.args import Namespace
 from resotolib.types import Json
-from sqlalchemy import Boolean, Column, Float, Integer, JSON, MetaData, String, Table, DDL
-from sqlalchemy.engine import Engine, Connection
+from resotolib.utils import UTC_Date_Format
+from sqlalchemy import (
+    Boolean,
+    Column,
+    Float,
+    Integer,
+    JSON,
+    MetaData,
+    String,
+    Table,
+    DDL,
+    DateTime,
+    Date,
+    TypeDecorator,
+)
+from sqlalchemy.engine import Engine, Connection, Dialect
 from sqlalchemy.sql.ddl import DropTable, DropConstraint
 from sqlalchemy.sql.dml import ValuesBase
 
-from cloud2sql.util import value_in_path
 from cloud2sql.schema_utils import (
     base_kinds,
     temp_prefix,
@@ -19,10 +33,41 @@
     get_link_table_name,
     kind_properties,
 )
+from cloud2sql.util import value_in_path
 
 log = logging.getLogger("resoto.cloud2sql")
 
 
+class DateTimeString(TypeDecorator):  # type: ignore
+    """
+    This type decorator translates between string (python) and datetime (sqlalchemy) types.
+    """
+
+    impl = DateTime
+    cache_ok = True
+
+    def process_bind_param(self, value: Optional[str], dialect: Dialect) -> Optional[datetime]:
+        return datetime.strptime(value, UTC_Date_Format) if value else None
+
+    def process_result_value(self, value: Optional[datetime], dialect: Dialect) -> Optional[str]:
+        return value.strftime(UTC_Date_Format) if value else None
+
+
+class DateString(TypeDecorator):  # type: ignore
+    """
+    This type decorator translates between string (python) and date (sqlalchemy) types.
+    """
+
+    impl = Date
+    cache_ok = True
+
+    def process_bind_param(self, value: Optional[str], dialect: Dialect) -> Optional[date]:
+        return date.fromisoformat(value) if value else None
+
+    def process_result_value(self, value: Optional[datetime], dialect: Dialect) -> Optional[str]:
+        return value.strftime("%Y-%m-%d") if value else None
+
+
 def sql_kind_to_column_type(kind_name: str, model: Model) -> Any:  # Type[TypeEngine[Any]]
     kind = model.kinds.get(kind_name)
     if "[]" in kind_name:
@@ -33,11 +78,15 @@ def sql_kind_to_column_type(kind_name: str, model: Model) -> Any:  # Type[TypeEn
         return JSON
     elif kind_name in ("int32", "int64"):
         return Integer
-    elif kind_name in "float":
+    elif kind_name == "float":
        return Float
-    elif kind_name in "double":
+    elif kind_name == "double":
         return Float  # use Double with sqlalchemy 2
-    elif kind_name in ("string", "date", "datetime", "duration"):
+    elif kind_name == "datetime":
+        return DateTimeString
+    elif kind_name == "date":
+        return DateString
+    elif kind_name in ("string", "duration"):
         return String
     elif kind_name == "boolean":
         return Boolean
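
This file is the heart of the change: date and datetime kinds now map to real DATE/DATETIME columns, while the rest of the pipeline keeps passing values around as strings; the two TypeDecorator subclasses do the conversion at the bind/result boundary. A minimal round-trip sketch against an in-memory SQLite engine, assuming cloud2sql.sql is importable and that resotolib's UTC_Date_Format is the ISO-8601 pattern "%Y-%m-%dT%H:%M:%SZ" (the table and values are invented for illustration):

from sqlalchemy import Column, Integer, MetaData, Table, create_engine, insert, select

from cloud2sql.sql import DateTimeString  # the decorator defined above

metadata = MetaData()
instances = Table(
    "instances",  # hypothetical table for this sketch
    metadata,
    Column("id", Integer, primary_key=True),
    Column("ctime", DateTimeString),  # stored as DATETIME, bound and read as str
)

engine = create_engine("sqlite://")
metadata.create_all(engine)

with engine.begin() as conn:
    # process_bind_param parses the UTC string into a datetime before writing
    conn.execute(insert(instances).values(id=1, ctime="2023-04-12T10:30:00Z"))
    # process_result_value formats the stored datetime back into the same string
    ctime = conn.execute(select(instances.c.ctime)).scalar_one()
    assert ctime == "2023-04-12T10:30:00Z"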

requirements-parquet.txt

Lines changed: 1 addition & 1 deletion
@@ -1,3 +1,3 @@
 pyarrow==11.0.0
-google-cloud-storage==2.7.0
+google-cloud-storage==2.8.0
 boto3>=1.26.61

requirements-snowflake.txt

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-snowflake-sqlalchemy==1.4.6
+snowflake-sqlalchemy==1.4.7

requirements-test.txt

Lines changed: 6 additions & 6 deletions
@@ -1,9 +1,9 @@
 # test dependencies
-pytest==7.2.2
-black==23.1.0
+pytest==7.3.0
+black==23.3.0
 flake8>=6.0.0
-mypy==1.1.1
-tox==4.4.6
+mypy==1.2.0
+tox==4.4.12
 wheel>=0.38.4
-coverage==7.2.1
-resoto-plugin-example-collector==3.2.5
+coverage==7.2.3
+resoto-plugin-example-collector>=3.3, <3.4

requirements.txt

Lines changed: 5 additions & 5 deletions
@@ -8,9 +8,9 @@ resotoclient>=1.2.1
 posthog>=2.2.0
 requests>=2.28.2
 
-resotolib>=3.2.5, <3.3
+resotolib>=3.3, <3.4
 # all collector plugins
-resoto-plugin-aws>=3.2.5, <3.3
-resoto-plugin-digitalocean>=3.2.5, <3.3
-resoto-plugin-gcp>=3.2.5, <3.3
-resoto-plugin-k8s>=3.2.5, <3.3
+resoto-plugin-aws>=3.3, <3.4
+resoto-plugin-digitalocean>=3.3, <3.4
+resoto-plugin-gcp>=3.3, <3.4
+resoto-plugin-k8s>=3.3, <3.4

tests/arrow/model_test.py

Lines changed: 4 additions & 4 deletions
@@ -34,7 +34,7 @@ def test_create_schema(model: Model) -> None:
 
 
 def test_update(parquet_writer: ArrowWriter) -> None:
-    parquet_writer.insert_node(  # type: ignore
+    parquet_writer.insert_node(
         {
             "type": "node",
             "id": "i-123",
@@ -53,7 +53,7 @@ def test_update(parquet_writer: ArrowWriter) -> None:
             },
         }
     )
-    parquet_writer.insert_node(  # type: ignore
+    parquet_writer.insert_node(
         {
             "type": "node",
             "id": "v-123",
@@ -71,7 +71,7 @@ def test_update(parquet_writer: ArrowWriter) -> None:
             },
         }
     )
-    parquet_writer.insert_node({"type": "edge", "from": "i-123", "to": "v-123"})  # type: ignore
+    parquet_writer.insert_node({"type": "edge", "from": "i-123", "to": "v-123"})
 
     # one instance is persisted
     assert set(parquet_writer.batches["some_instance"].rows[0].values()) == {
@@ -102,7 +102,7 @@ def test_update(parquet_writer: ArrowWriter) -> None:
     assert set(parquet_writer.batches["link_some_instance_some_volume"].rows[0].values()) == {"i-123", "v-123"}
 
     # write the batch when the batch size is reached
-    parquet_writer.insert_node({"type": "edge", "from": "i-123", "to": "v-123"})  # type: ignore
+    parquet_writer.insert_node({"type": "edge", "from": "i-123", "to": "v-123"})
     assert len(parquet_writer.batches["link_some_instance_some_volume"].rows) == 0
 
     # flush the batches and close the writer

tests/arrow/writer_test.py

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ def test_normalize() -> None:
 
     normalize(NormalizationPath(path=["bar", None], convert_to=ParquetMap(convert_values_to_str=True)), object)
 
-    assert object["bar"] == [[("a", "b"), ("c", "d")], [("a", "b"), ("c", "d")]]
+    assert object["bar"] == [[("a", "b"), ("c", "d")], [("a", "b"), ("c", "d")]]  # type: ignore
 
     normalize(NormalizationPath(path=["foobar"], convert_to=ParquetString()), object)

tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from queue import Queue
2-
from typing import List
2+
from typing import List, Iterator
33

44
from resotoclient.models import Model, Kind, Property
55
from pytest import fixture
@@ -71,7 +71,7 @@ def updater(model: Model) -> SqlDefaultUpdater:
7171

7272

7373
@fixture()
74-
def parquet_writer(model: Model):
74+
def parquet_writer(model: Model) -> Iterator[ArrowWriter]:
7575
parquet_model = ArrowModel(model, "parquet")
7676
parquet_model.create_schema([])
7777
