-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconfig.py
More file actions
162 lines (118 loc) · 4.94 KB
/
config.py
File metadata and controls
162 lines (118 loc) · 4.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
from pydantic import BaseModel, Field, model_validator
from typing import Annotated, Literal, Union, Optional
class DatabaseBase(BaseModel):
database: str
user: str
password: str
class MssqlDatabaseConfig(DatabaseBase):
type: Literal["mssql"]
server: str
trust_server_certificate: bool = False
class FirebirdDatabaseConfig(DatabaseBase):
type: Literal["firebird"]
host: str
port: int
charset: str = "UTF8"
DatabaseConfig = Annotated[Union[MssqlDatabaseConfig, FirebirdDatabaseConfig], Field(discriminator="type")]
class FrappeAuthConfig(BaseModel):
api_key: str
api_secret: str
class FrappeConfig(FrappeAuthConfig):
limit_page_length: int = 20
url: str # without trailing slash
class TaskFrappeBase(BaseModel):
modified_fields: Annotated[list[str], Field(min_length=1)] = ["modified"]
id_field: Literal["name"] = "name"
# All Date fields that will be converted to datetime after get (due to json)
datetime_fields: list[str] = []
# All fields that will get parsed to int (due to frappe int fields have default 0, so int are sometimes stored in 'Data' Fields (string))
int_fields: list[str] = []
def model_post_init(self, __context):
for modified_field in self.modified_fields:
if modified_field not in self.datetime_fields:
self.datetime_fields.append(modified_field)
class TaskFrappeBidirectional(TaskFrappeBase):
fk_id_field: str
class TaskDbBase(BaseModel):
modified_fields: list[str]
class TaskDbFrappeToDb(TaskDbBase):
manual_id_sequence: bool = False
manual_id_sequence_max: Optional[int] = None
id_field: str
class TaskDbBidirectional(TaskDbFrappeToDb):
fk_id_field: str
modified_fields: Annotated[list[str], Field(min_length=1)]
class TaskBase(BaseModel):
doc_type: str
db_name: str
mapping: dict[str, str]
key_fields: Annotated[list[str], Field(min_length=1)]
frappe: Optional[TaskFrappeBase] = None
db: Optional[TaskDbBase] = None
value_mapping: dict[str, dict[str | int, str | int]] = {}
table_name: Optional[str] = None
query: Optional[str] = None
query_with_timestamp: Optional[str] = None
create_new: bool = True
use_last_sync_date: bool = True
use_strict_value_mapping: bool = False
@model_validator(mode="after")
def check_key_fields_in_mapping(self) -> "TaskBase":
# Wir nutzen hier die in der TaskMapping definierten Felder
mapping_keys = self.mapping.keys()
missing_keys = [key for key in self.key_fields if key not in mapping_keys]
if missing_keys:
raise ValueError(f"Die folgenden key_fields fehlen im mapping: {missing_keys}")
return self
@model_validator(mode="after")
def check_required_fields(self) -> "TaskBase":
if self.use_last_sync_date:
if self.frappe is None:
raise ValueError(
"Die Frappe-Konfiguration 'frappe' muss angegeben werden, wenn 'use_last_sync_date' True ist."
)
if self.db is None:
raise ValueError("Die DB-Konfiguration 'db' muss angegeben werden, wenn 'use_last_sync_date' True ist.")
if self.query is not None and self.query_with_timestamp is None:
raise ValueError(
"'query_with_timestamp' muss angegeben werden, wenn 'use_last_sync_date' True ist und 'query' genutzt wird."
)
return self
class BidirectionalTaskConfig(TaskBase):
direction: Literal["bidirectional"]
table_name: str
frappe: TaskFrappeBidirectional
db: TaskDbBidirectional
delete: bool = True
datetime_comparison_accuracy_milliseconds: int = 100
class DbToFrappeTaskConfig(TaskBase):
direction: Literal["db_to_frappe"]
process_all: bool = True
@model_validator(mode="after")
def validate_table_or_query(self) -> "DbToFrappeTaskConfig":
if self.table_name is None and self.query is None:
raise ValueError("Entweder 'table_name' oder 'query' muss angegeben werden.")
return self
class FrappeToDbTaskConfig(TaskBase):
direction: Literal["frappe_to_db"]
table_name: str
db: TaskDbFrappeToDb
TaskConfig = Annotated[
Union[BidirectionalTaskConfig, DbToFrappeTaskConfig, FrappeToDbTaskConfig], Field(discriminator="direction")
]
class Config(BaseModel):
databases: dict[str, DatabaseConfig]
frappe: FrappeConfig
tasks: dict[str, TaskConfig]
dry_run: bool = False
timestamp_file: str = "data.db"
timestamp_buffer_seconds: int = 15
max_success_runs_per_task: Optional[int] = Field(default=None, ge=0)
max_error_runs_per_task: Optional[int] = Field(default=None, ge=0)
import json
from pathlib import Path
if __name__ == "__main__":
schema = Config.model_json_schema()
schema_path = Path("config.schema.json")
schema_path.write_text(json.dumps(schema, indent=2, ensure_ascii=False))
print(f"Schema geschrieben nach {schema_path.resolve()}")