-
Notifications
You must be signed in to change notification settings - Fork 40
Expand file tree
/
Copy pathjinja.py
More file actions
156 lines (128 loc) · 6.65 KB
/
jinja.py
File metadata and controls
156 lines (128 loc) · 6.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import ast
from functools import cache
from typing import Any, Mapping, Optional, Set, Tuple, Type
from jinja2 import meta
from jinja2.environment import Template
from jinja2.exceptions import UndefinedError
from jinja2.sandbox import SandboxedEnvironment
from airbyte_cdk.sources.declarative.interpolation.filters import filters
from airbyte_cdk.sources.declarative.interpolation.interpolation import Interpolation
from airbyte_cdk.sources.declarative.interpolation.macros import macros
from airbyte_cdk.sources.types import Config
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
# Error text raised by JinjaInterpolation.eval when a template still mentions 'stream_state'.
# Built from two sentences joined by a single space.
STREAM_STATE_DEPRECATION_MESSAGE = " ".join(
    (
        "Using 'stream_state' in interpolation is no longer supported as it is not thread-safe.",
        "Please use 'stream_interval' for incremental sync values or 'stream_partition' for partition router values instead.",
    )
)
class StreamPartitionAccessEnvironment(SandboxedEnvironment):
    """
    Sandboxed Jinja environment that additionally allows reading the `_partition` attribute.

    source-jira currently sets a use-case-specific attribute on StreamSlice. Because of the PerPartitionCursor,
    that value ends up in StreamSlice._partition and is not exposed through StreamSlice.partition. This class is
    a patch so that source-jira can still reach the parameter from a template.
    """

    def is_safe_attribute(self, obj: Any, attr: str, value: Any) -> bool:
        """
        Decide whether a template may access `attr` on `obj`.

        `_partition` is always allowed; every other attribute is judged by the parent sandbox rules.
        """
        if attr != "_partition":
            # Not the patched attribute: defer entirely to the sandbox's own policy
            return super().is_safe_attribute(obj, attr, value)  # type: ignore  # for some reason, mypy says 'Returning Any from function declared to return "bool"'
        return True
# Keyword aliases, used to deprecate existing keywords without breaking all existing connectors.
# Each key is the alias exposed to templates; each value is the existing context key it mirrors.
_ALIASES = {
    # Use stream_partition to access partition router's values
    "stream_partition": "stream_slice",
}

# Jinja extensions we explicitly strip from the environment.
# They are not installed, so they are not currently a problem; at worst this list documents that we
# do NOT want these extensions because of the potential security risks.
# NOTE(review): Jinja appears to register extensions under the class identifier
# (e.g. "jinja2.ext.LoopControlExtension") - confirm this import alias would actually match if loaded.
_RESTRICTED_EXTENSIONS = [
    "jinja2.ext.loopcontrols",  # Adds support for break continue in loops
]

# Python builtin functions that are available in the Jinja context by default and that we
# explicitly remove because of the potential security risk.
# Please add a unit test to test_jinja.py when adding a restriction.
_RESTRICTED_BUILTIN_FUNCTIONS = ["range"]  # The range function can cause very expensive computations

# Single sandboxed environment used for parsing and compiling every template in this module.
_ENVIRONMENT = StreamPartitionAccessEnvironment()

# Filters and extensions: register the CDK filters, then drop any restricted extension.
_ENVIRONMENT.filters.update(**filters)
for extension in _RESTRICTED_EXTENSIONS:
    _ENVIRONMENT.extensions.pop(extension, None)

# Globals: register the CDK macros first, then remove restricted builtins so they cannot be reached.
_ENVIRONMENT.globals.update(**macros)
for builtin in _RESTRICTED_BUILTIN_FUNCTIONS:
    _ENVIRONMENT.globals.pop(builtin, None)
class JinjaInterpolation(Interpolation):
    """
    Interpolation strategy using the Jinja2 template engine.

    If the input string is a raw string, the interpolated string will be the same.
    `eval("hello world") -> "hello world"`

    The engine will evaluate the content passed within {{}}, interpolating the keys from the config and context-specific arguments.
    `eval("hello {{ name }}", name="airbyte") -> "hello airbyte"`
    `eval("hello {{ config.name }}", config={"name": "airbyte"}) -> "hello airbyte"`

    In addition to passing additional values through the kwargs argument, macros can be called from within the string interpolation.
    For example,
    "{{ max(2, 3) }}" will return 3

    Additional information on jinja templating can be found at https://jinja.palletsprojects.com/en/3.1.x/templates/#
    """

    def eval(
        self,
        input_str: str,
        config: Config,
        default: Optional[str] = None,
        valid_types: Optional[Tuple[Type[Any]]] = None,
        **additional_parameters: Any,
    ) -> Any:
        """
        Render `input_str` as a Jinja template and convert the result to a Python literal when possible.

        :param input_str: template (or plain string) to interpolate
        :param config: connector configuration, exposed to the template as `config`
        :param default: template evaluated instead when the main one renders to an empty value or hits an undefined variable
        :param valid_types: if set, a literal-evaluated result is only returned when it is an instance of one of these types
        :param additional_parameters: extra values exposed to the template by name
        :return: the literal-evaluated rendering, or the rendered string itself when it is not a valid literal
        :raises AirbyteTracedException: if `input_str` mentions the unsupported `stream_state` keyword
        :raises ValueError: if the context already contains a reserved alias, or the template uses undeclared variables
        :raises Exception: if `input_str` is not a string
        """
        if isinstance(input_str, str) and "stream_state" in input_str:
            raise AirbyteTracedException(STREAM_STATE_DEPRECATION_MESSAGE)

        context = {"config": config, **additional_parameters}

        for alias, equivalent in _ALIASES.items():
            if alias in context:
                # This is unexpected. We could ignore or log a warning, but failing loudly should result in fewer surprises
                raise ValueError(
                    f"Found reserved keyword {alias} in interpolation context. This is unexpected and indicative of a bug in the CDK."
                )
            elif equivalent in context:
                context[alias] = context[equivalent]

        if not isinstance(input_str, str):
            # Non-string input is a caller error: fail loudly instead of passing it through
            raise Exception(f"Expected a string, got {input_str}")

        try:
            result = self._eval(input_str, context)
            if result:
                return self._literal_eval(result, valid_types)
        except UndefinedError:
            pass

        # If result is empty or resulted in an undefined error, evaluate and return the default string
        return self._literal_eval(self._eval(default, context), valid_types)

    def _literal_eval(self, result: Optional[str], valid_types: Optional[Tuple[Type[Any]]]) -> Any:
        """
        Convert a rendered string to a Python literal.

        Returns `result` unchanged when it is not a valid literal, or when the parsed value is not
        an instance of `valid_types` (if `valid_types` is provided).
        """
        try:
            evaluated = ast.literal_eval(result)  # type: ignore # literal_eval is able to handle None
        except (ValueError, SyntaxError):
            return result
        if not valid_types or isinstance(evaluated, valid_types):
            return evaluated
        return result

    def _eval(self, s: Optional[str], context: Mapping[str, Any]) -> Optional[str]:
        """
        Render `s` against `context`.

        :raises ValueError: if the template references variables that are missing from `context`
        :return: the rendered string, or `s` itself when a TypeError shows it is not a template (e.g. None)
        """
        try:
            undeclared = self._find_undeclared_variables(s)
            undeclared_not_in_context = {var for var in undeclared if var not in context}
            if undeclared_not_in_context:
                # NOTE(review): this message embeds the whole context, config included - confirm it cannot leak secrets
                raise ValueError(
                    f"Jinja macro has undeclared variables: {undeclared_not_in_context}. Context: {context}"
                )
            return self._compile(s).render(context)  # type: ignore # from_string is able to handle None
        except TypeError:
            # The string is a static value, not a jinja template
            # It can be returned as is
            return s

    # The two helpers below depend only on the template string and the module-level environment,
    # so they are static: caching a bound method would key every entry on `self`, keeping each
    # instance alive for the lifetime of the cache and duplicating entries across instances.
    @staticmethod
    @cache
    def _find_undeclared_variables(s: Optional[str]) -> Set[str]:
        """
        Find undeclared variables and cache them

        The returned set is shared between calls through the cache; callers must not mutate it.
        """
        parsed_template = _ENVIRONMENT.parse(s)  # type: ignore # parse is able to handle None
        return meta.find_undeclared_variables(parsed_template)

    @staticmethod
    @cache
    def _compile(s: str) -> Template:
        """
        We must cache the Jinja Template ourselves because we're using `from_string` instead of a template loader
        """
        return _ENVIRONMENT.from_string(s)