Skip to content

Commit dfd7a0a

Browse files
authored
chore(llmobs): refactor log/llmobs writer startup, configuration options (#8041)
This PR refactors some startup/configuration logic from the langchain/openai integrations to the base LLM Integration class, including: - Add helper properties to the Base LLM class to check if metrics/logs/llmobs features are enabled (currently just checks the integration config, but later we'll be adding enabled feature flags to the global config). - The `LogWriter` and `LLMObsWriter` instances are not created if the corresponding feature configuration is not enabled. This way we can avoid starting up separate threads unnecessarily as well as making the `log/llmobs_prompt_completion_sample_rate` config options optional rather than required. - Move the DD_API_KEY/DD_APP_KEY checks from integrations to the LLM integration constructor method. No functionality has been changed by this PR. The unused placeholder config `llmobs_prompt_completion_sample_rate` has been removed from the Langchain integration until it is later reintroduced when necessary. ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [ ] Title is accurate. - [ ] No unnecessary changes are introduced. - [ ] Description motivates each change. - [ ] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. 
- [ ] Testing strategy adequately addresses listed risk(s). - [ ] Change is maintainable (easy to change, telemetry, documentation). - [ ] Release note makes sense to a user of the library. - [ ] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment. - [ ] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) - [ ] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. - [ ] This PR doesn't touch any of that.
1 parent 83480a0 commit dfd7a0a

File tree

3 files changed

+85
-78
lines changed

3 files changed

+85
-78
lines changed

ddtrace/contrib/_trace_utils_llm.py

Lines changed: 79 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,35 +14,89 @@
1414
from ddtrace.internal.hostname import get_hostname
1515
from ddtrace.internal.llmobs import LLMObsWriter
1616
from ddtrace.internal.log_writer import V2LogWriter
17+
from ddtrace.internal.utils.formats import asbool
1718
from ddtrace.sampler import RateSampler
1819

1920

2021
class BaseLLMIntegration:
2122
_integration_name = "baseLLM"
2223

23-
def __init__(self, config, stats_url, site, api_key, app_key=None):
24+
def __init__(self, config, stats_url):
2425
# FIXME: this currently does not consider if the tracer is configured to
2526
# use a different hostname. eg. tracer.configure(host="new-hostname")
2627
# Ideally the metrics client should live on the tracer or some other core
2728
# object that is strongly linked with configuration.
28-
self._statsd = get_dogstatsd_client(stats_url, namespace=self._integration_name)
29+
self._log_writer = None
30+
self._llmobs_writer = None
31+
self._statsd = None
2932
self._config = config
30-
self._log_writer = V2LogWriter(
31-
site=site,
32-
api_key=api_key,
33-
interval=float(os.getenv("_DD_%s_LOG_WRITER_INTERVAL" % self._integration_name.upper(), "1.0")),
34-
timeout=float(os.getenv("_DD_%s_LOG_WRITER_TIMEOUT" % self._integration_name.upper(), "2.0")),
35-
)
36-
self._llmobs_writer = LLMObsWriter(
37-
site=site,
38-
api_key=api_key,
39-
app_key=app_key,
40-
interval=float(os.getenv("_DD_%s_LLM_WRITER_INTERVAL" % self._integration_name.upper(), "1.0")),
41-
timeout=float(os.getenv("_DD_%s_LLM_WRITER_TIMEOUT" % self._integration_name.upper(), "2.0")),
42-
)
4333
self._span_pc_sampler = RateSampler(sample_rate=config.span_prompt_completion_sample_rate)
44-
self._log_pc_sampler = RateSampler(sample_rate=config.log_prompt_completion_sample_rate)
45-
self._llmobs_pc_sampler = RateSampler(sample_rate=config.llmobs_prompt_completion_sample_rate)
34+
35+
_dd_api_key = os.getenv("DD_API_KEY", config.get("_api_key"))
36+
_dd_app_key = os.getenv("DD_APP_KEY", config.get("_app_key"))
37+
_dd_site = os.getenv("DD_SITE", "datadoghq.com")
38+
39+
if self.metrics_enabled:
40+
self._statsd = get_dogstatsd_client(stats_url, namespace=self._integration_name)
41+
if self.logs_enabled:
42+
if not _dd_api_key:
43+
raise ValueError(
44+
f"DD_API_KEY is required for sending logs from the {self._integration_name} integration. "
45+
f"To use the {self._integration_name} integration without logs, "
46+
f"set `DD_{self._integration_name.upper()}_LOGS_ENABLED=false`."
47+
)
48+
self._log_writer = V2LogWriter(
49+
site=_dd_site,
50+
api_key=_dd_api_key,
51+
interval=float(os.getenv("_DD_%s_LOG_WRITER_INTERVAL" % self._integration_name.upper(), "1.0")),
52+
timeout=float(os.getenv("_DD_%s_LOG_WRITER_TIMEOUT" % self._integration_name.upper(), "2.0")),
53+
)
54+
self._log_pc_sampler = RateSampler(sample_rate=config.log_prompt_completion_sample_rate)
55+
self.start_log_writer()
56+
57+
if self.llmobs_enabled:
58+
if not _dd_api_key:
59+
raise ValueError(
60+
f"DD_API_KEY is required for sending LLMObs data from the {self._integration_name} integration. "
61+
f"To use the {self._integration_name} integration without LLMObs, "
62+
f"set `DD_{self._integration_name.upper()}_LLMOBS_ENABLED=false`."
63+
)
64+
if not _dd_app_key:
65+
raise ValueError(
66+
f"DD_APP_KEY is required for sending LLMObs payloads from the {self._integration_name} integration."
67+
f" To use the {self._integration_name} integration without LLMObs, "
68+
f"set `DD_{self._integration_name.upper()}_LLMOBS_ENABLED=false`."
69+
)
70+
self._llmobs_writer = LLMObsWriter(
71+
site=_dd_site,
72+
api_key=_dd_api_key,
73+
app_key=_dd_app_key,
74+
interval=float(os.getenv("_DD_%s_LLM_WRITER_INTERVAL" % self._integration_name.upper(), "1.0")),
75+
timeout=float(os.getenv("_DD_%s_LLM_WRITER_TIMEOUT" % self._integration_name.upper(), "2.0")),
76+
)
77+
self._llmobs_pc_sampler = RateSampler(sample_rate=config.llmobs_prompt_completion_sample_rate)
78+
self.start_llm_writer()
79+
80+
@property
81+
def metrics_enabled(self) -> bool:
82+
"""Return whether submitting metrics is enabled for this integration, or global config if not set."""
83+
if hasattr(self._config, "metrics_enabled"):
84+
return asbool(self._config.metrics_enabled)
85+
return False
86+
87+
@property
88+
def logs_enabled(self) -> bool:
89+
"""Return whether submitting logs is enabled for this integration, or global config if not set."""
90+
if hasattr(self._config, "logs_enabled"):
91+
return asbool(self._config.logs_enabled)
92+
return False
93+
94+
@property
95+
def llmobs_enabled(self) -> bool:
96+
"""Return whether submitting llmobs payloads is enabled for this integration, or global config if not set."""
97+
if hasattr(self._config, "llmobs_enabled"):
98+
return asbool(self._config.llmobs_enabled)
99+
return False
46100

47101
def is_pc_sampled_span(self, span: Span) -> bool:
48102
if not span.sampled:
@@ -61,10 +115,13 @@ def is_pc_sampled_llmobs(self, span: Span) -> bool:
61115
return self._llmobs_pc_sampler.sample(span)
62116

63117
def start_log_writer(self) -> None:
118+
if not self._config.logs_enabled:
119+
return
64120
self._log_writer.start()
65121

66-
def start_llm_writer(self):
67-
# type: (...) -> None
122+
def start_llm_writer(self) -> None:
123+
if not self._config.llmobs_enabled:
124+
return
68125
self._llmobs_writer.start()
69126

70127
@abc.abstractmethod
@@ -96,7 +153,7 @@ def _logs_tags(cls, span):
96153

97154
def log(self, span, level, msg, attrs):
98155
# type: (Span, str, str, Dict[str, Any]) -> None
99-
if not self._config.logs_enabled:
156+
if not self.logs_enabled:
100157
return
101158
tags = self._logs_tags(span)
102159
log = {
@@ -124,7 +181,7 @@ def _metrics_tags(cls, span):
124181
def metric(self, span, kind, name, val, tags=None):
125182
# type: (Span, str, str, Any, Optional[List[str]]) -> None
126183
"""Set a metric using the context from the given span."""
127-
if not self._config.metrics_enabled:
184+
if not self.metrics_enabled:
128185
return
129186
metric_tags = self._metrics_tags(span)
130187
if tags:
@@ -154,7 +211,7 @@ def trunc(self, text):
154211
def llm_record(self, span, attrs):
155212
# type: (Span, Dict[str, Any]) -> None
156213
"""Create a LLM record to send to the LLM Obs intake."""
157-
if not self._config.llmobs_enabled:
214+
if not self.llmobs_enabled:
158215
return
159216
llm_record = {}
160217
if span is not None:

ddtrace/contrib/langchain/patch.py

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,6 @@ def get_version():
5858
"logs_enabled": asbool(os.getenv("DD_LANGCHAIN_LOGS_ENABLED", False)),
5959
"metrics_enabled": asbool(os.getenv("DD_LANGCHAIN_METRICS_ENABLED", True)),
6060
"span_prompt_completion_sample_rate": float(os.getenv("DD_LANGCHAIN_SPAN_PROMPT_COMPLETION_SAMPLE_RATE", 1.0)),
61-
# FIXME: llmobs_prompt_completion_sample_rate does not currently work as the langchain integration doesn't
62-
# send LLMObs payloads. This is a placeholder for when we do.
63-
"llmobs_prompt_completion_sample_rate": float(
64-
os.getenv("DD_LANGCHAIN_LLMOBS_PROMPT_COMPLETION_SAMPLE_RATE", 1.0)
65-
),
6661
"log_prompt_completion_sample_rate": float(os.getenv("DD_LANGCHAIN_LOG_PROMPT_COMPLETION_SAMPLE_RATE", 0.1)),
6762
"span_char_limit": int(os.getenv("DD_LANGCHAIN_SPAN_CHAR_LIMIT", 128)),
6863
"_api_key": os.getenv("DD_API_KEY"),
@@ -73,9 +68,6 @@ def get_version():
7368
class _LangChainIntegration(BaseLLMIntegration):
7469
_integration_name = "langchain"
7570

76-
def __init__(self, config, stats_url, site, api_key):
77-
super().__init__(config, stats_url, site, api_key)
78-
7971
def _set_base_span_tags(self, span, interface_type="", provider=None, model=None, api_key=None):
8072
# type: (Span, str, Optional[str], Optional[str], Optional[str]) -> None
8173
"""Set base level tags that should be present on all LangChain spans (if they are not None)."""
@@ -130,7 +122,7 @@ def _metrics_tags(cls, span):
130122

131123
def record_usage(self, span, usage):
132124
# type: (Span, Dict[str, Any]) -> None
133-
if not usage or self._config.metrics_enabled is False:
125+
if not usage or self.metrics_enabled is False:
134126
return
135127
for token_type in ("prompt", "completion", "total"):
136128
num_tokens = usage.get("token_usage", {}).get(token_type + "_tokens")
@@ -748,28 +740,13 @@ def patch():
748740
return
749741
langchain._datadog_patch = True
750742

751-
# TODO: How do we test this? Can we mock out the metric/logger/sampler?
752-
ddsite = os.getenv("DD_SITE", "datadoghq.com")
753-
ddapikey = os.getenv("DD_API_KEY", config.langchain._api_key)
754-
755743
Pin().onto(langchain)
756744
integration = _LangChainIntegration(
757745
config=config.langchain,
758746
stats_url=get_stats_url(),
759-
site=ddsite,
760-
api_key=ddapikey,
761747
)
762748
langchain._datadog_integration = integration
763749

764-
if config.langchain.logs_enabled:
765-
if not ddapikey:
766-
raise ValueError(
767-
"DD_API_KEY is required for sending logs from the LangChain integration."
768-
" The LangChain integration can be disabled by setting the ``DD_TRACE_LANGCHAIN_ENABLED``"
769-
" environment variable to False."
770-
)
771-
integration.start_log_writer()
772-
773750
# Langchain doesn't allow wrapping directly from root, so we have to import the base classes first before wrapping.
774751
# ref: https://github.com/DataDog/dd-trace-py/issues/7123
775752
from langchain import embeddings # noqa:F401

ddtrace/contrib/openai/patch.py

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -154,12 +154,12 @@ def get_version():
154154
class _OpenAIIntegration(BaseLLMIntegration):
155155
_integration_name = "openai"
156156

157-
def __init__(self, config, openai, stats_url, site, api_key, app_key=None):
157+
def __init__(self, config, openai, stats_url):
158158
# FIXME: this currently does not consider if the tracer is configured to
159159
# use a different hostname. eg. tracer.configure(host="new-hostname")
160160
# Ideally the metrics client should live on the tracer or some other core
161161
# object that is strongly linked with configuration.
162-
super().__init__(config, stats_url, site, api_key, app_key=app_key)
162+
super().__init__(config, stats_url)
163163
self._openai = openai
164164
self._user_api_key = None
165165
self._client = None
@@ -237,7 +237,7 @@ def _metrics_tags(cls, span):
237237
return tags
238238

239239
def record_usage(self, span, usage):
240-
if not usage or not self._config.metrics_enabled:
240+
if not usage or not self.metrics_enabled:
241241
return
242242
tags = self._metrics_tags(span)
243243
tags.append("openai.estimated:false")
@@ -251,7 +251,7 @@ def record_usage(self, span, usage):
251251
def generate_completion_llm_records(self, resp, span, args, kwargs):
252252
# type: (Any, Span, List[Any], Dict[str, Any]) -> None
253253
"""Generate payloads for the LLM Obs API from a completion."""
254-
if not self._config.llmobs_enabled:
254+
if not self.llmobs_enabled:
255255
return
256256
choices = resp.choices
257257
n = kwargs.get("n", 1)
@@ -284,7 +284,7 @@ def generate_completion_llm_records(self, resp, span, args, kwargs):
284284
def generate_chat_llm_records(self, resp, span, args, kwargs):
285285
# type: (Any, Span, List[Any], Dict[str, Any]) -> None
286286
"""Generate payloads for the LLM Obs API from a chat completion."""
287-
if not self._config.llmobs_enabled:
287+
if not self.llmobs_enabled:
288288
return
289289
choices = resp.choices
290290
now = time.time()
@@ -332,40 +332,13 @@ def patch():
332332
if getattr(openai, "__datadog_patch", False):
333333
return
334334

335-
ddsite = os.getenv("DD_SITE", "datadoghq.com")
336-
ddapikey = os.getenv("DD_API_KEY", config.openai._api_key)
337-
ddappkey = os.getenv("DD_APP_KEY", config.openai._app_key)
338-
339335
Pin().onto(openai)
340336
integration = _OpenAIIntegration(
341337
config=config.openai,
342338
openai=openai,
343339
stats_url=get_stats_url(),
344-
site=ddsite,
345-
api_key=ddapikey,
346-
app_key=ddappkey,
347340
)
348341

349-
if config.openai.logs_enabled:
350-
if not ddapikey:
351-
raise ValueError(
352-
"DD_API_KEY is required for sending logs from the OpenAI integration."
353-
"To use the OpenAI integration without logs, set `DD_OPENAI_LOGS_ENABLED=false`."
354-
)
355-
integration.start_log_writer()
356-
if config.openai.llmobs_enabled:
357-
if not ddapikey:
358-
raise ValueError(
359-
"DD_API_KEY is required for sending LLMObs data from the OpenAI integration."
360-
"To use the OpenAI integration without LLMObs, set `DD_OPENAI_LLMOBS_ENABLED=false`."
361-
)
362-
if not ddappkey:
363-
raise ValueError(
364-
"DD_APP_KEY is required for sending LLMObs payloads from the OpenAI integration."
365-
"To use the OpenAI integration without LLMObs, set `DD_OPENAI_LLMOBS_ENABLED=false`."
366-
)
367-
integration.start_llm_writer()
368-
369342
if OPENAI_VERSION >= (1, 0, 0):
370343
wrap(openai._base_client.BaseClient._process_response, _patched_convert(openai, integration))
371344
wrap(openai.OpenAI.__init__, _patched_client_init(openai, integration))

0 commit comments

Comments
 (0)