Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import warnings
from pydantic import validate_call, Field, StrictFloat, StrictStr, StrictInt
from typing import Any, Dict, List, Optional, Tuple, Union
from typing_extensions import Annotated
from typing import Annotated

{{#imports}}
{{import}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ class ApiClient:
return_data = self.__deserialize_file(response_data)
elif response_type is not None:
match = None
content_type = response_data.headers.get('content-type')
content_type = response_data.getheader('content-type')
if content_type is not None:
match = re.search(r"charset=([a-zA-Z\-\d]+)[\s;]?", content_type)
encoding = match.group(1) if match else "utf-8"
Expand All @@ -338,7 +338,7 @@ class ApiClient:
return ApiResponse(
status_code = response_data.status,
data = return_data,
headers = response_data.headers,
headers = response_data.getheaders(),
raw_data = response_data.data
)

Expand Down Expand Up @@ -389,13 +389,10 @@ class ApiClient:
# and attributes which value is not None.
# Convert attribute name to json key in
# model definition for request.
if hasattr(obj, 'to_dict') and callable(getattr(obj, 'to_dict')):
obj_dict = obj.to_dict()
else:
obj_dict = obj.__dict__
obj_dict = obj.model_dump(by_alias=True, exclude_none=True)

if isinstance(obj_dict, list):
# here we handle instances that can either be a list or something else, and only became a real list by calling to_dict()
# here we handle instances that can either be a list or something else, and only became a real list by calling model_dump(by_alias=True)
return self.sanitize_for_serialization(obj_dict)

return {
Expand Down Expand Up @@ -719,7 +716,7 @@ class ApiClient:
os.close(fd)
os.remove(path)

content_disposition = response.headers.get("Content-Disposition")
content_disposition = response.getheader("Content-Disposition")
if content_disposition:
m = re.search(
r'filename=[\'"]?([^\'"\s]+)[\'"]?',
Comment on lines +719 to 722
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Downloaded file path uses unsanitized Content-Disposition filename, allowing path traversal writes outside temp directory.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At modules/openapi-generator/src/main/resources/python/api_client.mustache, line 719:

<comment>Downloaded file path uses unsanitized Content-Disposition filename, allowing path traversal writes outside temp directory.</comment>

<file context>
@@ -719,7 +716,7 @@ class ApiClient:
         os.remove(path)
 
-        content_disposition = response.headers.get("Content-Disposition")
+        content_disposition = response.getheader("Content-Disposition")
         if content_disposition:
             m = re.search(
</file context>

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

content_disposition = response.headers.get("Content-Disposition")
VS
content_disposition = response.getheader("Content-Disposition")

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@padznich The difference between these two approaches is about the API used to access HTTP headers:

response.headers.get("Content-Disposition") (old code)

  • Treats headers as a dict-like object
  • Used with urllib3 v1.x where response.headers was an HTTPHeaderDict

response.getheader("Content-Disposition") (new code)

  • Calls a method on the response object
  • Used with urllib3 v2.x where the response object provides getheader() and getheaders() methods

Your PR correctly updates this to getheader() to be compatible with urllib3 >= 2.1.0 (as specified in your pyproject.mustache). This is the right change.


However, my P1 review comment is still valid — it's about a security issue, not the API change:

The code extracts a filename from the Content-Disposition header and uses it directly in a file path:

filename = m.group(1)
path = os.path.join(os.path.dirname(path), filename)

If a malicious server returns Content-Disposition: attachment; filename="../../etc/passwd", this could write outside the temp directory. You should sanitize the filename:

filename = os.path.basename(m.group(1))  # Strip any directory traversal
path = os.path.join(os.path.dirname(path), filename)

This prevents path traversal attacks regardless of whether you use .get() or .getheader().

Expand Down Expand Up @@ -819,4 +816,4 @@ class ApiClient:
:return: model object.
"""

return klass.from_dict(data)
return klass.model_validate(data)
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ from logging import FileHandler
import multiprocessing
{{/async}}
import sys
from typing import Any, ClassVar, Dict, List, Literal, Optional, TypedDict, Union
from typing_extensions import NotRequired, Self
from typing import Any, ClassVar, Dict, List, Literal, NotRequired, Optional, TypedDict, Union, Self

{{^async}}
import urllib3
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# coding: utf-8

{{>partial_header}}

from typing import Any, Optional
from typing_extensions import Self
from typing import Any, Optional, Self

class OpenApiException(Exception):
"""The base exception class for all OpenAPIExceptions"""
Expand Down Expand Up @@ -95,9 +92,9 @@ class ApiKeyError(OpenApiException, KeyError):
class ApiException(OpenApiException):

def __init__(
self,
status=None,
reason=None,
self,
status=None,
reason=None,
http_resp=None,
*,
body: Optional[str] = None,
Expand All @@ -119,14 +116,14 @@ class ApiException(OpenApiException):
self.body = http_resp.data.decode('utf-8')
except Exception:
pass
self.headers = http_resp.headers
self.headers = http_resp.getheaders()

@classmethod
def from_response(
cls,
*,
http_resp,
body: Optional[str],
cls,
*,
http_resp,
body: Optional[str],
data: Optional[Any],
) -> Self:
if http_resp.status == 400:
Expand Down Expand Up @@ -160,11 +157,8 @@ class ApiException(OpenApiException):
error_message += "HTTP response headers: {0}\n".format(
self.headers)

if self.body:
error_message += "HTTP response body: {0}\n".format(self.body)

if self.data:
error_message += "HTTP response data: {0}\n".format(self.data)
if self.data or self.body:
error_message += "HTTP response body: {0}\n".format(self.data or self.body)
Comment on lines +160 to +161
Copy link

@cubic-dev-ai cubic-dev-ai bot Jan 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Using or operator to select between data and body fails for falsy but valid data values like empty dicts/lists. If self.data is {} or [], it will incorrectly show self.body instead. Consider using explicit None checks:

if self.data is not None or self.body:
    error_message += "HTTP response body: {0}\n".format(self.data if self.data is not None else self.body)
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At modules/openapi-generator/src/main/resources/python/exceptions.mustache, line 160:

<comment>Using `or` operator to select between `data` and `body` fails for falsy but valid data values like empty dicts/lists. If `self.data` is `{}` or `[]`, it will incorrectly show `self.body` instead. Consider using explicit `None` checks:
```python
if self.data is not None or self.body:
    error_message += &quot;HTTP response body: {0}\n&quot;.format(self.data if self.data is not None else self.body)
```</comment>

<file context>
@@ -160,11 +157,8 @@ class ApiException(OpenApiException):
-
-        if self.data:
-            error_message += &quot;HTTP response data: {0}\n&quot;.format(self.data)
+        if self.data or self.body:
+            error_message += &quot;HTTP response body: {0}\n&quot;.format(self.data or self.body)
 
</file context>
Suggested change
if self.data or self.body:
error_message += "HTTP response body: {0}\n".format(self.data or self.body)
if self.data is not None or self.body:
error_message += "HTTP response body: {0}\n".format(self.data if self.data is not None else self.body)
Fix with Cubic


return error_message

Expand Down
170 changes: 18 additions & 152 deletions modules/openapi-generator/src/main/resources/python/model_anyof.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -9,169 +9,35 @@ import re # noqa: F401
{{#vendorExtensions.x-py-model-imports}}
{{{.}}}
{{/vendorExtensions.x-py-model-imports}}
from typing import Union, Any, List, Set, TYPE_CHECKING, Optional, Dict
from typing_extensions import Literal, Self
from pydantic import Field
from typing import Union, Any, List, Set, TYPE_CHECKING, Optional, Dict, Literal, Self
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Unused imports added/left over; symbols like Literal, Self, getfullargspec, json, pprint, TYPE_CHECKING, etc. are not used in the generated anyOf model, causing unused-import lint failures.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At modules/openapi-generator/src/main/resources/python/model_anyof.mustache, line 12:

<comment>Unused imports added/left over; symbols like Literal, Self, getfullargspec, json, pprint, TYPE_CHECKING, etc. are not used in the generated anyOf model, causing unused-import lint failures.</comment>

<file context>
@@ -9,169 +9,35 @@ import re  # noqa: F401
-from typing import Union, Any, List, Set, TYPE_CHECKING, Optional, Dict
-from typing_extensions import Literal, Self
-from pydantic import Field
+from typing import Union, Any, List, Set, TYPE_CHECKING, Optional, Dict, Literal, Self
+from pydantic import Field, RootModel
 
</file context>

from pydantic import Field, RootModel

{{#lambda.uppercase}}{{{classname}}}{{/lambda.uppercase}}_ANY_OF_SCHEMAS = [{{#anyOf}}"{{.}}"{{^-last}}, {{/-last}}{{/anyOf}}]

class {{classname}}({{#parent}}{{{.}}}{{/parent}}{{^parent}}BaseModel{{/parent}}):

class {{classname}}(RootModel[Union[{{#anyOf}}{{.}}{{^-last}}, {{/-last}}{{/anyOf}}]]):
"""
{{{description}}}{{^description}}{{{classname}}}{{/description}}
"""

{{#composedSchemas.anyOf}}
# data type: {{{dataType}}}
{{vendorExtensions.x-py-name}}: {{{vendorExtensions.x-py-typing}}}
{{/composedSchemas.anyOf}}
if TYPE_CHECKING:
actual_instance: Optional[Union[{{#anyOf}}{{{.}}}{{^-last}}, {{/-last}}{{/anyOf}}]] = None
else:
actual_instance: Any = None
any_of_schemas: Set[str] = { {{#anyOf}}"{{.}}"{{^-last}}, {{/-last}}{{/anyOf}} }

model_config = {
"validate_assignment": True,
"protected_namespaces": (),
}
{{#discriminator}}

discriminator_value_class_map: Dict[str, str] = {
{{#children}}
'{{^vendorExtensions.x-discriminator-value}}{{name}}{{/vendorExtensions.x-discriminator-value}}{{#vendorExtensions.x-discriminator-value}}{{{vendorExtensions.x-discriminator-value}}}{{/vendorExtensions.x-discriminator-value}}': '{{{classname}}}'{{^-last}},{{/-last}}
{{/children}}
}
{{/discriminator}}

def __init__(self, *args, **kwargs) -> None:
if args:
if len(args) > 1:
raise ValueError("If a position argument is used, only 1 is allowed to set `actual_instance`")
if kwargs:
raise ValueError("If a position argument is used, keyword arguments cannot be used.")
super().__init__(actual_instance=args[0])
else:
super().__init__(**kwargs)

@field_validator('actual_instance')
def actual_instance_must_validate_anyof(cls, v):
{{#isNullable}}
if v is None:
return v

{{/isNullable}}
instance = {{{classname}}}.model_construct()
error_messages = []
{{#composedSchemas.anyOf}}
# validate data type: {{{dataType}}}
{{#isContainer}}
try:
instance.{{vendorExtensions.x-py-name}} = v
return v
except (ValidationError, ValueError) as e:
error_messages.append(str(e))
{{/isContainer}}
{{^isContainer}}
{{#isPrimitiveType}}
try:
instance.{{vendorExtensions.x-py-name}} = v
return v
except (ValidationError, ValueError) as e:
error_messages.append(str(e))
{{/isPrimitiveType}}
{{^isPrimitiveType}}
if not isinstance(v, {{{dataType}}}):
error_messages.append(f"Error! Input type `{type(v)}` is not `{{{dataType}}}`")
else:
return v

{{/isPrimitiveType}}
{{/isContainer}}
{{/composedSchemas.anyOf}}
if error_messages:
# no match
raise ValueError("No match found when setting the actual_instance in {{{classname}}} with anyOf schemas: {{#anyOf}}{{{.}}}{{^-last}}, {{/-last}}{{/anyOf}}. Details: " + ", ".join(error_messages))
else:
return v

@classmethod
def from_dict(cls, obj: Dict[str, Any]) -> Self:
return cls.from_json(json.dumps(obj))

@classmethod
def from_json(cls, json_str: str) -> Self:
"""Returns the object represented by the json string"""
instance = cls.model_construct()
{{#isNullable}}
if json_str is None:
return instance

{{/isNullable}}
error_messages = []
{{#composedSchemas.anyOf}}
{{#isContainer}}
# deserialize data into {{{dataType}}}
try:
# validation
instance.{{vendorExtensions.x-py-name}} = json.loads(json_str)
# assign value to actual_instance
instance.actual_instance = instance.{{vendorExtensions.x-py-name}}
return instance
except (ValidationError, ValueError) as e:
error_messages.append(str(e))
{{/isContainer}}
{{^isContainer}}
{{#isPrimitiveType}}
# deserialize data into {{{dataType}}}
try:
# validation
instance.{{vendorExtensions.x-py-name}} = json.loads(json_str)
# assign value to actual_instance
instance.actual_instance = instance.{{vendorExtensions.x-py-name}}
return instance
except (ValidationError, ValueError) as e:
error_messages.append(str(e))
{{/isPrimitiveType}}
{{^isPrimitiveType}}
# {{vendorExtensions.x-py-name}}: {{{vendorExtensions.x-py-typing}}}
try:
instance.actual_instance = {{{dataType}}}.from_json(json_str)
return instance
except (ValidationError, ValueError) as e:
error_messages.append(str(e))
{{/isPrimitiveType}}
{{/isContainer}}
{{/composedSchemas.anyOf}}

if error_messages:
# no match
raise ValueError("No match found when deserializing the JSON string into {{{classname}}} with anyOf schemas: {{#anyOf}}{{{.}}}{{^-last}}, {{/-last}}{{/anyOf}}. Details: " + ", ".join(error_messages))
else:
return instance

def to_json(self) -> str:
"""Returns the JSON representation of the actual instance"""
if self.actual_instance is None:
return "null"
root: Union[{{#anyOf}}{{.}}{{^-last}}, {{/-last}}{{/anyOf}}] = Field(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Nullable anyOf schemas now reject null because the required root Union field no longer includes Optional/nullable handling.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At modules/openapi-generator/src/main/resources/python/model_anyof.mustache, line 23:

<comment>Nullable anyOf schemas now reject null because the required root Union field no longer includes Optional/nullable handling.</comment>

<file context>
@@ -9,169 +9,35 @@ import re  # noqa: F401
-        """Returns the JSON representation of the actual instance"""
-        if self.actual_instance is None:
-            return "null"
+    root: Union[{{#anyOf}}{{.}}{{^-last}}, {{/-last}}{{/anyOf}}] = Field(
+        ...{{#discriminator}}, discriminator="{{discriminatorName}}"{{/discriminator}}
+    )
</file context>
Suggested change
root: Union[{{#anyOf}}{{.}}{{^-last}}, {{/-last}}{{/anyOf}}] = Field(
root: {{#isNullable}}Optional[{{/isNullable}}Union[{{#anyOf}}{{.}}{{^-last}}, {{/-last}}{{/anyOf}}]{{#isNullable}}]{{/isNullable}} = Field(
{{#isNullable}}None{{/isNullable}}{{^isNullable}}...{{/isNullable}}{{#discriminator}}, discriminator="{{discriminatorName}}"{{/discriminator}}
)

...{{#discriminator}}, discriminator="{{discriminatorName}}"{{/discriminator}}
)

if hasattr(self.actual_instance, "to_json") and callable(self.actual_instance.to_json):
return self.actual_instance.to_json()
else:
return json.dumps(self.actual_instance)
def __getattr__(self, name):
"""
Delegate attribute access to the root model if the attribute
doesn't exist on the main class.
"""

def to_dict(self) -> Optional[Union[Dict[str, Any], {{#anyOf}}{{.}}{{^-last}}, {{/-last}}{{/anyOf}}]]:
"""Returns the dict representation of the actual instance"""
if self.actual_instance is None:
return None
if name in self.__dict__:
return super().__getattribute__(name)

if hasattr(self.actual_instance, "to_dict") and callable(self.actual_instance.to_dict):
return self.actual_instance.to_dict()
else:
return self.actual_instance
root = self.__dict__.get('root')
if root is not None:
return getattr(root, name)

def to_str(self) -> str:
"""Returns the string representation of the actual instance"""
return pprint.pformat(self.model_dump())
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")

{{#vendorExtensions.x-py-postponed-model-imports.size}}
{{#vendorExtensions.x-py-postponed-model-imports}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ from {{modelPackage}}.{{#lambda.snakecase}}{{classname}}{{/lambda.snakecase}} im
# TODO update the JSON string below
json = "{}"
# create an instance of {{classname}} from a JSON string
{{#lambda.snakecase}}{{classname}}{{/lambda.snakecase}}_instance = {{classname}}.from_json(json)
{{#lambda.snakecase}}{{classname}}{{/lambda.snakecase}}_instance = {{classname}}.model_validate_json(json)
# print the JSON string representation of the object
print({{classname}}.to_json())
print({{#lambda.snakecase}}{{classname}}{{/lambda.snakecase}}_instance.model_dump_json(by_alias=True, exclude_unset=True))

# convert the object into a dict
{{#lambda.snakecase}}{{classname}}{{/lambda.snakecase}}_dict = {{#lambda.snakecase}}{{classname}}{{/lambda.snakecase}}_instance.to_dict()
{{#lambda.snakecase}}{{classname}}{{/lambda.snakecase}}_dict = {{#lambda.snakecase}}{{classname}}{{/lambda.snakecase}}_instance.model_dump(by_alias=True)
# create an instance of {{classname}} from a dict
{{#lambda.snakecase}}{{classname}}{{/lambda.snakecase}}_from_dict = {{classname}}.from_dict({{#lambda.snakecase}}{{classname}}{{/lambda.snakecase}}_dict)
{{#lambda.snakecase}}{{classname}}{{/lambda.snakecase}}_from_dict = {{classname}}.model_validate({{#lambda.snakecase}}{{classname}}{{/lambda.snakecase}}_dict)
```
{{/isEnum}}
{{#isEnum}}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
from __future__ import annotations
import json
from enum import Enum
from enum import Enum, StrEnum, IntEnum
from typing import Self
{{#vendorExtensions.x-py-other-imports}}
{{{.}}}
{{/vendorExtensions.x-py-other-imports}}
from typing_extensions import Self


class {{classname}}({{vendorExtensions.x-py-enum-type}}, Enum):
{{#isString}}
class {{classname}}(StrEnum):
{{/isString}}
{{#isInteger}}
class {{classname}}(IntEnum):
{{/isInteger}}
{{^isString}}{{^isInteger}}
class {{classname}}(Enum):
{{/isInteger}}{{/isString}}
"""
{{{description}}}{{^description}}{{{classname}}}{{/description}}
"""

"""
allowed enum values
"""
{{#allowableValues}}
# Allowed enum values
{{#enumVars}}
{{{name}}} = {{{value}}}
{{/enumVars}}
Expand All @@ -24,13 +29,4 @@ class {{classname}}({{vendorExtensions.x-py-enum-type}}, Enum):
def from_json(cls, json_str: str) -> Self:
"""Create an instance of {{classname}} from a JSON string"""
return cls(json.loads(json_str))

{{#defaultValue}}

#
@classmethod
def _missing_value_(cls, value):
if value is no_arg:
return cls.{{{.}}}
{{/defaultValue}}
{{/allowableValues}}
Loading