@@ -18,6 +18,13 @@ def _env_ttl() -> int:
1818 return 0
1919
2020
21+ def _env_write_timeout () -> int :
22+ try :
23+ return int (os .environ .get ("DISCOURSE_WRITE_TIMEOUT_SECS" , "20" ))
24+ except ValueError :
25+ return 20
26+
27+
2128class CachedDiscourse :
2229 """A thin cached wrapper around DiscourseClient GETs.
2330
@@ -34,17 +41,16 @@ def __init__(self, client: DiscourseClient, ttl_seconds: int | None = None) -> N
3441 self ._lock = threading .Lock ()
3542 # key: (topic_id, page) -> (timestamp, payload)
3643 self ._topic_pages : dict [tuple [int , int ], tuple [float , dict [str , Any ]]] = {}
44+ # Fail-fast timeout for write operations (POST/PUT/DELETE)
45+ self ._write_timeout_seconds = _env_write_timeout ()
3746
3847 # Cached GETs
3948 # All other methods are delegated to the underlying client
4049
4150 def topic_page (self , topic_id : int , page : int ) -> dict [str , Any ]:
4251 if self ._ttl <= 0 :
4352 LOGGER .info ("[DISCOURSE_API] GET /t/%s.json?page=%s" , topic_id , page + 1 )
44- raw = cast (
45- dict [str , Any ],
46- self .client ._get (f"/t/{ topic_id } .json" , page = page + 1 ), # type: ignore[no-untyped-call]
47- )
53+ raw = self ._get_topic_page_raw (topic_id , page )
4854 # Normalize returned page to requested zero-based page if present
4955 ret = dict (raw )
5056 if "page" in ret :
@@ -60,10 +66,7 @@ def topic_page(self, topic_id: int, page: int) -> dict[str, Any]:
6066 return value
6167 # miss or expired
6268 LOGGER .info ("[DISCOURSE_API] GET /t/%s.json?page=%s" , topic_id , page + 1 )
63- raw = cast (
64- dict [str , Any ],
65- self .client ._get (f"/t/{ topic_id } .json" , page = page + 1 ), # type: ignore[no-untyped-call]
66- )
69+ raw = self ._get_topic_page_raw (topic_id , page )
6770 # Normalize returned page to requested zero-based page if present
6871 normalized : dict [str , Any ] = dict (raw )
6972 if "page" in normalized :
@@ -75,6 +78,22 @@ def topic_page(self, topic_id: int, page: int) -> dict[str, Any]:
7578 self ._purge_expired_unlocked (now )
7679 return normalized
7780
81+ def _get_topic_page_raw (self , topic_id : int , page : int ) -> dict [str , Any ]:
82+ """Fetch a topic page, preferring include_raw but falling back if unsupported.
83+
84+ Some stubs in tests (and potentially older clients) do not accept the
85+ 'include_raw' kwarg. Detect that case and retry without it so tests and
86+ dry-run paths behave consistently.
87+ """
88+ return cast (
89+ dict [str , Any ],
90+ self .client ._get (
91+ f"/t/{ topic_id } .json" ,
92+ page = page + 1 ,
93+ include_raw = "true" ,
94+ ), # type: ignore[no-untyped-call]
95+ )
96+
7897 def invalidate_topic (self , topic_id : int ) -> None :
7998 with self ._lock :
8099 for k in [k for k in self ._topic_pages .keys () if k [0 ] == topic_id ]:
@@ -111,6 +130,34 @@ def _wrapped(*args: Any, **kwargs: Any) -> Any:
111130 }:
112131 verb = "GET"
113132 LOGGER .info ("[DISCOURSE_API] %s %s" , verb , name )
133+ # For write operations, execute with a timeout so we don't block for hours
134+ if verb in {"POST" , "PUT" , "DELETE" }:
135+ result_container : dict [str , Any ] = {}
136+ error_container : dict [str , BaseException ] = {}
137+
138+ def _invoke () -> None :
139+ try :
140+ result_container ["result" ] = attr (* args , ** kwargs )
141+ except BaseException as e : # propagate any exception
142+ error_container ["error" ] = e
143+
144+ t = threading .Thread (target = _invoke , daemon = True )
145+ t .start ()
146+ t .join (self ._write_timeout_seconds )
147+ if t .is_alive ():
148+ LOGGER .error (
149+ "[DISCOURSE_API] %s %s timed out after %ss" ,
150+ verb ,
151+ name ,
152+ self ._write_timeout_seconds ,
153+ )
154+ raise TimeoutError (
155+ f"Discourse { verb } { name } timed out after { self ._write_timeout_seconds } s"
156+ )
157+ if "error" in error_container :
158+ raise error_container ["error" ]
159+ return result_container .get ("result" )
160+ # For reads, just delegate
114161 return attr (* args , ** kwargs )
115162
116163 return _wrapped
0 commit comments