Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ requirements: minimum-requirements dev-requirements requirements-test requiremen
.PHONY: ci-install
ci-install:
@pip install --upgrade pip
@pip install cmake
@pip install "cassandra-driver==3.24.0"
@python -m pip install -U -r requirements.test.txt -r requirements.lint.txt -r requirements.dev.txt -r requirements.txt -t ./pip/deps --cache-dir ./pip/cache

.PHONY: tests
Expand Down
10 changes: 8 additions & 2 deletions butterfree/clients/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
"""Holds connection clients."""

from butterfree.clients.abstract_client import AbstractClient
from butterfree.clients.cassandra_client import CassandraClient
from butterfree.clients.spark_client import SparkClient

__all__ = ["SparkClient", "CassandraClient", "AbstractClient"]
__all__ = ["SparkClient", "AbstractClient"]

try:
from butterfree.clients.cassandra_client import CassandraClient # noqa: F401

__all__.append("CassandraClient")
except ImportError:
pass
30 changes: 18 additions & 12 deletions butterfree/clients/cassandra_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,24 @@
from ssl import CERT_REQUIRED, PROTOCOL_TLSv1
from typing import Dict, List, Optional, Union

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import (
EXEC_PROFILE_DEFAULT,
Cluster,
ExecutionProfile,
ResponseFuture,
Session,
)
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.query import ConsistencyLevel, dict_factory
from typing_extensions import TypedDict

try:
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import (
EXEC_PROFILE_DEFAULT,
Cluster,
ExecutionProfile,
ResponseFuture,
Session,
)
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.query import ConsistencyLevel, dict_factory
except ModuleNotFoundError as e:
e.msg = "Cassandra not found. To be able to use this module,you must install butterfree[cassandra] or install cassandra-driver manually." # noqa: E501
raise


from butterfree.clients import AbstractClient

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -168,7 +174,7 @@ def get_schema(

if not response:
raise RuntimeError(
f"No columns found for table: {table}" f"in key space: {self.keyspace}"
f"No columns found for table: {table}in key space: {self.keyspace}"
)

return response
Expand Down Expand Up @@ -198,7 +204,7 @@ def _get_create_table_query(
else:
columns_str = joined_parsed_columns

query = f"CREATE TABLE {self.keyspace}.{table} " f"({columns_str}); "
query = f"CREATE TABLE {self.keyspace}.{table} ({columns_str}); "

return query

Expand Down
5 changes: 1 addition & 4 deletions butterfree/hooks/schema_compatibility/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
"""Holds Schema Compatibility Hooks definitions."""

from butterfree.hooks.schema_compatibility.cassandra_table_schema_compatibility_hook import ( # noqa
CassandraTableSchemaCompatibilityHook,
)
from butterfree.hooks.schema_compatibility.spark_table_schema_compatibility_hook import ( # noqa
SparkTableSchemaCompatibilityHook,
)

__all__ = ["SparkTableSchemaCompatibilityHook", "CassandraTableSchemaCompatibilityHook"]
__all__ = ["SparkTableSchemaCompatibilityHook"]
11 changes: 5 additions & 6 deletions butterfree/load/writers/online_feature_store_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from butterfree.configs.db import AbstractWriteConfig, CassandraConfig
from butterfree.constants.columns import TIMESTAMP_COLUMN
from butterfree.hooks import Hook
from butterfree.hooks.schema_compatibility import CassandraTableSchemaCompatibilityHook
from butterfree.load.writers.writer import Writer
from butterfree.transform import FeatureSet

Expand Down Expand Up @@ -270,9 +269,9 @@ def check_schema(
table_name: table name where the dataframe will be saved.
database: database name where the dataframe will be saved.
"""
if not self.check_schema_hook:
self.check_schema_hook = CassandraTableSchemaCompatibilityHook(
client, table_name
)
if self.check_schema_hook:
return self.check_schema_hook.run(dataframe)

return self.check_schema_hook.run(dataframe)
raise NotImplementedError(
"Schema check hook not implemented for Online Feature Store Writer"
)
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
cassandra-driver==3.24.0
mdutils>=1.2.2,<1.7
pandas>=0.24,<2.0
parameters-validation>=1.1.5,<2.0
Expand Down
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
license="Copyright",
author="QuintoAndar",
install_requires=requirements,
extras_require={"h3": ["h3>=3.7.4,<4"]},
extras_require={
"h3": ["h3>=3.7.4,<4"],
"cassandra": ["cassandra-driver==3.24.0"],
},
python_requires=">=3.9, <4",
entry_points={"console_scripts": ["butterfree=butterfree._cli.main:app"]},
include_package_data=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import pytest

from butterfree.clients import CassandraClient
from butterfree.hooks.schema_compatibility import CassandraTableSchemaCompatibilityHook
from butterfree.hooks.schema_compatibility.cassandra_table_schema_compatibility_hook import ( # noqa: E501
CassandraTableSchemaCompatibilityHook,
)


class TestCassandraTableSchemaCompatibilityHook:
Expand Down