Skip to content

Commit f74bc8c

Browse files
update: implement CMAB support in bucketer and decision service, revert OptimizelyFactory
1 parent fac8946 commit f74bc8c

File tree

4 files changed

+171
-59
lines changed

4 files changed

+171
-59
lines changed

optimizely/bucketer.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,3 +164,55 @@ def bucket(
164164
decide_reasons.append(message)
165165

166166
return None, decide_reasons
167+
168+
def bucket_to_entity_id(
169+
self,
170+
bucketing_id: str,
171+
experiment: Experiment,
172+
traffic_allocations: list,
173+
parent_id: Optional[str] = None
174+
) -> tuple[Optional[str], list[str]]:
175+
"""
176+
Buckets the user and returns the entity ID (for CMAB experiments).
177+
Args:
178+
bucketing_id: The bucketing ID string for the user.
179+
experiment: The experiment object (for group/groupPolicy logic if needed).
180+
traffic_allocations: List of traffic allocation dicts (should have 'entity_id' and 'end_of_range' keys).
181+
parent_id: (optional) Used for mutex group support; if not supplied, experiment.id is used.
182+
183+
Returns:
184+
Tuple of (entity_id or None, list of decide reasons).
185+
"""
186+
decide_reasons = []
187+
188+
# If experiment is in a mutually exclusive group with random policy, check group bucketing first
189+
group_id = getattr(experiment, 'groupId', None)
190+
group_policy = getattr(experiment, 'groupPolicy', None)
191+
if group_id and group_policy == 'random':
192+
bucketing_key = f"{bucketing_id}{group_id}"
193+
bucket_number = self._generate_bucket_value(bucketing_key)
194+
# Group traffic allocation would need to be passed in or found here
195+
# For now, skipping group-level allocation (you can extend this for mutex groups)
196+
decide_reasons.append(f'Checked mutex group allocation for group "{group_id}".')
197+
198+
# Main bucketing for experiment or CMAB dummy entity
199+
parent_id = parent_id or experiment.id
200+
bucketing_key = f"{bucketing_id}{parent_id}"
201+
bucket_number = self._generate_bucket_value(bucketing_key)
202+
decide_reasons.append(
203+
f'Assigned bucket {bucket_number} to bucketing ID "{bucketing_id}" for parent "{parent_id}".'
204+
)
205+
206+
for allocation in traffic_allocations:
207+
end_of_range = allocation.get("end_of_range") or allocation.get("endOfRange")
208+
entity_id = allocation.get("entity_id") or allocation.get("entityId")
209+
if end_of_range is not None and bucket_number < end_of_range:
210+
decide_reasons.append(
211+
f'User with bucketing ID "{bucketing_id}" bucketed into entity "{entity_id}".'
212+
)
213+
return entity_id, decide_reasons
214+
215+
decide_reasons.append(
216+
f'User with bucketing ID "{bucketing_id}" not bucketed into any entity.'
217+
)
218+
return None, decide_reasons

optimizely/decision_service.py

Lines changed: 94 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# limitations under the License.
1313

1414
from __future__ import annotations
15-
from typing import TYPE_CHECKING, NamedTuple, Optional, Sequence
15+
from typing import TYPE_CHECKING, NamedTuple, Optional, Sequence, List, TypedDict
1616

1717
from . import bucketer
1818
from . import entities
@@ -23,28 +23,41 @@
2323
from .helpers import validator
2424
from .optimizely_user_context import OptimizelyUserContext, UserAttributes
2525
from .user_profile import UserProfile, UserProfileService, UserProfileTracker
26+
from .cmab.cmab_service import DefaultCmabService, CmabDecision
27+
from optimizely.helpers.enums import Errors
2628

2729
if TYPE_CHECKING:
2830
# prevent circular dependenacy by skipping import at runtime
2931
from .project_config import ProjectConfig
3032
from .logger import Logger
3133

3234

35+
class CmabDecisionResult(TypedDict):
36+
error: bool
37+
result: Optional[CmabDecision]
38+
reasons: List[str]
39+
40+
3341
class Decision(NamedTuple):
3442
"""Named tuple containing selected experiment, variation and source.
3543
None if no experiment/variation was selected."""
3644
experiment: Optional[entities.Experiment]
3745
variation: Optional[entities.Variation]
3846
source: Optional[str]
47+
# cmab_uuid: Optional[str]
3948

4049

4150
class DecisionService:
4251
""" Class encapsulating all decision related capabilities. """
4352

44-
def __init__(self, logger: Logger, user_profile_service: Optional[UserProfileService]):
53+
def __init__(self,
54+
logger: Logger,
55+
user_profile_service: Optional[UserProfileService],
56+
cmab_service: DefaultCmabService):
4557
self.bucketer = bucketer.Bucketer()
4658
self.logger = logger
4759
self.user_profile_service = user_profile_service
60+
self.cmab_service = cmab_service
4861

4962
# Map of user IDs to another map of experiments to variations.
5063
# This contains all the forced variations set by the user
@@ -76,6 +89,48 @@ def _get_bucketing_id(self, user_id: str, attributes: Optional[UserAttributes])
7689

7790
return user_id, decide_reasons
7891

92+
def _get_decision_for_cmab_experiment(
93+
self,
94+
project_config: ProjectConfig,
95+
experiment: entities.Experiment,
96+
user_context: OptimizelyUserContext,
97+
options: Optional[Sequence[str]] = None
98+
) -> CmabDecisionResult:
99+
"""
100+
Retrieves a decision for a contextual multi-armed bandit (CMAB) experiment.
101+
102+
Args:
103+
project_config: Instance of ProjectConfig.
104+
experiment: The experiment object for which the decision is to be made.
105+
user_context: The user context containing user id and attributes.
106+
options: Optional sequence of decide options.
107+
108+
Returns:
109+
A dictionary containing:
110+
- "error": Boolean indicating if there was an error.
111+
- "result": The CmabDecision result or empty dict if error.
112+
- "reasons": List of strings with reasons or error messages.
113+
"""
114+
try:
115+
options_list = list(options) if options is not None else []
116+
cmab_decision = self.cmab_service.get_decision(
117+
project_config, user_context, experiment.id, options_list
118+
)
119+
return {
120+
"error": False,
121+
"result": cmab_decision,
122+
"reasons": [],
123+
}
124+
except Exception as e:
125+
error_message = Errors.CMAB_FETCH_FAILED.format(str(e))
126+
if self.logger:
127+
self.logger.error(error_message)
128+
return {
129+
"error": True,
130+
"result": None,
131+
"reasons": [error_message],
132+
}
133+
79134
def set_forced_variation(
80135
self, project_config: ProjectConfig, experiment_key: str,
81136
user_id: str, variation_key: Optional[str]
@@ -313,7 +368,7 @@ def get_variation(
313368
else:
314369
self.logger.warning('User profile has invalid format.')
315370

316-
# Bucket user and store the new decision
371+
# Check audience conditions
317372
audience_conditions = experiment.get_audience_conditions_or_ids()
318373
user_meets_audience_conditions, reasons_received = audience_helper.does_user_meet_audience_conditions(
319374
project_config, audience_conditions,
@@ -330,8 +385,42 @@ def get_variation(
330385
# Determine bucketing ID to be used
331386
bucketing_id, bucketing_id_reasons = self._get_bucketing_id(user_id, user_context.get_user_attributes())
332387
decide_reasons += bucketing_id_reasons
333-
variation, bucket_reasons = self.bucketer.bucket(project_config, experiment, user_id, bucketing_id)
334-
decide_reasons += bucket_reasons
388+
389+
if experiment.cmab:
390+
CMAB_DUMMY_ENTITY_ID = "$"
391+
# Build the CMAB-specific traffic allocation
392+
cmab_traffic_allocation = [{
393+
"entity_id": CMAB_DUMMY_ENTITY_ID,
394+
"end_of_range": experiment.cmab['trafficAllocation']
395+
}]
396+
397+
# Check if user is in CMAB traffic allocation
398+
bucketed_entity_id, bucket_reasons = self.bucketer.bucket_to_entity_id(
399+
bucketing_id, experiment, cmab_traffic_allocation
400+
)
401+
decide_reasons += bucket_reasons
402+
if bucketed_entity_id != CMAB_DUMMY_ENTITY_ID:
403+
message = f'User "{user_id}" not in CMAB experiment "{experiment.key}" due to traffic allocation.'
404+
self.logger.info(message)
405+
decide_reasons.append(message)
406+
return None, decide_reasons
407+
408+
# User is in CMAB allocation, proceed to CMAB decision
409+
decision_variation_value = self._get_decision_for_cmab_experiment(project_config,
410+
experiment,
411+
user_context,
412+
options)
413+
decide_reasons += decision_variation_value.get('reasons', [])
414+
cmab_decision = decision_variation_value.get('result')
415+
if not cmab_decision:
416+
return None, decide_reasons
417+
variation_id = cmab_decision['variation_id']
418+
variation = project_config.get_variation_from_id(experiment_key=experiment.key, variation_id=variation_id)
419+
else:
420+
# Bucket the user
421+
variation, bucket_reasons = self.bucketer.bucket(project_config, experiment, user_id, bucketing_id)
422+
decide_reasons += bucket_reasons
423+
335424
if isinstance(variation, entities.Variation):
336425
message = f'User "{user_id}" is in variation "{variation.key}" of experiment {experiment.key}.'
337426
self.logger.info(message)

optimizely/optimizely.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,18 @@
4444
from .optimizely_config import OptimizelyConfig, OptimizelyConfigService
4545
from .optimizely_user_context import OptimizelyUserContext, UserAttributes
4646
from .project_config import ProjectConfig
47-
from .cmab.cmab_service import DefaultCmabService
47+
from .cmab.cmab_client import DefaultCmabClient, CmabRetryConfig
48+
from .cmab.cmab_service import DefaultCmabService, CmabCacheValue
49+
from .odp.lru_cache import LRUCache
4850

4951
if TYPE_CHECKING:
5052
# prevent circular dependency by skipping import at runtime
5153
from .user_profile import UserProfileService
5254
from .helpers.event_tag_utils import EventTags
5355

56+
# Default constants for CMAB cache
57+
DEFAULT_CMAB_CACHE_TIMEOUT = 30 * 60 * 1000 # 30 minutes in milliseconds
58+
DEFAULT_CMAB_CACHE_SIZE = 1000
5459

5560
class Optimizely:
5661
""" Class encapsulating all SDK functionality. """
@@ -71,7 +76,6 @@ def __init__(
7176
default_decide_options: Optional[list[str]] = None,
7277
event_processor_options: Optional[dict[str, Any]] = None,
7378
settings: Optional[OptimizelySdkSettings] = None,
74-
cmab_service: Optional[DefaultCmabService] = None
7579
) -> None:
7680
""" Optimizely init method for managing Custom projects.
7781
@@ -100,7 +104,6 @@ def __init__(
100104
default_decide_options: Optional list of decide options used with the decide APIs.
101105
event_processor_options: Optional dict of options to be passed to the default batch event processor.
102106
settings: Optional instance of OptimizelySdkSettings for sdk configuration.
103-
cmab_service: Optional instance of DefaultCmabService for Contextual Multi-Armed Bandit (CMAB) support.
104107
"""
105108
self.logger_name = '.'.join([__name__, self.__class__.__name__])
106109
self.is_valid = True
@@ -172,10 +175,21 @@ def __init__(
172175
self._setup_odp(self.config_manager.get_sdk_key())
173176

174177
self.event_builder = event_builder.EventBuilder()
175-
if cmab_service:
176-
cmab_service.logger = self.logger
177-
self.cmab_service = cmab_service
178-
self.decision_service = decision_service.DecisionService(self.logger, user_profile_service, cmab_service)
178+
179+
# Initialize CMAB components
180+
181+
self.cmab_client = DefaultCmabClient(
182+
retry_config=CmabRetryConfig(),
183+
logger=logger
184+
)
185+
self.cmab_cache: LRUCache[str, CmabCacheValue] = LRUCache(DEFAULT_CMAB_CACHE_SIZE,
186+
DEFAULT_CMAB_CACHE_TIMEOUT)
187+
self.cmab_service = DefaultCmabService(
188+
cmab_cache=self.cmab_cache,
189+
cmab_client=self.cmab_client,
190+
logger=self.logger
191+
)
192+
self.decision_service = decision_service.DecisionService(self.logger, user_profile_service, self.cmab_service)
179193
self.user_profile_service = user_profile_service
180194

181195
def _validate_instantiation_options(self) -> None:

optimizely/optimizely_factory.py

Lines changed: 4 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,11 @@
2222
from .event_dispatcher import EventDispatcher, CustomEventDispatcher
2323
from .notification_center import NotificationCenter
2424
from .optimizely import Optimizely
25-
from .cmab.cmab_client import DefaultCmabClient, CmabRetryConfig
26-
from .cmab.cmab_service import DefaultCmabService, CmabCacheValue
27-
from .odp.lru_cache import LRUCache
2825

2926
if TYPE_CHECKING:
3027
# prevent circular dependenacy by skipping import at runtime
3128
from .user_profile import UserProfileService
3229

33-
# Default constants for CMAB cache
34-
DEFAULT_CMAB_CACHE_TIMEOUT = 30 * 60 * 1000 # 30 minutes in milliseconds
35-
DEFAULT_CMAB_CACHE_SIZE = 1000
36-
3730

3831
class OptimizelyFactory:
3932
""" Optimizely factory to provides basic utility to instantiate the Optimizely
@@ -43,8 +36,6 @@ class OptimizelyFactory:
4336
max_event_flush_interval: Optional[int] = None
4437
polling_interval: Optional[float] = None
4538
blocking_timeout: Optional[int] = None
46-
cmab_cache_size: int = DEFAULT_CMAB_CACHE_SIZE
47-
cmab_cache_timeout: int = DEFAULT_CMAB_CACHE_TIMEOUT
4839

4940
@staticmethod
5041
def set_batch_size(batch_size: int) -> int:
@@ -113,36 +104,16 @@ def default_instance(sdk_key: str, datafile: Optional[str] = None) -> Optimizely
113104
notification_center=notification_center,
114105
)
115106

116-
# Initialize CMAB components
117-
cmab_client = DefaultCmabClient(
118-
retry_config=CmabRetryConfig(),
119-
logger=logger
120-
)
121-
cmab_cache: LRUCache[str, CmabCacheValue] = LRUCache(OptimizelyFactory.cmab_cache_size,
122-
OptimizelyFactory.cmab_cache_timeout)
123-
cmab_service = DefaultCmabService(
124-
cmab_cache=cmab_cache,
125-
cmab_client=cmab_client,
126-
logger=logger
127-
)
128-
129107
optimizely = Optimizely(
130108
datafile, None, logger, error_handler, None, None, sdk_key, config_manager, notification_center,
131-
event_processor, cmab_service=cmab_service
109+
event_processor
132110
)
133111
return optimizely
134112

135113
@staticmethod
136114
def default_instance_with_config_manager(config_manager: BaseConfigManager) -> Optimizely:
137-
# Initialize CMAB components
138-
cmab_client = DefaultCmabClient(retry_config=CmabRetryConfig())
139-
cmab_cache: LRUCache[str, CmabCacheValue] = LRUCache(OptimizelyFactory.cmab_cache_size,
140-
OptimizelyFactory.cmab_cache_timeout)
141-
cmab_service = DefaultCmabService(cmab_cache=cmab_cache, cmab_client=cmab_client)
142-
143115
return Optimizely(
144-
config_manager=config_manager,
145-
cmab_service=cmab_service
116+
config_manager=config_manager
146117
)
147118

148119
@staticmethod
@@ -203,21 +174,7 @@ def custom_instance(
203174
notification_center=notification_center,
204175
)
205176

206-
# Initialize CMAB components
207-
cmab_client = DefaultCmabClient(
208-
retry_config=CmabRetryConfig(),
209-
logger=logger
210-
)
211-
cmab_cache: LRUCache[str, CmabCacheValue] = LRUCache(OptimizelyFactory.cmab_cache_size,
212-
OptimizelyFactory.cmab_cache_timeout)
213-
cmab_service = DefaultCmabService(
214-
cmab_cache=cmab_cache,
215-
cmab_client=cmab_client,
216-
logger=logger
217-
)
218-
219177
return Optimizely(
220178
datafile, event_dispatcher, logger, error_handler, skip_json_validation, user_profile_service,
221-
sdk_key, config_manager, notification_center, event_processor, settings=settings,
222-
cmab_service=cmab_service
223-
)
179+
sdk_key, config_manager, notification_center, event_processor, settings=settings
180+
)

0 commit comments

Comments
 (0)