Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-

# https://github.com/sphinx-doc/sphinx/issues/6211
import luigi

Expand Down
8 changes: 5 additions & 3 deletions gokart/build.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from __future__ import annotations

import enum
import logging
import sys
from dataclasses import dataclass
from functools import partial
from logging import getLogger
from typing import Literal, Optional, Protocol, TypeVar, cast, overload
from typing import Literal, Protocol, TypeVar, cast, overload

import backoff
import luigi
Expand Down Expand Up @@ -62,7 +64,7 @@ def add(self, task: TaskOnKart) -> bool: ...

def run(self) -> bool: ...

def __enter__(self) -> 'WorkerProtocol': ...
def __enter__(self) -> WorkerProtocol: ...

def __exit__(self, type, value, traceback) -> Literal[False]: ...

Expand Down Expand Up @@ -162,7 +164,7 @@ def build(
task_lock_exception_max_wait_seconds: int = 600,
task_dump_config: TaskDumpConfig = TaskDumpConfig(),
**env_params,
) -> Optional[T]:
) -> T | None:
"""
Run gokart task for local interpreter.
Shares most of its parameters with luigi.build (see https://luigi.readthedocs.io/en/stable/api/luigi.html?highlight=build#luigi.build)
Expand Down
2 changes: 2 additions & 0 deletions gokart/build_process_task_info.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import io

import gokart
Expand Down
12 changes: 6 additions & 6 deletions gokart/config_params.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from typing import Dict, Optional, Type
from __future__ import annotations

import luigi

import gokart


class inherits_config_params:
def __init__(self, config_class: Type[luigi.Config], parameter_alias: Optional[Dict[str, str]] = None):
def __init__(self, config_class: type[luigi.Config], parameter_alias: dict[str, str] | None = None):
"""
Decorates task to inherit parameter value of `config_class`.

Expand All @@ -15,10 +15,10 @@ def __init__(self, config_class: Type[luigi.Config], parameter_alias: Optional[D
key: config_class's parameter name. value: decorated task's parameter name.
"""

self._config_class: Type[luigi.Config] = config_class
self._parameter_alias: Dict[str, str] = parameter_alias if parameter_alias is not None else {}
self._config_class: type[luigi.Config] = config_class
self._parameter_alias: dict[str, str] = parameter_alias if parameter_alias is not None else {}

def __call__(self, task_class: Type[gokart.TaskOnKart]):
def __call__(self, task_class: type[gokart.TaskOnKart]):
# wrap task to prevent task name from being changed
@luigi.task._task_wraps(task_class)
class Wrapped(task_class): # type: ignore
Expand All @@ -29,6 +29,6 @@ def get_param_values(cls, params, args, kwargs):

if hasattr(cls, task_param_key) and task_param_key not in kwargs:
kwargs[task_param_key] = param_value
return super(Wrapped, cls).get_param_values(params, args, kwargs)
return super().get_param_values(params, args, kwargs)

return Wrapped
24 changes: 13 additions & 11 deletions gokart/conflict_prevention_lock/task_lock.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import functools
import os
from logging import getLogger
from typing import NamedTuple, Optional
from typing import NamedTuple

import redis
from apscheduler.schedulers.background import BackgroundScheduler
Expand All @@ -10,9 +12,9 @@


class TaskLockParams(NamedTuple):
redis_host: Optional[str]
redis_port: Optional[int]
redis_timeout: Optional[int]
redis_host: str | None
redis_port: int | None
redis_timeout: int | None
redis_key: str
should_task_lock: bool
raise_task_lock_exception_on_collision: bool
Expand All @@ -31,10 +33,10 @@ def __new__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = {}
if key not in cls._instances[cls]:
cls._instances[cls][key] = super(RedisClient, cls).__new__(cls)
cls._instances[cls][key] = super().__new__(cls)
return cls._instances[cls][key]

def __init__(self, host: Optional[str], port: Optional[int]) -> None:
def __init__(self, host: str | None, port: int | None) -> None:
if not hasattr(self, '_redis_client'):
host = host or 'localhost'
port = port or 6379
Expand Down Expand Up @@ -72,17 +74,17 @@ def set_lock_scheduler(task_lock: redis.lock.Lock, task_lock_params: TaskLockPar
return scheduler


def make_task_lock_key(file_path: str, unique_id: Optional[str]):
def make_task_lock_key(file_path: str, unique_id: str | None):
basename_without_ext = os.path.splitext(os.path.basename(file_path))[0]
return f'{basename_without_ext}_{unique_id}'


def make_task_lock_params(
file_path: str,
unique_id: Optional[str],
redis_host: Optional[str] = None,
redis_port: Optional[int] = None,
redis_timeout: Optional[int] = None,
unique_id: str | None,
redis_host: str | None = None,
redis_port: int | None = None,
redis_timeout: int | None = None,
raise_task_lock_exception_on_collision: bool = False,
lock_extend_seconds: int = 10,
) -> TaskLockParams:
Expand Down
2 changes: 2 additions & 0 deletions gokart/conflict_prevention_lock/task_lock_wrappers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import functools
from logging import getLogger
from typing import Any, Callable
Expand Down
15 changes: 8 additions & 7 deletions gokart/file_processor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import os
import xml.etree.ElementTree as ET
from abc import abstractmethod
from io import BytesIO
from logging import getLogger
from typing import Optional

import dill
import luigi
Expand All @@ -20,7 +21,7 @@
logger = getLogger(__name__)


class FileProcessor(object):
class FileProcessor:
@abstractmethod
def format(self):
pass
Expand Down Expand Up @@ -56,7 +57,7 @@ def dump(self, obj, file):
file.write(obj)


class _ChunkedLargeFileReader(object):
class _ChunkedLargeFileReader:
def __init__(self, file) -> None:
self._file = file

Expand Down Expand Up @@ -125,7 +126,7 @@ class CsvFileProcessor(FileProcessor):
def __init__(self, sep=',', encoding: str = 'utf-8'):
self._sep = sep
self._encoding = encoding
super(CsvFileProcessor, self).__init__()
super().__init__()

def format(self):
return TextFormat(encoding=self._encoding)
Expand Down Expand Up @@ -157,7 +158,7 @@ def dump(self, obj, file):


class JsonFileProcessor(FileProcessor):
def __init__(self, orient: Optional[str] = None):
def __init__(self, orient: str | None = None):
self._orient = orient

def format(self):
Expand Down Expand Up @@ -209,7 +210,7 @@ class ParquetFileProcessor(FileProcessor):
def __init__(self, engine='pyarrow', compression=None):
self._engine = engine
self._compression = compression
super(ParquetFileProcessor, self).__init__()
super().__init__()

def format(self):
return luigi.format.Nop
Expand All @@ -232,7 +233,7 @@ def dump(self, obj, file):

class FeatherFileProcessor(FileProcessor):
def __init__(self, store_index_in_feather: bool):
super(FeatherFileProcessor, self).__init__()
super().__init__()
self._store_index_in_feather = store_index_in_feather
self.INDEX_COLUMN_PREFIX = '__feather_gokart_index__'

Expand Down
5 changes: 3 additions & 2 deletions gokart/gcs_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import json
import os
from typing import Optional

import luigi
import luigi.contrib.gcs
Expand All @@ -19,7 +20,7 @@ def get_gcs_client(self) -> luigi.contrib.gcs.GCSClient:
def _get_gcs_client(self) -> luigi.contrib.gcs.GCSClient:
return luigi.contrib.gcs.GCSClient(oauth_credentials=self._load_oauth_credentials())

def _load_oauth_credentials(self) -> Optional[Credentials]:
def _load_oauth_credentials(self) -> Credentials | None:
json_str = os.environ.get(self.gcs_credential_name)
if not json_str:
return None
Expand Down
4 changes: 2 additions & 2 deletions gokart/gcs_obj_metadata_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import copy
import re
from logging import getLogger
from typing import Any, Union
from typing import Any
from urllib.parse import urlsplit

from googleapiclient.model import makepatch
Expand Down Expand Up @@ -84,7 +84,7 @@ def _get_patched_obj_metadata(
metadata: Any,
task_params: dict[str, str] | None = None,
custom_labels: dict[str, Any] | None = None,
) -> Union[dict, Any]:
) -> dict | Any:
# If the metadata returned when fetching bucket and object information is not a dictionary,
# something may have gone wrong, so return the original metadata unpatched.
if not isinstance(metadata, dict):
Expand Down
2 changes: 2 additions & 0 deletions gokart/gcs_zip_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import os
import shutil

Expand Down
4 changes: 3 additions & 1 deletion gokart/in_memory/data.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from typing import Any
Expand All @@ -9,5 +11,5 @@ class InMemoryData:
last_modification_time: datetime

@classmethod
def create_data(self, value: Any) -> 'InMemoryData':
def create_data(self, value: Any) -> InMemoryData:
return InMemoryData(value=value, last_modification_time=datetime.now())
5 changes: 4 additions & 1 deletion gokart/in_memory/repository.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import Any, Iterator
from __future__ import annotations

from collections.abc import Iterator
from typing import Any

from .data import InMemoryData

Expand Down
9 changes: 5 additions & 4 deletions gokart/info.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from logging import getLogger
from typing import List, Optional, Set

import luigi

Expand All @@ -15,8 +16,8 @@ def make_tree_info(
last: bool = True,
details: bool = False,
abbr: bool = True,
visited_tasks: Optional[Set[str]] = None,
ignore_task_names: Optional[List[str]] = None,
visited_tasks: set[str] | None = None,
ignore_task_names: list[str] | None = None,
) -> str:
"""
Return a string representation of the tasks, their statuses/parameters in a dependency tree format
Expand All @@ -32,7 +33,7 @@ def make_tree_info(
Whether or not to output details.
- abbr: bool
Whether or not to simplify tasks information that has already appeared.
- ignore_task_names: Optional[List[str]]
- ignore_task_names: list[str] | None
List of task names to ignore.
Returns
-------
Expand Down
13 changes: 7 additions & 6 deletions gokart/mypy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from __future__ import annotations

import re
from typing import Callable, Final, Iterator, Literal, Optional
from collections.abc import Iterator
from typing import Callable, Final, Literal

import luigi
from mypy.expandtype import expand_type
Expand Down Expand Up @@ -233,7 +234,7 @@ def _get_assignment_statements_from_block(self, block: Block) -> Iterator[Assign
elif isinstance(stmt, IfStmt):
yield from self._get_assignment_statements_from_if_statement(stmt)

def collect_attributes(self) -> Optional[list[TaskOnKartAttribute]]:
def collect_attributes(self) -> list[TaskOnKartAttribute] | None:
"""Collect all attributes declared in the task and its parents.

All assignments of the form
Expand Down Expand Up @@ -360,7 +361,7 @@ def _collect_parameter_args(self, expr: Expression) -> tuple[bool, dict[str, Exp
return True, args
return False, {}

def _infer_type_from_parameters(self, parameter: Expression) -> Optional[Type]:
def _infer_type_from_parameters(self, parameter: Expression) -> Type | None:
"""
Generate default type from Parameter.
For example, when parameter is `luigi.parameter.Parameter`, this method should return `str` type.
Expand All @@ -369,7 +370,7 @@ def _infer_type_from_parameters(self, parameter: Expression) -> Optional[Type]:
if parameter_name is None:
return None

underlying_type: Optional[Type] = None
underlying_type: Type | None = None
if parameter_name in ['luigi.parameter.Parameter', 'luigi.parameter.OptionalParameter']:
underlying_type = self._api.named_type('builtins.str', [])
elif parameter_name in ['luigi.parameter.IntParameter', 'luigi.parameter.OptionalIntParameter']:
Expand Down Expand Up @@ -422,7 +423,7 @@ def _infer_type_from_parameters(self, parameter: Expression) -> Optional[Type]:

return underlying_type

def _get_type_from_args(self, parameter: Expression, arg_key: str) -> Optional[Type]:
def _get_type_from_args(self, parameter: Expression, arg_key: str) -> Type | None:
"""
get type from parameter arguments.

Expand Down Expand Up @@ -452,7 +453,7 @@ def is_parameter_call(expr: Expression) -> bool:
return PARAMETER_FULLNAME_MATCHER.match(parameter_name) is not None


def _extract_parameter_name(expr: Expression) -> Optional[str]:
def _extract_parameter_name(expr: Expression) -> str | None:
"""Extract name if the expression is a call to luigi.Parameter()"""
if not isinstance(expr, CallExpr):
return None
Expand Down
4 changes: 3 additions & 1 deletion gokart/object_storage.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from datetime import datetime

import luigi
Expand All @@ -14,7 +16,7 @@
object_storage_path_prefix = ['s3://', 'gs://']


class ObjectStorage(object):
class ObjectStorage:
@staticmethod
def if_object_storage_path(path: str) -> bool:
for prefix in object_storage_path_prefix:
Expand Down
Loading