diff --git a/examples/llm_finetune/devstral/devstral2_small_2512_squad.yaml b/examples/llm_finetune/devstral/devstral2_small_2512_squad.yaml
new file mode 100644
index 000000000..8dbee4d09
--- /dev/null
+++ b/examples/llm_finetune/devstral/devstral2_small_2512_squad.yaml
@@ -0,0 +1,99 @@
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# To run this recipe, please use the following command:
+# torchrun --nproc-per-node=8 examples/llm_finetune/finetune.py --config examples/llm_finetune/devstral/devstral2_small_2512_squad.yaml
+# Adjust --nproc-per-node to the number of GPUs available on your host machine.
+
+
+step_scheduler:
+  global_batch_size: 64
+  local_batch_size: 1
+  ckpt_every_steps: 200
+  val_every_steps: 100 # will run every x number of gradient steps
+  num_epochs: 1
+
+dist_env:
+  backend: nccl
+  timeout_minutes: 1
+
+rng:
+  _target_: nemo_automodel.components.training.rng.StatefulRNG
+  seed: 1111
+  ranked: true
+
+model:
+  _target_: nemo_automodel.NeMoAutoModelForCausalLM.from_pretrained
+  pretrained_model_name_or_path: akoumpa/Devstral-Small-2-24B-Instruct-2512-BF16
+
+checkpoint:
+  enabled: true
+  checkpoint_dir: checkpoints/
+  model_save_format: torch_save # torch_save or safetensors
+  save_consolidated: false # saves the model in a consolidated safetensors format. Requires model_save_format to be safetensors.
+
+distributed:
+  _target_: nemo_automodel.components.distributed.fsdp2.FSDP2Manager
+  dp_size: none
+  dp_replicate_size: 1 # dp_shard_size = dp_size / dp_replicate_size and dp_shard_size < dp_size. For DDP usecase, use DDPManager
+  tp_size: 1
+  cp_size: 1
+  sequence_parallel: false
+
+loss_fn:
+  _target_: nemo_automodel.components.loss.masked_ce.MaskedCrossEntropy
+
+dataset:
+  _target_: nemo_automodel.components.datasets.llm.squad.make_squad_dataset
+  dataset_name: rajpurkar/squad
+  split: train
+
+packed_sequence:
+  # Set packed_sequence_size > 0 to run with packed sequences
+  packed_sequence_size: 0
+
+dataloader:
+  _target_: torchdata.stateful_dataloader.StatefulDataLoader
+  collate_fn: nemo_automodel.components.datasets.utils.default_collater
+  shuffle: true
+
+
+validation_dataset:
+  _target_: nemo_automodel.components.datasets.llm.squad.make_squad_dataset
+  dataset_name: rajpurkar/squad
+  split: validation
+  limit_dataset_samples: 64
+
+validation_dataloader:
+  _target_: torchdata.stateful_dataloader.StatefulDataLoader
+  collate_fn: nemo_automodel.components.datasets.utils.default_collater
+
+optimizer:
+  _target_: torch.optim.Adam
+  betas: [0.9, 0.999]
+  eps: 1e-8
+  lr: 1.0e-5
+  weight_decay: 0
+  # min_lr: 1.0e-5
+
+lr_scheduler:
+  lr_decay_style: cosine
+  min_lr: 1.0e-6
+
+# Uncomment and configure for W&B logging
+# wandb:
+#   project:
+#   entity:
+#   name:
+#   save_dir:
diff --git a/examples/llm_finetune/devstral/devstral2_small_2512_squad_peft.yaml b/examples/llm_finetune/devstral/devstral2_small_2512_squad_peft.yaml
new file mode 100644
index 000000000..d582b6396
--- /dev/null
+++ b/examples/llm_finetune/devstral/devstral2_small_2512_squad_peft.yaml
@@ -0,0 +1,108 @@
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# To run this recipe, please use the following command:
+# torchrun --nproc-per-node=8 examples/llm_finetune/finetune.py --config examples/llm_finetune/devstral/devstral2_small_2512_squad_peft.yaml
+# Adjust --nproc-per-node to the number of GPUs available on your host machine.
+
+
+step_scheduler:
+  global_batch_size: 64
+  local_batch_size: 1
+  ckpt_every_steps: 200
+  val_every_steps: 100 # will run every x number of gradient steps
+  num_epochs: 1
+
+dist_env:
+  backend: nccl
+  timeout_minutes: 1
+
+rng:
+  _target_: nemo_automodel.components.training.rng.StatefulRNG
+  seed: 1111
+  ranked: true
+
+model:
+  _target_: nemo_automodel.NeMoAutoModelForCausalLM.from_pretrained
+  pretrained_model_name_or_path: akoumpa/Devstral-Small-2-24B-Instruct-2512-BF16
+
+peft:
+  _target_: nemo_automodel.components._peft.lora.PeftConfig
+  match_all_linear: True
+  dim: 8
+  alpha: 32
+  use_triton: True
+  # dtype needs a fix to resolve to type instead of string
+  # lora_dtype: torch.bfloat16
+
+checkpoint:
+  enabled: true
+  checkpoint_dir: checkpoints/
+  model_save_format: torch_save # torch_save or safetensors
+  save_consolidated: false # saves the model in a consolidated safetensors format. Requires model_save_format to be safetensors.
+
+distributed:
+  _target_: nemo_automodel.components.distributed.fsdp2.FSDP2Manager
+  dp_size: none
+  dp_replicate_size: 1 # dp_shard_size = dp_size / dp_replicate_size and dp_shard_size < dp_size.
For DDP usecase, use DDPManager + tp_size: 1 + cp_size: 1 + sequence_parallel: false + +loss_fn: + _target_: nemo_automodel.components.loss.masked_ce.MaskedCrossEntropy + +dataset: + _target_: nemo_automodel.components.datasets.llm.squad.make_squad_dataset + dataset_name: rajpurkar/squad + split: train + +packed_sequence: + # Set packed_sequence_size > 0 to run with packed sequences + packed_sequence_size: 0 + +dataloader: + _target_: torchdata.stateful_dataloader.StatefulDataLoader + collate_fn: nemo_automodel.components.datasets.utils.default_collater + shuffle: true + + +validation_dataset: + _target_: nemo_automodel.components.datasets.llm.squad.make_squad_dataset + dataset_name: rajpurkar/squad + split: validation + limit_dataset_samples: 64 + +validation_dataloader: + _target_: torchdata.stateful_dataloader.StatefulDataLoader + collate_fn: nemo_automodel.components.datasets.utils.default_collater + +optimizer: + _target_: torch.optim.Adam + betas: [0.9, 0.999] + eps: 1e-8 + lr: 1.0e-5 + weight_decay: 0 + # min_lr: 1.0e-5 + +lr_scheduler: + lr_decay_style: cosine + min_lr: 1.0e-6 + +# Uncomment and configure for W&B logging +# wandb: +# project: +# entity: +# name: +# save_dir: diff --git a/nemo_automodel/__init__.py b/nemo_automodel/__init__.py index 35e459e84..4b54d3009 100644 --- a/nemo_automodel/__init__.py +++ b/nemo_automodel/__init__.py @@ -33,15 +33,18 @@ NeMoAutoModelForSequenceClassification, NeMoAutoModelForTextToWaveform, ) # noqa: I001 + from nemo_automodel._transformers.auto_tokenizer import NeMoAutoTokenizer globals()["NeMoAutoModelForCausalLM"] = NeMoAutoModelForCausalLM globals()["NeMoAutoModelForImageTextToText"] = NeMoAutoModelForImageTextToText globals()["NeMoAutoModelForSequenceClassification"] = NeMoAutoModelForSequenceClassification globals()["NeMoAutoModelForTextToWaveform"] = NeMoAutoModelForTextToWaveform + globals()["NeMoAutoTokenizer"] = NeMoAutoTokenizer __all__.append("NeMoAutoModelForCausalLM") __all__.append("NeMoAutoModelForImageTextToText") __all__.append("NeMoAutoModelForSequenceClassification") __all__.append("NeMoAutoModelForTextToWaveform") + __all__.append("NeMoAutoTokenizer") except: # optional dependency might be missing, # leave the name off the module namespace so other imports still work diff --git a/nemo_automodel/_transformers/__init__.py b/nemo_automodel/_transformers/__init__.py index d05c00be1..1514cbef6 100644 --- a/nemo_automodel/_transformers/__init__.py +++ b/nemo_automodel/_transformers/__init__.py @@ -19,10 +19,12 @@ NeMoAutoModelForSequenceClassification, NeMoAutoModelForTextToWaveform, ) +from nemo_automodel._transformers.auto_tokenizer import NeMoAutoTokenizer __all__ = [ "NeMoAutoModelForCausalLM", "NeMoAutoModelForImageTextToText", "NeMoAutoModelForSequenceClassification", "NeMoAutoModelForTextToWaveform", + "NeMoAutoTokenizer", ] diff --git a/nemo_automodel/_transformers/auto_tokenizer.py b/nemo_automodel/_transformers/auto_tokenizer.py index 9f9dedac7..328e7e3f1 100644 --- a/nemo_automodel/_transformers/auto_tokenizer.py +++ b/nemo_automodel/_transformers/auto_tokenizer.py @@ -12,112 +12,120 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from transformers import AutoTokenizer -from transformers.tokenization_utils_base import BatchEncoding +import logging +from typing import Callable, Optional, Type, Union + +from transformers import AutoConfig, AutoTokenizer + +from nemo_automodel._transformers.tokenization.nemo_auto_tokenizer import AutoTokenizerWithBosEosEnforced +from nemo_automodel._transformers.tokenization.registry import TokenizerRegistry + +logger = logging.getLogger(__name__) class NeMoAutoTokenizer: + """ + Auto tokenizer class that dispatches to appropriate tokenizer implementations. + + Similar to HuggingFace's AutoTokenizer, but with a custom registry for specialized + tokenizer implementations. + + The dispatch logic is: + 1. If a custom tokenizer is registered for the model type, use it + 2. Otherwise, fall back to AutoTokenizerWithBosEosEnforced + + Example: + >>> # Will use MistralCommonBackend if available for Mistral models + >>> tokenizer = NeMoAutoTokenizer.from_pretrained("mistralai/Mistral-7B-v0.1") + + >>> # Force using HF AutoTokenizer with BOS/EOS enforcement + >>> tokenizer = NeMoAutoTokenizer.from_pretrained("gpt2", force_default=True) + """ + + # Make registry accessible at class level + _registry = TokenizerRegistry + + def __init__(self): + raise EnvironmentError( + f"{self.__class__.__name__} is designed to be instantiated using the " + f"`{self.__class__.__name__}.from_pretrained(pretrained_model_name_or_path)` method." + ) + + @classmethod + def register(cls, model_type: str, tokenizer_cls: Union[Type, Callable]) -> None: + """ + Register a custom tokenizer for a specific model type. + + Args: + model_type: The model type string (e.g., "mistral", "llama") + tokenizer_cls: The tokenizer class or factory function + """ + cls._registry.register(model_type, tokenizer_cls) + @classmethod def from_pretrained( - cls, pretrained_model_name_or_path, *args, force_hf=False, add_bos_token=True, add_eos_token=True, **kwargs + cls, + pretrained_model_name_or_path: str, + *args, + force_default: bool = False, + force_hf: bool = False, + trust_remote_code: bool = False, + **kwargs, ): """ - Load the HF tokenizer class via AutoTokenizer and (optionally) wrap it to add BOS/EOS. + Load a tokenizer from a pretrained model. + + Args: + pretrained_model_name_or_path: Model identifier or path + force_default: If True, always use AutoTokenizerWithBosEosEnforced + force_hf: If True, return the raw HF AutoTokenizer without any wrapping + trust_remote_code: Whether to trust remote code when loading config + **kwargs: Additional arguments passed to the tokenizer's from_pretrained - There are pre-existing issues with some tokenizers (e.g. 
GPT2Tokenizer) where the BOS/EOS tokens are not added + Returns: + A tokenizer instance appropriate for the model type """ - hf_tok = AutoTokenizer.from_pretrained(pretrained_model_name_or_path, *args, **kwargs) + # If force_hf, just use the base HF AutoTokenizer if force_hf: - return hf_tok - - return cls(hf_tok, add_bos_token=add_bos_token, add_eos_token=add_eos_token) - - def __init__(self, base_tokenizer, *, add_bos_token: bool, add_eos_token: bool): - self._base_tokenizer = base_tokenizer - self._add_bos = bool(add_bos_token) - self._add_eos = bool(add_eos_token) - - @property - def add_bos_token(self): - return self._add_bos - - @property - def add_eos_token(self): - return self._add_eos - - def __getattr__(self, name): - base = object.__getattribute__(self, "_base_tokenizer") - return getattr(base, name) - - def __setattr__(self, name, value): - # Route writes to the underlying tokenizer when appropriate - internal_fields = {"_base_tokenizer", "_add_bos", "_add_eos"} - if name in internal_fields: - return object.__setattr__(self, name, value) - base = self.__dict__.get("_base_tokenizer", None) - if base is not None and hasattr(base, name): - return setattr(base, name, value) - return object.__setattr__(self, name, value) - - def __call__(self, *args, **kwargs): - tokenized = self._base_tokenizer(*args, **kwargs) - if not kwargs.get("add_special_tokens", True): - return tokenized - if isinstance(tokenized, BatchEncoding): - _tokenized_keys = {"input_ids", "attention_mask", "assistant_masks"} - add_bos_ids = self._add_bos and (getattr(self, "bos_token_id", None) is not None) - add_eos_ids = self._add_eos and (getattr(self, "eos_token_id", None) is not None) - if not "input_ids" in tokenized: - return tokenized - if add_bos_ids: - add_bos_ids = _add_token(tokenized, self.bos_token_id, 0, "input_ids") - if add_eos_ids: - add_eos_ids = _add_token(tokenized, self.eos_token_id, -1, "input_ids") - - for key in {"attention_mask", "assistant_masks"}: - if key not in tokenized: - continue - if add_bos_ids: - _add_token(tokenized, 1, 0, key) - if add_eos_ids: - _add_token(tokenized, 1, -1, key) - return tokenized - - def encode(self, *args, **kwargs): - encoded = self._base_tokenizer.encode(*args, **kwargs) - if not kwargs.get("add_special_tokens", True): - return encoded - if self._add_bos: - if encoded and (getattr(self, "bos_token_id", None) is not None) and encoded[0] != self.bos_token_id: - encoded = [self.bos_token_id] + encoded - if self._add_eos: - if encoded and (getattr(self, "eos_token_id", None) is not None) and encoded[-1] != self.eos_token_id: - encoded = encoded + [self.eos_token_id] - return encoded - - -def _add_token(tokenized, value, position, key): - def _extend_single(sequence, val, pos, always_add): - if pos == 0: - if always_add or not sequence or sequence[0] != val: - return [val] + sequence, True - return sequence, False - if pos == -1: - if always_add or not sequence or sequence[-1] != val: - return sequence + [val], True - return sequence, False - raise ValueError(f"Invalid position: {pos}") - - sequences = tokenized[key] - always_add = key != "input_ids" - if isinstance(sequences, list) and sequences and isinstance(sequences[0], list): - ans = [_extend_single(seq, value, position, always_add) for seq in sequences] - tokenized[key] = list(map(lambda x: x[0], ans)) - return any(map(lambda x: x[1], ans)) - elif isinstance(sequences, list): - ans = _extend_single(sequences, value, position, always_add) - tokenized[key] = ans[0] - return ans[1] - else: - raise 
ValueError(f"Invalid sequence type: {type(sequences)}")
-    return False
+            return AutoTokenizer.from_pretrained(
+                pretrained_model_name_or_path, *args, trust_remote_code=trust_remote_code, **kwargs
+            )
+
+        # Try to determine model type from config
+        model_type = cls._get_model_type(pretrained_model_name_or_path, trust_remote_code=trust_remote_code)
+
+        if not force_default and model_type and cls._registry.has_custom_tokenizer(model_type):
+            tokenizer_cls = cls._registry.get_tokenizer_cls(model_type)
+            logger.info(f"Using custom tokenizer {tokenizer_cls.__name__} for model type '{model_type}'")
+            return tokenizer_cls.from_pretrained(pretrained_model_name_or_path, *args, **kwargs)
+
+        # Fall back to default BOS/EOS enforced tokenizer
+        return AutoTokenizerWithBosEosEnforced.from_pretrained(
+            pretrained_model_name_or_path, *args, trust_remote_code=trust_remote_code, **kwargs
+        )
+
+    @classmethod
+    def _get_model_type(cls, pretrained_model_name_or_path: str, trust_remote_code: bool = False) -> Optional[str]:
+        """
+        Determine the model type from the config.
+
+        Args:
+            pretrained_model_name_or_path: Model identifier or path
+            trust_remote_code: Whether to trust remote code
+
+        Returns:
+            The model_type string, or None if it cannot be determined
+        """
+        try:
+            config = AutoConfig.from_pretrained(pretrained_model_name_or_path, trust_remote_code=trust_remote_code)
+            return getattr(config, "model_type", None)
+        except Exception as e:
+            logger.debug(f"Could not load config to determine model type: {e}")
+            return None
+
+
+__all__ = [
+    "NeMoAutoTokenizer",
+    "AutoTokenizerWithBosEosEnforced",
+    "TokenizerRegistry",
+]
diff --git a/nemo_automodel/_transformers/tokenization/nemo_auto_tokenizer.py b/nemo_automodel/_transformers/tokenization/nemo_auto_tokenizer.py
new file mode 100644
index 000000000..848f70f23
--- /dev/null
+++ b/nemo_automodel/_transformers/tokenization/nemo_auto_tokenizer.py
@@ -0,0 +1,134 @@
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from transformers import AutoTokenizer
+from transformers.tokenization_utils_base import BatchEncoding
+
+
+class AutoTokenizerWithBosEosEnforced(AutoTokenizer):
+    """
+    A wrapper around HuggingFace's AutoTokenizer that ensures consistent BOS/EOS token handling.
+
+    There are pre-existing issues with some tokenizers (e.g. GPT2Tokenizer) where the BOS/EOS tokens
+    are not added automatically. This wrapper ensures they are always added when requested.
+    """
+
+    @classmethod
+    def from_pretrained(
+        cls, pretrained_model_name_or_path, *args, force_hf=False, add_bos_token=True, add_eos_token=True, **kwargs
+    ):
+        """
+        Load the HF tokenizer class via AutoTokenizer and (optionally) wrap it to add BOS/EOS.
+ + Args: + pretrained_model_name_or_path: Model identifier or path + force_hf: If True, return the raw HF tokenizer without wrapping + add_bos_token: Whether to add BOS token (default: True) + add_eos_token: Whether to add EOS token (default: True) + """ + hf_tok = super().from_pretrained(pretrained_model_name_or_path, *args, **kwargs) + if force_hf: + return hf_tok + + return cls(hf_tok, add_bos_token=add_bos_token, add_eos_token=add_eos_token) + + def __init__(self, base_tokenizer, *, add_bos_token: bool, add_eos_token: bool): + self._base_tokenizer = base_tokenizer + self._add_bos = bool(add_bos_token) + self._add_eos = bool(add_eos_token) + + @property + def add_bos_token(self): + return self._add_bos + + @property + def add_eos_token(self): + return self._add_eos + + def __getattr__(self, name): + base = object.__getattribute__(self, "_base_tokenizer") + return getattr(base, name) + + def __setattr__(self, name, value): + # Route writes to the underlying tokenizer when appropriate + internal_fields = {"_base_tokenizer", "_add_bos", "_add_eos"} + if name in internal_fields: + return object.__setattr__(self, name, value) + base = self.__dict__.get("_base_tokenizer", None) + if base is not None and hasattr(base, name): + return setattr(base, name, value) + return object.__setattr__(self, name, value) + + def __call__(self, *args, **kwargs): + tokenized = self._base_tokenizer(*args, **kwargs) + if not kwargs.get("add_special_tokens", True): + return tokenized + if isinstance(tokenized, BatchEncoding): + _tokenized_keys = {"input_ids", "attention_mask", "assistant_masks"} + add_bos_ids = self._add_bos and (getattr(self, "bos_token_id", None) is not None) + add_eos_ids = self._add_eos and (getattr(self, "eos_token_id", None) is not None) + if not "input_ids" in tokenized: + return tokenized + if add_bos_ids: + add_bos_ids = _add_token(tokenized, self.bos_token_id, 0, "input_ids") + if add_eos_ids: + add_eos_ids = _add_token(tokenized, self.eos_token_id, -1, "input_ids") + + for key in {"attention_mask", "assistant_masks"}: + if key not in tokenized: + continue + if add_bos_ids: + _add_token(tokenized, 1, 0, key) + if add_eos_ids: + _add_token(tokenized, 1, -1, key) + return tokenized + + def encode(self, *args, **kwargs): + encoded = self._base_tokenizer.encode(*args, **kwargs) + if not kwargs.get("add_special_tokens", True): + return encoded + if self._add_bos: + if encoded and (getattr(self, "bos_token_id", None) is not None) and encoded[0] != self.bos_token_id: + encoded = [self.bos_token_id] + encoded + if self._add_eos: + if encoded and (getattr(self, "eos_token_id", None) is not None) and encoded[-1] != self.eos_token_id: + encoded = encoded + [self.eos_token_id] + return encoded + + +def _add_token(tokenized, value, position, key): + def _extend_single(sequence, val, pos, always_add): + if pos == 0: + if always_add or not sequence or sequence[0] != val: + return [val] + sequence, True + return sequence, False + if pos == -1: + if always_add or not sequence or sequence[-1] != val: + return sequence + [val], True + return sequence, False + raise ValueError(f"Invalid position: {pos}") + + sequences = tokenized[key] + always_add = key != "input_ids" + if isinstance(sequences, list) and sequences and isinstance(sequences[0], list): + ans = [_extend_single(seq, value, position, always_add) for seq in sequences] + tokenized[key] = list(map(lambda x: x[0], ans)) + return any(map(lambda x: x[1], ans)) + elif isinstance(sequences, list): + ans = _extend_single(sequences, value, position, 
always_add) + tokenized[key] = ans[0] + return ans[1] + else: + raise ValueError(f"Invalid sequence type: {type(sequences)}") + return False diff --git a/nemo_automodel/_transformers/tokenization/registry.py b/nemo_automodel/_transformers/tokenization/registry.py new file mode 100644 index 000000000..db6311117 --- /dev/null +++ b/nemo_automodel/_transformers/tokenization/registry.py @@ -0,0 +1,89 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from dataclasses import dataclass, field +from typing import Callable, Dict, Type, Union + +from nemo_automodel._transformers.tokenization.nemo_auto_tokenizer import AutoTokenizerWithBosEosEnforced + +logger = logging.getLogger(__name__) + + +@dataclass +class _TokenizerRegistry: + """ + Registry for custom tokenizer implementations. + + Maps model types (from config) to tokenizer classes or factory functions. + """ + + # Maps model_type -> tokenizer class or factory function + model_type_to_tokenizer: Dict[str, Union[Type, Callable]] = field(default_factory=dict) + + # Default tokenizer class when no custom implementation is found + default_tokenizer_cls: Type = AutoTokenizerWithBosEosEnforced + + def register(self, model_type: str, tokenizer_cls: Union[Type, Callable]) -> None: + """ + Register a custom tokenizer for a specific model type. + + Args: + model_type: The model type string (e.g., "mistral", "llama") + tokenizer_cls: The tokenizer class or factory function + """ + self.model_type_to_tokenizer[model_type] = tokenizer_cls + logger.debug(f"Registered tokenizer {tokenizer_cls} for model type '{model_type}'") + + def get_tokenizer_cls(self, model_type: str) -> Union[Type, Callable]: + """ + Get the tokenizer class for a given model type. 
+ + Args: + model_type: The model type string + + Returns: + The registered tokenizer class, or the default if not found + """ + return self.model_type_to_tokenizer.get(model_type, self.default_tokenizer_cls) + + def has_custom_tokenizer(self, model_type: str) -> bool: + """Check if a custom tokenizer is registered for the given model type.""" + return model_type in self.model_type_to_tokenizer + + +# Global tokenizer registry +TokenizerRegistry = _TokenizerRegistry() + + +def _register_default_tokenizers(): + """Register default custom tokenizer implementations.""" + try: + from nemo_automodel._transformers.tokenization.tokenization_mistral_common import MistralCommonBackend + + # Register for Mistral model types + TokenizerRegistry.register("mistral", MistralCommonBackend) + TokenizerRegistry.register("pixtral", MistralCommonBackend) + TokenizerRegistry.register("mistral3", MistralCommonBackend) + except ImportError: + logger.debug("MistralCommonBackend not available, skipping registration") + + +# Register defaults on module load +_register_default_tokenizers() + + +__all__ = [ + "TokenizerRegistry", +] diff --git a/nemo_automodel/_transformers/tokenization/tokenization_mistral_common.py b/nemo_automodel/_transformers/tokenization/tokenization_mistral_common.py new file mode 100644 index 000000000..5ac349f22 --- /dev/null +++ b/nemo_automodel/_transformers/tokenization/tokenization_mistral_common.py @@ -0,0 +1,2004 @@ +# Permalink: https://github.com/huggingface/transformers/blob/dd24a80666b72c85f02c6cf9df18164cc174ab74/src/transformers/tokenization_mistral_common.py#L1 +# Copyright 2025 Mistral AI and The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import re +import shutil +import warnings +from collections.abc import Callable, Mapping, Sized +from enum import Enum +from pathlib import Path +from typing import Any, Union, overload + +import numpy as np +from huggingface_hub import create_repo +from transformers.audio_utils import load_audio_as +from transformers.tokenization_utils_base import ( + LARGE_INTEGER, + VERY_LARGE_INTEGER, + BatchEncoding, + EncodedInput, + PreTokenizedInput, + TextInput, + TruncationStrategy, +) +from transformers.utils import PaddingStrategy, TensorType, add_end_docstrings, logging, to_py_obj +from transformers.utils.generic import is_torch_tensor +from transformers.utils.hub import PushToHubMixin +from transformers.utils.import_utils import is_mistral_common_available, is_torch_available, requires + + +class ValidationMode(Enum): + r"""Enum for the validation mode. + + Attributes: + serving: The serving mode. + finetuning: The finetuning mode. + test: The test mode. 
+ + Examples: + >>> mode = ValidationMode.serving + """ + + serving = "serving" + finetuning = "finetuning" + test = "test" + + +if is_mistral_common_available(): + from mistral_common.protocol.instruct.request import ChatCompletionRequest + from mistral_common.tokens.tokenizers.base import SpecialTokenPolicy, TokenizerVersion + from mistral_common.tokens.tokenizers.image import MultiModalVersion + from mistral_common.tokens.tokenizers.mistral import MistralTokenizer + from mistral_common.tokens.tokenizers.tekken import Tekkenizer + from mistral_common.tokens.tokenizers.utils import download_tokenizer_from_hf_hub + + +if is_torch_available(): + import torch + + +logger = logging.get_logger(__name__) + + +ENCODE_KWARGS_DOCSTRING = r""" + add_special_tokens (`bool`, *optional*, defaults to `True`): + Whether or not to add special tokens when encoding the sequences. This will use the underlying + `PretrainedTokenizerBase.build_inputs_with_special_tokens` function, which defines which tokens are + automatically added to the input ids. This is useful if you want to add `bos` or `eos` tokens + automatically. When Tokenizer is loading with `finetuning` mode it adds both `bos` and `eos`. Else, for "test" mode it only adds `bos`. + padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `False`): + Activates and controls padding. Accepts the following values: + + - `True` or `'longest'`: Pad to the longest sequence in the batch (or no padding if only a single + sequence is provided). + - `'max_length'`: Pad to a maximum length specified with the argument `max_length` or to the maximum + acceptable input length for the model if that argument is not provided. + - `False` or `'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of different + lengths). + truncation (`bool`, `str` or [`~tokenization_utils_base.TruncationStrategy`], *optional*, defaults to `False`): + Activates and controls truncation. Accepts the following values: + + - `True` or `'longest_first'`: Truncate to a maximum length specified with the argument `max_length` or + to the maximum acceptable input length for the model if that argument is not provided. + - `False` or `'do_not_truncate'` (default): No truncation (i.e., can output batch with sequence lengths + greater than the model maximum admissible input size). + max_length (`int`, *optional*): + Controls the maximum length to use by one of the truncation/padding parameters. + + If left unset or set to `None`, this will use the predefined model maximum length if a maximum length + is required by one of the truncation/padding parameters. If the model has no specific maximum input + length (like XLNet) truncation/padding to a maximum length will be deactivated. + stride (`int`, *optional*, defaults to 0): + If set to a number along with `max_length`, the overflowing tokens returned when + `return_overflowing_tokens=True` will contain some tokens from the end of the truncated sequence + returned to provide some overlap between truncated and overflowing sequences. The value of this + argument defines the number of overlapping tokens. + pad_to_multiple_of (`int`, *optional*): + If set will pad the sequence to a multiple of the provided value. Requires `padding` to be activated. + This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability + `>= 7.5` (Volta). + padding_side (`str`, *optional*): + The side on which the model should have padding applied. Should be selected between ['right', 'left']. 
+ Default value is picked from the class attribute of the same name. + return_tensors (`str` or [`~utils.TensorType`], *optional*): + If set, will return tensors instead of list of python integers. Acceptable values are: + + - `'pt'`: Return PyTorch `torch.Tensor` objects. +""" + +ENCODE_PLUS_ADDITIONAL_KWARGS_DOCSTRING = r""" + return_attention_mask (`bool`, *optional*): + Whether to return the attention mask. If left to the default, will return the attention mask according + to the specific tokenizer's default, defined by the `return_outputs` attribute. + + [What are attention masks?](../glossary#attention-mask) + return_overflowing_tokens (`bool`, *optional*, defaults to `False`): + Whether or not to return overflowing token sequences. If a pair of sequences of input ids (or a batch + of pairs) is provided with `truncation_strategy = longest_first` or `True`, an error is raised instead + of returning overflowing tokens. + return_special_tokens_mask (`bool`, *optional*, defaults to `False`): + Whether or not to return special tokens mask information. + return_length (`bool`, *optional*, defaults to `False`): + Whether or not to return the lengths of the encoded inputs. + verbose (`bool`, *optional*, defaults to `True`): + Whether or not to print more information and warnings. + **kwargs: passed to the `self.tokenize()` method + + Return: + [`BatchEncoding`]: A [`BatchEncoding`] with the following fields: + + - **input_ids** -- List of token ids to be fed to a model. + + [What are input IDs?](../glossary#input-ids) + + - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when + `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names`). + + [What are attention masks?](../glossary#attention-mask) + + - **overflowing_tokens** -- List of overflowing tokens sequences (when a `max_length` is specified and + `return_overflowing_tokens=True`). + - **num_truncated_tokens** -- Number of tokens truncated (when a `max_length` is specified and + `return_overflowing_tokens=True`). + - **special_tokens_mask** -- List of 0s and 1s, with 1 specifying added special tokens and 0 specifying + regular sequence tokens (when `add_special_tokens=True` and `return_special_tokens_mask=True`). + - **length** -- The length of the inputs (when `return_length=True`) +""" + + +class MistralTokenizerType(str, Enum): + """Enum for the different type of tokenizer.""" + + spm = "spm" + tekken = "tekken" + + +@requires(backends=("mistral-common",)) +class MistralCommonBackend(PushToHubMixin): + """ + Class to wrap `mistral-common` tokenizers. + + `mistral-common` is the official tokenizer library for Mistral AI models. To use it, you need to install it with: + + ```bash + pip install transformers[mistral-common] + ``` + + Otherwise the tokenizer falls back to the Transformers implementation of the tokenizer. + + For more info on `mistral-common`, see [mistral-common](https://github.com/mistralai/mistral-common). + + This class is a wrapper around a `mistral_common.tokens.tokenizers.mistral.MistralTokenizer`. + It provides a Hugging Face compatible interface to tokenize using the official mistral-common tokenizer. + + Supports the following methods from the `PreTrainedTokenizerBase` class: + + - [`~MistralCommonBackend.get_vocab`]: Returns the vocabulary as a dictionary of token to index. + This is a lossy conversion for Tekkenizer as some decoding errors are collapsed into the same token. 
+ - [`~MistralCommonBackend.encode`]: Encode a string to a list of integers. + - [`~MistralCommonBackend.decode`]: Decode a list of integers to a string. + - [`~MistralCommonBackend.batch_decode`]: Decode a batch of list of integers to a list of strings. + - [`~MistralCommonBackend.convert_tokens_to_ids`]: Convert a list of tokens to a list of integers. + - [`~MistralCommonBackend.convert_ids_to_tokens`]: Convert a list of integers to a list of tokens. + - [`~MistralCommonBackend.tokenize`]: Tokenize a string. + - [`~MistralCommonBackend.get_special_tokens_mask`]: Get the special tokens mask for a list of tokens. + - [`~MistralCommonBackend.prepare_for_model`]: Prepare a list of inputs for the model. + - [`~MistralCommonBackend.pad`]: Pad a list of inputs to the same length. + - [`~MistralCommonBackend.truncate_sequences`]: Truncate a list of sequences to the same length. + - [`~MistralCommonBackend.apply_chat_template`]: Apply a chat template to a list of messages. + - [`~MistralCommonBackend.__call__`]: Tokenize a string or a list of strings. + - [`~MistralCommonBackend.from_pretrained`]: Download and cache a pretrained tokenizer from the Hugging Face model hub or local directory. + - [`~MistralCommonBackend.save_pretrained`]: Save a tokenizer to a directory, so it can be reloaded using the `from_pretrained` class method. + - [`~MistralCommonBackend.push_to_hub`]: Upload tokenizer to the Hugging Face model hub. + + Here are the key differences with the `PreTrainedTokenizerBase` class: + + - Pair of sequences are not supported. The signature have been kept for compatibility but all arguments related to pair of sequences are ignored. The return values of pairs are returned as `None`. + - The `is_split_into_words` argument is not supported. + - The `return_token_type_ids` argument is not supported. + - It is not possible to add new tokens to the tokenizer. Also the special tokens are handled differently from Transformers. In `mistral-common`, special tokens are never encoded directly. This means that: `tokenizer.encode("")` will not return the ID of the `` token. Instead, it will return a list of IDs corresponding to the tokenization of the string `""`. For more information, see the [mistral-common documentation](https://mistralai.github.io/mistral-common/usage/tokenizers/#special-tokens). + + If you have suggestions to improve this class, please open an issue on the [mistral-common GitHub repository](https://github.com/mistralai/mistral-common/issues) if it is related to the tokenizer or on the [Transformers GitHub repository](https://github.com/huggingface/transformers/issues) if it is related to the Hugging Face interface. + """ + + model_input_names: list[str] = ["input_ids", "attention_mask"] + padding_side: str = "left" + truncation_side: str = "right" + + def __init__( + self, + tokenizer_path: str | os.PathLike | Path, + mode: ValidationMode = ValidationMode.test, + model_max_length: int = VERY_LARGE_INTEGER, + padding_side: str = "left", + truncation_side: str = "right", + model_input_names: list[str] | None = None, + clean_up_tokenization_spaces: bool = False, + **kwargs, + ): + """ + Constructs a `MistralCommonBackend`. + + - **model_input_names** (`list[str]`) -- A list of inputs expected in the forward pass of the model. + - **padding_side** (`str`) -- The default value for the side on which the model should have padding applied. + Should be `'right'` or `'left'`. 
+ - **truncation_side** (`str`) -- The default value for the side on which the model should have truncation + applied. Should be `'right'` or `'left'`. + + Args: + tokenizer_path (`str` or `os.PathLike` or `Path`): + Path to the tokenizer file to load the `MistralTokenizer`. + mode (`Union[str, ValidationMode]`, *optional*, defaults to `ValidationMode.test`): + The mode to use for the tokenizer. This will be passed to the `MistralTokenizer` constructor. Possible values are: + - `"finetuning"` or `ValidationMode.finetuning`: The finetuning mode. + - `"test"` or `ValidationMode.test`: The test mode. + It changes how the tokenizer validates the input and prepares the request to the model. + model_max_length (`int`, *optional*): + The maximum length (in number of tokens) for the inputs to the transformer model. When the tokenizer is + loaded with [`~tokenization_utils_base.PreTrainedTokenizerBase.from_pretrained`], this will be set to the + value stored for the associated model in `max_model_input_sizes` (see above). If no value is provided, will + default to VERY_LARGE_INTEGER (`int(1e30)`). + padding_side (`str`, *optional*): + The side on which the model should have padding applied. Should be selected between ['right', 'left']. + Default value is picked from the class attribute of the same name. + truncation_side (`str`, *optional*): + The side on which the model should have truncation applied. Should be selected between ['right', 'left']. + Default value is picked from the class attribute of the same name. + model_input_names (`List[string]`, *optional*): + The list of inputs accepted by the forward pass of the model (like `"token_type_ids"` or + `"attention_mask"`). Default value is picked from the class attribute of the same name. + clean_up_tokenization_spaces (`bool`, *optional*, defaults to `False`): + Whether or not the model should cleanup the spaces that were added when splitting the input text during the + tokenization process. + """ + if kwargs: + raise ValueError(f"Kwargs {list(kwargs.keys())} are not supported to init `MistralCommonBackend`.") + + self._tokenizer_path = Path(tokenizer_path) + self._mode = self._get_validation_mode(mode) + self.tokenizer: MistralTokenizer = MistralTokenizer.from_file(str(self._tokenizer_path), mode=self._mode) + self._tokenizer_type = ( + MistralTokenizerType.tekken + if isinstance(self.tokenizer.instruct_tokenizer.tokenizer, Tekkenizer) + else MistralTokenizerType.spm + ) + self.truncation_side = truncation_side + self.padding_side = padding_side + self.model_max_length = model_max_length + self.cleanup_tokenization_spaces = clean_up_tokenization_spaces + self.deprecation_warnings = {} # Use to store when we have already noticed a deprecation warning (avoid overlogging). + self._all_special_tokens_ids = self._get_all_special_ids() + + if model_input_names is not None: + if ( + not isinstance(model_input_names, (list, tuple)) + and len(model_input_names) == 0 + and not all(isinstance(i, str) for i in model_input_names) + ): + raise ValueError( + "`model_input_names` should be a non-empty list or tuple of str but got an empty value." + ) + self.model_input_names = model_input_names + + self._cache_get_vocab: dict[str, int] | None = None + + @staticmethod + def clean_up_tokenization(text: str) -> str: + """ + Clean up a list of simple English tokenization artifacts like spaces before punctuation. 
+ """ + return ( + text.replace(" .", ".") + .replace(" ?", "?") + .replace(" !", "!") + .replace(" ,", ",") + .replace(" ' ", "'") + .replace(" n't", "n't") + .replace(" 'm", "'m") + .replace(" 's", "'s") + .replace(" 've", "'ve") + .replace(" 're", "'re") + ) + + @property + def mode(self) -> ValidationMode: + """ + `ValidationMode`: The mode used by the tokenizer. Possible values are: + - `"finetuning"` or `ValidationMode.finetuning`: The finetuning mode. + - `"test"` or `ValidationMode.test`: The test mode. + It changes how the tokenizer validates the input and prepares the request to the model. + """ + return self._mode + + @property + def bos_token_id(self) -> int: + """ + Id of the beginning of sentence token in the vocabulary. + """ + return self.tokenizer.instruct_tokenizer.tokenizer.bos_id + + @property + def eos_token_id(self) -> int: + """ + Id of the end of sentence token in the vocabulary. + """ + return self.tokenizer.instruct_tokenizer.tokenizer.eos_id + + @property + def unk_token_id(self) -> int: + """ + Id of the unknown token in the vocabulary. + """ + return self.tokenizer.instruct_tokenizer.tokenizer.unk_id + + @property + def pad_token_id(self) -> int: + """ + Id of the padding token in the vocabulary. + """ + return self.tokenizer.instruct_tokenizer.tokenizer.pad_id + + @property + def bos_token(self) -> str: + """ + String associated to the beginning of sentence token in the vocabulary. + """ + return self.convert_ids_to_tokens(self.bos_token_id) + + @property + def eos_token(self) -> str: + """ + String associated to the end of sentence token in the vocabulary. + """ + return self.convert_ids_to_tokens(self.eos_token_id) + + @property + def unk_token(self) -> str: + """ + String associated to the unknown token in the vocabulary. + """ + return self.convert_ids_to_tokens(self.unk_token_id) + + @property + def pad_token(self) -> str: + """ + String associated to the padding token in the vocabulary. + """ + return self.convert_ids_to_tokens(self.pad_token_id) + + @property + def all_special_ids(self) -> list[int]: + """ + `list[int]`: List the ids of the special tokens(`''`, `''`, etc.). + """ + return sorted(self._all_special_tokens_ids) + + @property + def all_special_tokens(self) -> list[str]: + """ + `list[str]`: A list of all unique special tokens. + """ + return self.convert_ids_to_tokens(self.all_special_ids) + + @property + def vocab_size(self) -> int: + """ + Returns the size of the vocabulary. + + `int`: Size of the vocabulary. + """ + return self.tokenizer.instruct_tokenizer.tokenizer.n_words + + def get_vocab(self) -> dict[str, int]: + """ + Returns the vocabulary as a dictionary of token to index. + + This is a lossy conversion. There may be multiple token ids that decode to the same + string due to partial UTF-8 byte sequences being converted to �. + + Returns: + `Dict[str, int]`: The vocabulary. + """ + if self._cache_get_vocab is None: + # We reverse the order to make sure that the first token is the one to be returned when there are multiple tokens with the same string representation. + vocab = self.tokenizer.instruct_tokenizer.tokenizer.vocab() + self._cache_get_vocab = {token: self._piece_to_id(token, False) for token in vocab} + # Order the dict. + self._cache_get_vocab = dict(sorted(((k, v) for k, v in self._cache_get_vocab.items()), key=lambda x: x[1])) + return self._cache_get_vocab + + def __len__(self): + """ + Size of the full vocabulary with the added tokens. 
+ """ + return self.vocab_size + + @add_end_docstrings( + ENCODE_KWARGS_DOCSTRING, + """ + **kwargs: Not supported by `MistralCommonBackend.encode`. + Will raise an error if used. + """, + """ + Returns: + `list[int]`, `torch.Tensor`: The tokenized ids of the text. + """, + ) + def encode( + self, + text: TextInput | EncodedInput, + text_pair: None = None, + add_special_tokens: bool = True, + padding: bool | str | PaddingStrategy = False, + truncation: bool | str | TruncationStrategy | None = None, + max_length: int | None = None, + stride: int = 0, + pad_to_multiple_of: int | None = None, + padding_side: str | None = None, + return_tensors: str | TensorType | None = None, + verbose: bool = True, + **kwargs, + ) -> list[int]: + """ + Converts a string to a sequence of ids (integer), using the tokenizer and vocabulary. + + Args: + text (`str` or `list[int]`): + The first sequence to be encoded. This can be a string or a list of integers (tokenized string ids). + text_pair (`None`, *optional*): + Not supported by `MistralCommonBackend.encode`. Kept to match `PreTrainedTokenizerBase.encode` signature. + """ + if kwargs: + raise ValueError(f"Kwargs {list(kwargs.keys())} are not supported by `MistralCommonBackend.encode`.") + if text_pair: + raise ValueError("`MistralCommonBackend.encode` does not support `text_pair`.") + + padding_strategy, truncation_strategy, max_length, _ = self._get_padding_truncation_strategies( + padding=padding, + truncation=truncation, + max_length=max_length, + pad_to_multiple_of=pad_to_multiple_of, + verbose=verbose, + ) + + encoded_inputs = self._encode_plus( + text, + add_special_tokens=add_special_tokens, + padding_strategy=padding_strategy, + truncation_strategy=truncation_strategy, + max_length=max_length, + stride=stride, + pad_to_multiple_of=pad_to_multiple_of, + padding_side=padding_side, + return_tensors=return_tensors, + return_attention_mask=False, + return_overflowing_tokens=False, + return_special_tokens_mask=False, + return_length=False, + verbose=verbose, + ) + + return encoded_inputs["input_ids"] + + def decode( + self, + token_ids: Union[int, list[int], list[list[int]], np.ndarray, "torch.Tensor"], + skip_special_tokens: bool = False, + clean_up_tokenization_spaces: bool | None = None, + **kwargs, + ) -> Union[str, list[str]]: + """ + Converts a sequence of ids in a string, using the tokenizer and vocabulary with options to remove special + tokens and clean up tokenization spaces. + + Args: + token_ids (`Union[int, list[int], list[list[int]], np.ndarray, torch.Tensor]`): + A single sequence or a batch (list of sequences) of tokenized input ids. Can be obtained using the + `__call__` method. + skip_special_tokens (`bool`, *optional*, defaults to `False`): + Whether or not to remove special tokens in the decoding. + clean_up_tokenization_spaces (`bool`, *optional*): + Whether or not to clean up the tokenization spaces. If `None`, will default to + `self.clean_up_tokenization_spaces`. + kwargs (additional keyword arguments, *optional*): + Not supported by `MistralCommonBackend.decode`. + Will raise an error if used. + + Returns: + `Union[str, list[str]]`: The decoded string for a single sequence, or a list of decoded strings for a + batch of sequences. 
+ """ + if kwargs: + raise ValueError(f"Kwargs {list(kwargs.keys())} are not supported by `MistralCommonBackend.decode`.") + + token_ids = to_py_obj(token_ids) + + if isinstance(token_ids, (list, tuple)) and len(token_ids) > 0 and isinstance(token_ids[0], (list, tuple)): + return self._batch_decode( + sequences=token_ids, + skip_special_tokens=skip_special_tokens, + clean_up_tokenization_spaces=clean_up_tokenization_spaces, + ) + + return self._decode( + token_ids=token_ids, + skip_special_tokens=skip_special_tokens, + clean_up_tokenization_spaces=clean_up_tokenization_spaces, + ) + + def batch_decode( + self, + sequences: Union[list[int], list[list[int]], np.ndarray, "torch.Tensor"], + skip_special_tokens: bool = False, + clean_up_tokenization_spaces: bool | None = None, + **kwargs, + ) -> list[str]: + """ + Convert a list of lists of token ids into a list of strings by calling decode. + + This method is provided for backwards compatibility. The `decode` method now handles batched input natively, + so you can use `decode` directly instead of `batch_decode`. + + Args: + sequences (`Union[list[int], list[list[int]], np.ndarray, torch.Tensor]`): + List of tokenized input ids. Can be obtained using the `__call__` method. + skip_special_tokens (`bool`, *optional*, defaults to `False`): + Whether or not to remove special tokens in the decoding. + clean_up_tokenization_spaces (`bool`, *optional*): + Whether or not to clean up the tokenization spaces. If `None`, will default to + `self.clean_up_tokenization_spaces`. + kwargs (additional keyword arguments, *optional*): + Not supported by `MistralCommonBackend.batch_decode`. + Will raise an error if used. + + Returns: + `list[str]`: The list of decoded sentences. + """ + if kwargs: + raise ValueError(f"Kwargs {list(kwargs.keys())} are not supported by `MistralCommonBackend.batch_decode`.") + + return self._batch_decode( + sequences=sequences, + skip_special_tokens=skip_special_tokens, + clean_up_tokenization_spaces=clean_up_tokenization_spaces, + ) + + def _decode( + self, + token_ids: Union[int, list[int], list[list[int]], np.ndarray, "torch.Tensor"], + skip_special_tokens: bool = False, + clean_up_tokenization_spaces: bool | None = None, + ) -> str: + clean_up_tokenization_spaces = clean_up_tokenization_spaces or self.cleanup_tokenization_spaces + + # Convert inputs to python lists + if isinstance(token_ids, int): + token_ids = [token_ids] + + token_ids = to_py_obj(token_ids) + + special_token_policy = SpecialTokenPolicy.IGNORE if skip_special_tokens else SpecialTokenPolicy.KEEP + + decoded_string = self.tokenizer.decode(token_ids, special_token_policy=special_token_policy) + if clean_up_tokenization_spaces: + decoded_string = self.clean_up_tokenization(decoded_string) + + # in the specific case of Voxtral, the added f"lang:xx" (always a two char language code since it follows ISO 639-1 alpha-2 format) + # is not considered as a special token by mistral-common and is encoded/ decoded as normal text. + # Nevertheless we should remove it to ease users life. 
+ if skip_special_tokens: + decoded_string = re.sub(r"^lang:[a-z]{2}", "", decoded_string) + + return decoded_string + + def _batch_decode( + self, + sequences: Union[list[int], list[list[int]], np.ndarray, "torch.Tensor"], + skip_special_tokens: bool = False, + clean_up_tokenization_spaces: bool | None = None, + ) -> list[str]: + return [ + self._decode( + seq, + skip_special_tokens=skip_special_tokens, + clean_up_tokenization_spaces=clean_up_tokenization_spaces, + ) + for seq in sequences + ] + + def _is_control_token(self, token_id: int) -> bool: + if self._tokenizer_type == MistralTokenizerType.spm: + return token_id in self.tokenizer.instruct_tokenizer.tokenizer._control_tokens + elif self._tokenizer_type == MistralTokenizerType.tekken: + return token_id < self.tokenizer.instruct_tokenizer.tokenizer.num_special_tokens + else: + raise ValueError(f"Unknown tokenizer type: {self._tokenizer_type}") + + @overload + def convert_ids_to_tokens(self, ids: int, skip_special_tokens: bool = False) -> str: ... + @overload + def convert_ids_to_tokens(self, ids: list[int], skip_special_tokens: bool = False) -> list[str]: ... + def convert_ids_to_tokens(self, ids: int | list[int], skip_special_tokens: bool = False) -> str | list[str]: + """ + Converts a single index or a sequence of indices in a token or a sequence of tokens, using the vocabulary and + added tokens. + + Args: + ids (`int` or `list[int]`): + The token id (or token ids) to convert to tokens. + skip_special_tokens (`bool`, *optional*, defaults to `False`): + Whether or not to remove special tokens in the decoding. + + Returns: + `str` or `list[str]`: The decoded token(s). + """ + + if isinstance(ids, int): + one_token = True + ids = [ids] + else: + one_token = False + + tokens: list[str] = [] + for token_id in ids: + if self._is_control_token(token_id) and skip_special_tokens: + continue + tokens.append(self.tokenizer.instruct_tokenizer.tokenizer.id_to_piece(token_id)) + + if one_token: + if tokens == []: + raise ValueError(f"Invalid token id {ids}.") + + return tokens[0] + return tokens + + def _tekken_piece_to_id(self, piece: str, warn: bool) -> int: + tekken_tokenizer = self.tokenizer.instruct_tokenizer.tokenizer + assert isinstance(tekken_tokenizer, Tekkenizer), type(tekken_tokenizer) + + piece_bytes = piece.encode("utf-8") + shift = tekken_tokenizer.num_special_tokens + try: + return shift + tekken_tokenizer._tekken_token2id_nospecial[piece_bytes] + except KeyError: + piece_str = piece_bytes.decode("utf-8") + if piece_str in tekken_tokenizer._special_tokens_reverse_vocab: + return tekken_tokenizer._special_tokens_reverse_vocab[piece_str] + if warn: + logger.warning("Failed to convert token %s to id, replacing with ", piece_bytes) + return tekken_tokenizer.unk_id + + def _piece_to_id(self, piece: str, warn: bool) -> int: + if self._tokenizer_type == MistralTokenizerType.spm: + return self.tokenizer.instruct_tokenizer.tokenizer._model.piece_to_id(piece) + elif self._tokenizer_type == MistralTokenizerType.tekken: + return self._tekken_piece_to_id(piece, warn) + else: + raise ValueError(f"Unknown tokenizer type: {self._tokenizer_type}") + + def convert_tokens_to_ids(self, tokens: str | list[str]) -> int | list[int]: + """ + Converts a token string (or a sequence of tokens) in a single integer id (or a sequence of ids), using the + vocabulary. + + Args: + tokens (`str` or `list[str]`): One or several token(s) to convert to token id(s). + + Returns: + `int` or `list[int]`: The token id or list of token ids. 
+ """ + + if isinstance(tokens, str): + one_token = True + tokens = [tokens] + else: + one_token = False + + ids: list[int] = [] + for token in tokens: + ids.append(self._piece_to_id(token, True)) + + if one_token: + return ids[0] + return ids + + def _text_to_ids(self, text: TextInput, add_special_tokens: bool) -> list[int]: + """ + Converts a string into a sequence of tokens ids, using the tokenizer. + """ + add_eos = add_special_tokens and self._mode == ValidationMode.finetuning + tokens_ids = self.tokenizer.instruct_tokenizer.tokenizer.encode(text, bos=add_special_tokens, eos=add_eos) + return tokens_ids + + def tokenize(self, text: TextInput, **kwargs) -> list[str]: + """ + Converts a string into a sequence of tokens, using the tokenizer. + + Split in words for word-based vocabulary or sub-words for sub-word-based vocabularies. + + Args: + text (`str`): + The sequence to be encoded. + **kwargs (additional keyword arguments): + Not supported by `MistralCommonBackend.tokenize`. + Will raise an error if used. + + Returns: + `list[str]`: The list of tokens. + """ + if kwargs: + raise ValueError(f"Kwargs {list(kwargs.keys())} are not supported by `MistralCommonBackend.tokenize`.") + + return self.convert_ids_to_tokens(self._text_to_ids(text, add_special_tokens=False), skip_special_tokens=False) + + def _encode_plus( + self, + text: TextInput | EncodedInput, + add_special_tokens: bool = True, + padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD, + truncation_strategy: TruncationStrategy = TruncationStrategy.DO_NOT_TRUNCATE, + max_length: int | None = None, + stride: int = 0, + pad_to_multiple_of: int | None = None, + padding_side: str | None = None, + return_tensors: str | TensorType | None = None, + return_attention_mask: bool | None = None, + return_overflowing_tokens: bool = False, + return_special_tokens_mask: bool = False, + return_length: bool = False, + verbose: bool = True, + ) -> BatchEncoding: + def get_input_ids(text): + if isinstance(text, str): + return self._text_to_ids(text, add_special_tokens) + elif isinstance(text, (list, tuple)) and len(text) > 0 and isinstance(text[0], int): + return text + else: + raise ValueError(f"Input {text} is not valid. 
Should be a string, or a list/tuple of integers.") + + ids = get_input_ids(text) + + return self.prepare_for_model( + ids, + add_special_tokens=add_special_tokens, + padding=padding_strategy.value, + truncation=truncation_strategy.value, + max_length=max_length, + stride=stride, + pad_to_multiple_of=pad_to_multiple_of, + padding_side=padding_side, + return_tensors=return_tensors, + prepend_batch_axis=True, + return_attention_mask=return_attention_mask, + return_overflowing_tokens=return_overflowing_tokens, + return_special_tokens_mask=return_special_tokens_mask, + return_length=return_length, + verbose=verbose, + ) + + def _batch_encode_plus( + self, + batch_text: list[TextInput] | list[EncodedInput], + add_special_tokens: bool = True, + padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD, + truncation_strategy: TruncationStrategy = TruncationStrategy.DO_NOT_TRUNCATE, + max_length: int | None = None, + stride: int = 0, + pad_to_multiple_of: int | None = None, + padding_side: str | None = None, + return_tensors: str | TensorType | None = None, + return_attention_mask: bool | None = None, + return_overflowing_tokens: bool = False, + return_special_tokens_mask: bool = False, + return_length: bool = False, + verbose: bool = True, + ) -> BatchEncoding: + def get_input_ids(text): + if isinstance(text, str): + return self._text_to_ids(text, add_special_tokens) + elif isinstance(text, (list, tuple)) and len(text) > 0 and isinstance(text[0], int): + return text + else: + raise ValueError("Input is not valid. Should be a string or a list/tuple of integers.") + + input_ids = [] + for ids in batch_text: + input_ids.append(get_input_ids(ids)) + + batch_outputs = self._batch_prepare_for_model( + input_ids, + add_special_tokens=add_special_tokens, + padding_strategy=padding_strategy, + truncation_strategy=truncation_strategy, + max_length=max_length, + stride=stride, + pad_to_multiple_of=pad_to_multiple_of, + padding_side=padding_side, + return_attention_mask=return_attention_mask, + return_overflowing_tokens=return_overflowing_tokens, + return_special_tokens_mask=return_special_tokens_mask, + return_length=return_length, + return_tensors=return_tensors, + verbose=verbose, + ) + + return BatchEncoding(batch_outputs) + + def _get_all_special_ids(self) -> set[int]: + if self._tokenizer_type == MistralTokenizerType.tekken: + return {t["rank"] for t in self.tokenizer.instruct_tokenizer.tokenizer._all_special_tokens} + elif self._tokenizer_type == MistralTokenizerType.spm: + return self.tokenizer.instruct_tokenizer.tokenizer._control_tokens + else: + raise ValueError(f"Unknown tokenizer type: {self._tokenizer_type}") + + def get_special_tokens_mask( + self, token_ids_0: list, token_ids_1: None = None, already_has_special_tokens: bool = False + ) -> list[int]: + """ + Retrieves sequence ids from a token list that has no special tokens added. This method is called when adding + special tokens using the tokenizer `prepare_for_model` or `encode_plus` methods. + + Args: + token_ids_0 (`list[int]`): + List of ids of the sequence. + token_ids_1 (`list[int]`, *optional*): + Not supported by `MistralCommonBackend`. Kept to match the interface of `PreTrainedTokenizerBase`. + already_has_special_tokens (`bool`, *optional*, defaults to `False`): + Whether or not the token list is already formatted with special tokens for the model. + + Returns: + A list of integers in the range [0, 1]: 1 for a special token, 0 for a sequence token. 
+ """ + if token_ids_1 is not None: + raise ValueError( + "`token_ids_1` is not supported by `MistralCommonBackend` and should be `None`, kept for compatibility." + ) + if already_has_special_tokens: + raise ValueError( + "`already_has_special_tokens` is not supported by `MistralCommonBackend` and should be `False`." + ) + + special_tokens_mask = [1 if token in self._all_special_tokens_ids else 0 for token in token_ids_0] + return special_tokens_mask + + def _batch_prepare_for_model( + self, + batch_ids: list[PreTokenizedInput | list[int]], + add_special_tokens: bool = True, + padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD, + truncation_strategy: TruncationStrategy = TruncationStrategy.DO_NOT_TRUNCATE, + max_length: int | None = None, + stride: int = 0, + pad_to_multiple_of: int | None = None, + padding_side: str | None = None, + return_tensors: str | None = None, + return_attention_mask: bool | None = None, + return_overflowing_tokens: bool = False, + return_special_tokens_mask: bool = False, + return_length: bool = False, + verbose: bool = True, + ) -> BatchEncoding: + """ + Prepares a sequence of input id so that it can be used by the model. It + adds special tokens, truncates sequences if overflowing while taking into account the special tokens and + manages a moving window (with user defined stride) for overflowing tokens. + + Args: + batch_ids: list of tokenized input ids + """ + + batch_outputs = {} + for ids in batch_ids: + outputs = self.prepare_for_model( + ids, + add_special_tokens=add_special_tokens, + padding=PaddingStrategy.DO_NOT_PAD.value, # we pad in batch afterward + truncation=truncation_strategy.value, + max_length=max_length, + stride=stride, + pad_to_multiple_of=None, # we pad in batch afterward + padding_side=None, # we pad in batch afterward + return_attention_mask=False, # we pad in batch afterward + return_overflowing_tokens=return_overflowing_tokens, + return_special_tokens_mask=return_special_tokens_mask, + return_length=return_length, + return_tensors=None, # We convert the whole batch to tensors at the end + prepend_batch_axis=False, + verbose=verbose, + ) + + for key, value in outputs.items(): + if key not in batch_outputs: + batch_outputs[key] = [] + batch_outputs[key].append(value) + + batch_outputs = self.pad( + batch_outputs, + padding=padding_strategy.value, + max_length=max_length, + pad_to_multiple_of=pad_to_multiple_of, + padding_side=padding_side, + return_attention_mask=return_attention_mask, + ) + + batch_outputs = BatchEncoding(batch_outputs, tensor_type=return_tensors) + + return batch_outputs + + @add_end_docstrings(ENCODE_KWARGS_DOCSTRING, ENCODE_PLUS_ADDITIONAL_KWARGS_DOCSTRING) + def prepare_for_model( + self, + ids: list[int], + pair_ids: None = None, + add_special_tokens: bool = True, + padding: bool | str | PaddingStrategy = False, + truncation: bool | str | TruncationStrategy | None = None, + max_length: int | None = None, + stride: int = 0, + pad_to_multiple_of: int | None = None, + padding_side: str | None = None, + return_tensors: str | TensorType | None = None, + return_attention_mask: bool | None = None, + return_overflowing_tokens: bool = False, + return_special_tokens_mask: bool = False, + return_length: bool = False, + verbose: bool = True, + prepend_batch_axis: bool = False, + **kwargs, + ) -> BatchEncoding: + """ + Prepares a sequence of input id so that it can be used by the model. 
It + adds special tokens, truncates sequences if overflowing while taking into account the special tokens and + manages a moving window (with user defined stride) for overflowing tokens. + + Args: + ids (`list[int]`): + Tokenized input ids of the first sequence. + pair_ids (`None`, *optional*): + Not supported by `MistralCommonBackend`. Kept to match the interface of `PreTrainedTokenizerBase`. + """ + if pair_ids is not None: + raise ValueError( + "`pair_ids` is not supported by `MistralCommonBackend` and should be `None`, kept for compatibility." + ) + if kwargs: + raise ValueError( + f"Kwargs {list(kwargs.keys())} are not supported by `MistralCommonBackend.prepare_for_model`." + ) + + padding_strategy, truncation_strategy, max_length, _ = self._get_padding_truncation_strategies( + padding=padding, + truncation=truncation, + max_length=max_length, + pad_to_multiple_of=pad_to_multiple_of, + verbose=verbose, + ) + + len_ids = len(ids) + + # Load from model defaults + if return_attention_mask is None: + return_attention_mask = "attention_mask" in self.model_input_names + + encoded_inputs = {} + + # Truncation: Handle max sequence length + overflowing_tokens = [] + if truncation_strategy != TruncationStrategy.DO_NOT_TRUNCATE and max_length and len_ids > max_length: + ids, _, overflowing_tokens = self.truncate_sequences( + ids, + num_tokens_to_remove=len_ids - max_length, + truncation_strategy=truncation_strategy, + stride=stride, + ) + + if return_overflowing_tokens: + encoded_inputs["overflowing_tokens"] = overflowing_tokens + encoded_inputs["num_truncated_tokens"] = len_ids - max_length + + # Build output dictionary + encoded_inputs[self.model_input_names[0]] = ids + if return_special_tokens_mask: + if add_special_tokens: + encoded_inputs["special_tokens_mask"] = self.get_special_tokens_mask(ids, None) + else: + encoded_inputs["special_tokens_mask"] = [0] * len(ids) + + # Padding + if padding_strategy != PaddingStrategy.DO_NOT_PAD or return_attention_mask: + encoded_inputs = self.pad( + encoded_inputs, + max_length=max_length, + padding=padding_strategy.value, + pad_to_multiple_of=pad_to_multiple_of, + padding_side=padding_side, + return_attention_mask=return_attention_mask, + ) + + if return_length: + encoded_inputs["length"] = len(encoded_inputs["input_ids"]) + + batch_outputs = BatchEncoding(encoded_inputs, tensor_type=return_tensors, prepend_batch_axis=prepend_batch_axis) + + return batch_outputs + + def _get_padding_truncation_strategies( + self, + padding: str | PaddingStrategy | bool = False, + truncation: str | TruncationStrategy | bool | None = None, + max_length: int | None = None, + pad_to_multiple_of: int | None = None, + verbose: bool = True, + **kwargs, + ): + """ + Find the correct padding/truncation strategy. + """ + + # Backward compatibility for previous behavior, maybe we should deprecate it: + # If you only set max_length, it activates truncation for max_length + if max_length is not None and padding is False and truncation is None: + if verbose: + if not self.deprecation_warnings.get("Truncation-not-explicitly-activated", False): + logger.warning( + "Truncation was not explicitly activated but `max_length` is provided a specific value, please" + " use `truncation=True` to explicitly truncate examples to max length. Defaulting to" + " 'longest_first' truncation strategy." 
+ ) + self.deprecation_warnings["Truncation-not-explicitly-activated"] = True + truncation = "longest_first" + + # Get padding strategy + if padding is not False: + if padding is True: + if verbose: + if max_length is not None and ( + truncation is None or truncation is False or truncation == "do_not_truncate" + ): + warnings.warn( + "`max_length` is ignored when `padding`=`True` and there is no truncation strategy. " + "To pad to max length, use `padding='max_length'`." + ) + padding_strategy = PaddingStrategy.LONGEST # Default to pad to the longest sequence in the batch + elif not isinstance(padding, PaddingStrategy): + padding_strategy = PaddingStrategy(padding) + elif isinstance(padding, PaddingStrategy): + padding_strategy = padding + else: + padding_strategy = PaddingStrategy.DO_NOT_PAD + + # Get truncation strategy + if truncation is not False and truncation is not None: + if truncation is True: + truncation_strategy = ( + TruncationStrategy.LONGEST_FIRST + ) # Default to truncate the longest sequences in pairs of inputs + elif not isinstance(truncation, TruncationStrategy): + truncation_strategy = TruncationStrategy(truncation) + elif isinstance(truncation, TruncationStrategy): + truncation_strategy = truncation + if truncation in [TruncationStrategy.ONLY_FIRST, TruncationStrategy.ONLY_SECOND]: + raise ValueError( + "Truncation strategy `only_first` and `only_second` are not supported by `MistralCommonBackend`." + ) + else: + truncation_strategy = TruncationStrategy.DO_NOT_TRUNCATE + + # Set max length if needed + if max_length is None: + if padding_strategy == PaddingStrategy.MAX_LENGTH: + if self.model_max_length > LARGE_INTEGER: + if verbose: + if not self.deprecation_warnings.get("Asking-to-pad-to-max_length", False): + logger.warning( + "Asking to pad to max_length but no maximum length is provided and the model has no" + " predefined maximum length. Default to no padding." + ) + self.deprecation_warnings["Asking-to-pad-to-max_length"] = True + padding_strategy = PaddingStrategy.DO_NOT_PAD + else: + max_length = self.model_max_length + + if truncation_strategy != TruncationStrategy.DO_NOT_TRUNCATE: + if self.model_max_length > LARGE_INTEGER: + if verbose: + if not self.deprecation_warnings.get("Asking-to-truncate-to-max_length", False): + logger.warning( + "Asking to truncate to max_length but no maximum length is provided and the model has" + " no predefined maximum length. Default to no truncation." + ) + self.deprecation_warnings["Asking-to-truncate-to-max_length"] = True + truncation_strategy = TruncationStrategy.DO_NOT_TRUNCATE + else: + max_length = self.model_max_length + + # Test if we have a padding token + if padding_strategy != PaddingStrategy.DO_NOT_PAD and (self.pad_token is None or self.pad_token_id < 0): + raise ValueError( + "Asking to pad but the tokenizer does not have a padding token. " + "Please select a token to use as `pad_token` `(tokenizer.pad_token = tokenizer.eos_token e.g.)` " + "or add a new pad token via `tokenizer.add_special_tokens({'pad_token': '[PAD]'})`." 
+ ) + + # Check that we will truncate to a multiple of pad_to_multiple_of if both are provided + if ( + truncation_strategy != TruncationStrategy.DO_NOT_TRUNCATE + and padding_strategy != PaddingStrategy.DO_NOT_PAD + and pad_to_multiple_of is not None + and max_length is not None + and (max_length % pad_to_multiple_of != 0) + ): + raise ValueError( + "Truncation and padding are both activated but " + f"truncation length ({max_length}) is not a multiple of pad_to_multiple_of ({pad_to_multiple_of})." + ) + + return padding_strategy, truncation_strategy, max_length, kwargs + + def _pad( + self, + encoded_inputs: dict[str, EncodedInput] | BatchEncoding, + max_length: int | None = None, + padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD, + pad_to_multiple_of: int | None = None, + padding_side: str | None = None, + return_attention_mask: bool | None = None, + ) -> dict: + """ + Pad encoded inputs (on left/right and up to predefined length or max length in the batch) + + Args: + encoded_inputs: + Dictionary of tokenized inputs (`list[int]`) or batch of tokenized inputs (`list[list[int]]`). + max_length: maximum length of the returned list and optionally padding length (see below). + Will truncate by taking into account the special tokens. + padding_strategy: PaddingStrategy to use for padding. + + - PaddingStrategy.LONGEST Pad to the longest sequence in the batch + - PaddingStrategy.MAX_LENGTH: Pad to the max length (default) + - PaddingStrategy.DO_NOT_PAD: Do not pad + The tokenizer padding sides are defined in `padding_side` argument: + + - 'left': pads on the left of the sequences + - 'right': pads on the right of the sequences + pad_to_multiple_of: (optional) Integer if set will pad the sequence to a multiple of the provided value. + This is especially useful to enable the use of Tensor Core on NVIDIA hardware with compute capability + `>= 7.5` (Volta). + padding_side: + The side on which the model should have padding applied. Should be selected between ['right', 'left']. + Default value is picked from the class attribute of the same name. + return_attention_mask: + (optional) Set to False to avoid returning attention mask (default: set to model specifics) + """ + # Load from model defaults + if return_attention_mask is None: + return_attention_mask = "attention_mask" in self.model_input_names + + required_input = encoded_inputs[self.model_input_names[0]] + + if padding_strategy == PaddingStrategy.LONGEST: + max_length = len(required_input) + + if max_length is not None and pad_to_multiple_of is not None and (max_length % pad_to_multiple_of != 0): + max_length = ((max_length // pad_to_multiple_of) + 1) * pad_to_multiple_of + + needs_to_be_padded = padding_strategy != PaddingStrategy.DO_NOT_PAD and len(required_input) != max_length + + # Initialize attention mask if not present. 
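+        # For a sequence of length 3 this yields [1, 1, 1]; the padding applied below extends the mask
+        # with 0s on the chosen padding side.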
+ if return_attention_mask and "attention_mask" not in encoded_inputs: + encoded_inputs["attention_mask"] = [1] * len(required_input) + + if needs_to_be_padded: + difference = max_length - len(required_input) + padding_side = padding_side if padding_side is not None else self.padding_side + + if padding_side == "right": + if return_attention_mask: + encoded_inputs["attention_mask"] = encoded_inputs["attention_mask"] + [0] * difference + if "special_tokens_mask" in encoded_inputs: + encoded_inputs["special_tokens_mask"] = encoded_inputs["special_tokens_mask"] + [1] * difference + encoded_inputs[self.model_input_names[0]] = required_input + [self.pad_token_id] * difference + elif padding_side == "left": + if return_attention_mask: + encoded_inputs["attention_mask"] = [0] * difference + encoded_inputs["attention_mask"] + if "special_tokens_mask" in encoded_inputs: + encoded_inputs["special_tokens_mask"] = [1] * difference + encoded_inputs["special_tokens_mask"] + encoded_inputs[self.model_input_names[0]] = [self.pad_token_id] * difference + required_input + else: + raise ValueError(f"Invalid padding strategy:{padding_side}") + + return encoded_inputs + + def pad( + self, + encoded_inputs: BatchEncoding + | list[BatchEncoding] + | dict[str, EncodedInput] + | dict[str, list[EncodedInput]] + | list[dict[str, EncodedInput]], + padding: bool | str | PaddingStrategy = True, + max_length: int | None = None, + pad_to_multiple_of: int | None = None, + padding_side: str | None = None, + return_attention_mask: bool | None = None, + return_tensors: str | TensorType | None = None, + verbose: bool = True, + ) -> BatchEncoding: + """ + Pad a single encoded input or a batch of encoded inputs up to predefined length or to the max sequence length + in the batch. + + Padding side (left/right) padding token ids are defined at the tokenizer level (with `self.padding_side`, + `self.pad_token_id`). + + + If the `encoded_inputs` passed are dictionary of numpy arrays, PyTorch tensors, the + result will use the same type unless you provide a different tensor type with `return_tensors`. In the case of + PyTorch tensors, you will lose the specific device of your tensors however. + + + + Args: + encoded_inputs ([`BatchEncoding`], list of [`BatchEncoding`], `Dict[str, list[int]]`, `Dict[str, list[list[int]]` or `List[Dict[str, list[int]]]`): + Tokenized inputs. Can represent one input ([`BatchEncoding`] or `Dict[str, list[int]]`) or a batch of + tokenized inputs (list of [`BatchEncoding`], *Dict[str, list[list[int]]]* or *List[Dict[str, + list[int]]]*) so you can use this method during preprocessing as well as in a PyTorch Dataloader + collate function. + + Instead of `list[int]` you can have tensors (numpy arrays, PyTorch tensors), see + the note above for the return type. + padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `True`): + Select a strategy to pad the returned sequences (according to the model's padding side and padding + index) among: + + - `True` or `'longest'` (default): Pad to the longest sequence in the batch (or no padding if only a single + sequence if provided). + - `'max_length'`: Pad to a maximum length specified with the argument `max_length` or to the maximum + acceptable input length for the model if that argument is not provided. + - `False` or `'do_not_pad'`: No padding (i.e., can output a batch with sequences of different + lengths). + max_length (`int`, *optional*): + Maximum length of the returned list and optionally padding length (see above). 
+ pad_to_multiple_of (`int`, *optional*): + If set will pad the sequence to a multiple of the provided value. + + This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability + `>= 7.5` (Volta). + padding_side (`str`, *optional*): + The side on which the model should have padding applied. Should be selected between ['right', 'left']. + Default value is picked from the class attribute of the same name. + return_attention_mask (`bool`, *optional*): + Whether to return the attention mask. If left to the default, will return the attention mask according + to the specific tokenizer's default, defined by the `return_outputs` attribute. + + [What are attention masks?](../glossary#attention-mask) + return_tensors (`str` or [`~utils.TensorType`], *optional*): + If set, will return tensors instead of list of python integers. Acceptable values are: + + - `'pt'`: Return PyTorch `torch.Tensor` objects. + - `'np'`: Return Numpy `np.ndarray` objects. + verbose (`bool`, *optional*, defaults to `True`): + Whether or not to print more information and warnings. + """ + # If we have a list of dicts, let's convert it in a dict of lists + # We do this to allow using this method as a collate_fn function in PyTorch Dataloader + if isinstance(encoded_inputs, (list, tuple)) and isinstance(encoded_inputs[0], Mapping): + # Call .keys() explicitly for compatibility with TensorDict and other Mapping subclasses + encoded_inputs = {key: [example[key] for example in encoded_inputs] for key in encoded_inputs[0].keys()} + + # The model's main input name, usually `input_ids`, has been passed for padding + if self.model_input_names[0] not in encoded_inputs: + raise ValueError( + "You should supply an encoding or a list of encodings to this method " + f"that includes {self.model_input_names[0]}, but you provided {list(encoded_inputs.keys())}" + ) + + required_input = encoded_inputs[self.model_input_names[0]] + + if required_input is None or (isinstance(required_input, Sized) and len(required_input) == 0): + if return_attention_mask: + encoded_inputs["attention_mask"] = [] + return encoded_inputs + + # If we have PyTorch/NumPy tensors/arrays as inputs, we cast them as python objects + # and rebuild them afterwards if no return_tensors is specified + # Note that we lose the specific device the tensor may be on for PyTorch + + first_element = required_input[0] + if isinstance(first_element, (list, tuple)): + # first_element might be an empty list/tuple in some edge cases so we grab the first non empty element. + for item in required_input: + if len(item) != 0: + first_element = item[0] + break + # At this state, if `first_element` is still a list/tuple, it's an empty one so there is nothing to do. + if not isinstance(first_element, (int, list, tuple)): + if is_torch_tensor(first_element): + return_tensors = "pt" if return_tensors is None else return_tensors + elif isinstance(first_element, np.ndarray): + return_tensors = "np" if return_tensors is None else return_tensors + else: + raise ValueError( + f"type of {first_element} unknown: {type(first_element)}. " + "Should be one of a python, numpy, or pytorch object." 
+ ) + + for key, value in encoded_inputs.items(): + encoded_inputs[key] = to_py_obj(value) + + # Convert padding_strategy in PaddingStrategy + padding_strategy, _, max_length, _ = self._get_padding_truncation_strategies( + padding=padding, max_length=max_length, verbose=verbose + ) + + required_input = encoded_inputs[self.model_input_names[0]] + if required_input and not isinstance(required_input[0], (list, tuple)): + encoded_inputs = self._pad( + encoded_inputs, + max_length=max_length, + padding_strategy=padding_strategy, + pad_to_multiple_of=pad_to_multiple_of, + padding_side=padding_side, + return_attention_mask=return_attention_mask, + ) + return BatchEncoding(encoded_inputs, tensor_type=return_tensors) + + batch_size = len(required_input) + assert all(len(v) == batch_size for v in encoded_inputs.values()), ( + "Some items in the output dictionary have a different batch size than others." + ) + + if padding_strategy == PaddingStrategy.LONGEST: + max_length = max(len(inputs) for inputs in required_input) + padding_strategy = PaddingStrategy.MAX_LENGTH + + batch_outputs = {} + for i in range(batch_size): + inputs = {k: v[i] for k, v in encoded_inputs.items()} + outputs = self._pad( + inputs, + max_length=max_length, + padding_strategy=padding_strategy, + pad_to_multiple_of=pad_to_multiple_of, + padding_side=padding_side, + return_attention_mask=return_attention_mask, + ) + + for key, value in outputs.items(): + if key not in batch_outputs: + batch_outputs[key] = [] + batch_outputs[key].append(value) + + return BatchEncoding(batch_outputs, tensor_type=return_tensors) + + def truncate_sequences( + self, + ids: list[int], + pair_ids: None = None, + num_tokens_to_remove: int = 0, + truncation_strategy: str | TruncationStrategy = "longest_first", + stride: int = 0, + **kwargs, + ) -> tuple[list[int], None, list[int]]: + """ + Truncates a sequence pair in-place following the strategy. + + Args: + ids (`list[int]`): + Tokenized input ids. Can be obtained from a string by chaining the `tokenize` and + `convert_tokens_to_ids` methods. + pair_ids (`None`, *optional*): + Not supported by `MistralCommonBackend`. Kept to match the signature of `PreTrainedTokenizerBase.truncate_sequences`. + num_tokens_to_remove (`int`, *optional*, defaults to 0): + Number of tokens to remove using the truncation strategy. + truncation_strategy (`str` or [`~tokenization_utils_base.TruncationStrategy`], *optional*, defaults to `'longest_first'`): + The strategy to follow for truncation. Can be: + + - `'longest_first'`: Truncate to a maximum length specified with the argument `max_length` or to the + maximum acceptable input length for the model if that argument is not provided. + - `'do_not_truncate'` (default): No truncation (i.e., can output batch with sequence lengths greater + than the model maximum admissible input size). + stride (`int`, *optional*, defaults to 0): + If set to a positive number, the overflowing tokens returned will contain some tokens from the main + sequence returned. The value of this argument defines the number of additional tokens. + + Returns: + `Tuple[list[int], None, list[int]]`: The truncated `ids` and the list of + overflowing tokens. `None` is returned to match Transformers signature. + """ + if kwargs: + raise ValueError( + f"Kwargs {list(kwargs.keys())} are not supported by `MistralCommonBackend.truncate_sequences`." 
+ ) + if pair_ids: + raise ValueError("`pair_ids` is not supported by `MistralCommonBackend.truncate_sequences`.") + + if num_tokens_to_remove <= 0: + return (ids, None, []) + + if not isinstance(truncation_strategy, TruncationStrategy): + truncation_strategy = TruncationStrategy(truncation_strategy) + + if truncation_strategy in [TruncationStrategy.ONLY_FIRST, TruncationStrategy.ONLY_SECOND]: + raise ValueError( + f"Only {TruncationStrategy.LONGEST_FIRST} and {TruncationStrategy.DO_NOT_TRUNCATE} are supported." + ) + + overflowing_tokens = [] + if truncation_strategy == TruncationStrategy.LONGEST_FIRST: + if len(ids) > num_tokens_to_remove: + window_len = min(len(ids), stride + num_tokens_to_remove) + if self.truncation_side == "left": + overflowing_tokens = ids[:window_len] + ids = ids[num_tokens_to_remove:] + elif self.truncation_side == "right": + overflowing_tokens = ids[-window_len:] + ids = ids[:-num_tokens_to_remove] + else: + raise ValueError(f"invalid truncation strategy: {self.truncation_side}, use 'left' or 'right'.") + + else: + error_msg = ( + f"We need to remove {num_tokens_to_remove} to truncate the input " + f"but the first sequence has a length {len(ids)}. " + ) + logger.error(error_msg) + + return (ids, None, overflowing_tokens) + + def apply_chat_template( + self, + conversation: list[dict[str, str]] | list[list[dict[str, str]]], + tools: list[dict | Callable] | None = None, + add_generation_prompt: bool = False, + continue_final_message: bool = False, + tokenize: bool = True, + padding: bool | str | PaddingStrategy = False, + truncation: bool = False, + max_length: int | None = None, + return_tensors: str | TensorType | None = None, + return_dict: bool = True, + **kwargs, + ) -> str | list[int] | list[str] | list[list[int]] | BatchEncoding: + """ + Converts a list of dictionaries with `"role"` and `"content"` keys to a list of token + ids. + + Args: + conversation (Union[List[Dict[str, str]], List[List[Dict[str, str]]]]): A list of dicts + with "role" and "content" keys, representing the chat history so far. + tools (`List[Union[Dict, Callable]]`, *optional*): + A list of tools (callable functions) that will be accessible to the model. If the template does not + support function calling, this argument will have no effect. Each tool should be passed as a JSON Schema, + giving the name, description and argument types for the tool. See our + [chat templating guide](https://huggingface.co/docs/transformers/main/en/chat_templating#automated-function-conversion-for-tool-use) + for more information. + add_generation_prompt (`bool`, *optional*): + This argument is a no-op for `MistralCommonBackend`. However it cannot be used at the same time as `continue_final_message` to keep the API consistent and + if any conversation ends with an assistant message, it will raise an error. In such case, use `continue_final_message` instead. + continue_final_message (bool, *optional*): + If this is set, the chat will be formatted so that the final + message in the chat is open-ended, without any EOS tokens. The model will continue this message + rather than starting a new one. This allows you to "prefill" part of + the model's response for it. Cannot be used at the same time as `add_generation_prompt`. + tokenize (`bool`, defaults to `True`): + Whether to tokenize the output. If `False`, the output will be a string. 
+ padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `False`): + Select a strategy to pad the returned sequences (according to the model's padding side and padding + index) among: + + - `True` or `'longest'`: Pad to the longest sequence in the batch (or no padding if only a single + sequence if provided). + - `'max_length'`: Pad to a maximum length specified with the argument `max_length` or to the maximum + acceptable input length for the model if that argument is not provided. + - `False` or `'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of different + lengths). + truncation (`bool`, defaults to `False`): + Whether to truncate sequences at the maximum length. Has no effect if tokenize is `False`. + max_length (`int`, *optional*): + Maximum length (in tokens) to use for padding or truncation. Has no effect if tokenize is `False`. If + not specified, the tokenizer's `max_length` attribute will be used as a default. + return_tensors (`str` or [`~utils.TensorType`], *optional*): + If set, will return tensors of a particular framework. Has no effect if tokenize is `False`. Acceptable + values are: + - `'pt'`: Return PyTorch `torch.Tensor` objects. + return_dict (`bool`, defaults to `False`): + Whether to return a dictionary with named outputs. Has no effect if tokenize is `False`. + If at least one conversation contains an image, its pixel values will be returned in the `pixel_values` key. + kwargs (additional keyword arguments, *optional*): + Not supported by `MistralCommonBackend.apply_chat_template`. + Will raise an error if used. + + Returns: + `Union[str, list[int], list[str], list[list[int]], BatchEncoding]`: A list of token ids representing the tokenized chat so far, including control + tokens. This output is ready to pass to the model, either directly or via methods like `generate()`. + """ + if kwargs: + raise ValueError( + f"Kwargs {list(kwargs.keys())} are not supported by `MistralCommonBackend.apply_chat_template`." + ) + if not isinstance(truncation, bool): + raise TypeError("`truncation` must be a boolean for `apply_chat_template` method.") + + if add_generation_prompt and continue_final_message: + raise ValueError("Cannot use both `add_generation_prompt` and `continue_final_message`.") + + if isinstance(conversation, (list, tuple)) and ( + isinstance(conversation[0], (list, tuple)) or hasattr(conversation[0], "messages") + ): + conversations = conversation + is_batched = True + else: + conversations = [conversation] + is_batched = False + + if add_generation_prompt: + for conversation in conversations: + last_message = conversation[-1] + if last_message.get("role") == "assistant": + raise ValueError( + "The last message in the conversation is already an assistant message. Consider using `continue_final_message` instead." 
+ ) + + def _maybe_adapt_message(message: dict[str, Any]) -> None: + """Adapt message to `mistral-common` format and leave validation to `mistral-common`.""" + if not isinstance(message, dict): + return message + maybe_list_content: str | list[dict[str, str | dict[str, Any]]] | None = message.get("content") + if not maybe_list_content or isinstance(maybe_list_content, str): + return message + + normalized_content: list[dict[str, str | dict[str, Any]]] = [] + message = message.copy() + for content in maybe_list_content: + content_type = content.get("type", None) + if not content_type: + continue + elif content_type == "image": + maybe_url: str | None = content.get("url") + maybe_path: str | None = content.get("path") + maybe_base64: str | None = content.get("base64") + if maybe_url: + image_content = maybe_url + elif maybe_path: + if not maybe_path.startswith("file://"): + maybe_path = Path(maybe_path).resolve().as_uri() + image_content = maybe_path + elif maybe_base64: + if not maybe_base64.startswith("data:image"): + maybe_base64 = "data:image/unk;base64," + maybe_base64 + image_content = maybe_base64 + else: + raise ValueError("Image content must be specified.") + normalized_content.append({"type": "image_url", "image_url": {"url": image_content}}) + elif content_type == "audio": + maybe_url: str | None = content.get("url") + maybe_path: str | None = content.get("path") + maybe_base64: str | None = content.get("base64") + if maybe_url or maybe_path: + audio_data = load_audio_as(maybe_url or maybe_path, return_format="dict", force_mono=True) + normalized_content.append({"type": "input_audio", "input_audio": audio_data}) + continue + if not maybe_base64: + raise ValueError("Audio content must be specified.") + normalized_content.append({"type": "audio_url", "audio_url": {"url": maybe_base64}}) + else: + normalized_content.append(content) + message["content"] = normalized_content + return message + + outputs = [] + images: list[np.ndarray] = [] + audios: list[np.ndarray] = [] + + for conversation in conversations: + messages: list[dict[str, str | list[dict[str, str | dict[str, Any]]]]] = [] + for message in conversation: + message = _maybe_adapt_message(message) + messages.append(message) + + chat_request = ChatCompletionRequest.from_openai( + messages=messages, + tools=tools, + continue_final_message=continue_final_message, + ) + + tokenized_request = self.tokenizer.encode_chat_completion(chat_request) + if tokenize: + outputs.append(tokenized_request.tokens) + else: + outputs.append(tokenized_request.text) + images.extend(tokenized_request.images) + audios.extend([el.audio_array for el in tokenized_request.audios]) + + if not is_batched: + outputs = outputs[0] + + if tokenize: + out = self( + outputs, + padding=padding, + truncation=truncation, + max_length=max_length, + add_special_tokens=False, + return_tensors=return_tensors, + ) + if return_dict: + if images: + pixel_values: list[np.ndarray] | np.ndarray | torch.Tensor + if return_tensors == "pt": + if not is_torch_available(): + raise ImportError( + "Unable to convert output to PyTorch tensors format, PyTorch is not installed." 
+ ) + + pixel_values = torch.from_numpy(np.stack(images)) + elif return_tensors == "np": + pixel_values = np.array(images) + elif return_tensors is None: + pixel_values = images + else: + raise ValueError(f"Unsupported return_tensors type: {return_tensors}") + out.data["pixel_values"] = pixel_values + if audios: + if return_tensors is not None: + raise NotImplementedError( + "When passing audio content in apply_chat_template, `return_tensors` must be None since we cannot batch the audio inputs. The returned audio will be a list of numpy arrays." + ) + # Transformers convention is audio for plural audio (audio does not take a "s") + out.data["audio"] = audios + return out + else: + return out["input_ids"] + + else: + logger.warning( + "`MistralCommonBackend.apply_chat_template(..., tokenize=False)` is unsafe and may lead to unexpected behavior." + " Please consider using `tokenize=True` instead and don't encode the output manually." + ) + return outputs + + @add_end_docstrings(ENCODE_KWARGS_DOCSTRING, ENCODE_PLUS_ADDITIONAL_KWARGS_DOCSTRING) + def __call__( + self, + text: TextInput | EncodedInput | list[TextInput] | list[EncodedInput] | None = None, + text_pair: None = None, + text_target: None = None, + text_pair_target: None = None, + add_special_tokens: bool = True, + padding: bool | str | PaddingStrategy = False, + truncation: bool | str | TruncationStrategy | None = None, + max_length: int | None = None, + stride: int = 0, + pad_to_multiple_of: int | None = None, + padding_side: str | None = None, + return_tensors: str | TensorType | None = None, + return_attention_mask: bool | None = None, + return_overflowing_tokens: bool = False, + return_special_tokens_mask: bool = False, + return_length: bool = False, + verbose: bool = True, + **kwargs, + ) -> BatchEncoding: + """ + Main method to tokenize and prepare for the model one or several sequence(s) or one or several pair(s) of + sequences. + + Args: + text (`str`, `list[str]`, `list[list[str]]`, *optional*): + The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of int + (encoded strings). + text_pair (`None`, *optional*): + Not supported by `MistralCommonBackend`. Kept to match the signature of `PreTrainedTokenizerBase.__call__`. + text_target (`None`, *optional*): + Not supported by `MistralCommonBackend`. Kept to match the signature of `PreTrainedTokenizerBase.__call__`. + text_pair_target (`None`, *optional*): + Not supported by `MistralCommonBackend`. Kept to match the signature of `PreTrainedTokenizerBase.__call__`. + """ + if kwargs: + raise ValueError(f"Kwargs {list(kwargs.keys())} are not supported by `MistralCommonBackend.__call__`.") + + if text_pair or text_target or text_pair_target: + raise ValueError( + "`text_pair`, `text_target` and `text_pair_target` are not supported by `MistralCommonBackend`." + ) + + def _is_valid_text_input(t): + if isinstance(t, str): + # Strings are fine + return True + elif isinstance(t, (list, tuple)): + # List are fine as long as they are... + if len(t) == 0: + # ... empty + return True + elif isinstance(t[0], (str, int)): + # ... list of strings or int + return True + elif isinstance(t[0], (list, tuple)): + # ... 
list with an empty list or with a list of strings or with a list of ints + return len(t[0]) == 0 or isinstance(t[0][0], (str, int)) + else: + return False + else: + return False + + if not _is_valid_text_input(text): + raise ValueError( + "text input must be of type `str` (single example), `list[str]` (batch or single encoded example) " + "or `list[list[int]]` (batch of encoded examples)." + ) + + is_batched = isinstance(text, (list, tuple)) and isinstance(text[0], (str, list, tuple)) + + padding_strategy, truncation_strategy, max_length, kwargs = self._get_padding_truncation_strategies( + padding=padding, + truncation=truncation, + max_length=max_length, + pad_to_multiple_of=pad_to_multiple_of, + verbose=verbose, + **kwargs, + ) + + if is_batched: + return self._batch_encode_plus( + batch_text=text, + add_special_tokens=add_special_tokens, + padding_strategy=padding_strategy, + truncation_strategy=truncation_strategy, + max_length=max_length, + stride=stride, + pad_to_multiple_of=pad_to_multiple_of, + padding_side=padding_side, + return_tensors=return_tensors, + return_attention_mask=return_attention_mask, + return_overflowing_tokens=return_overflowing_tokens, + return_special_tokens_mask=return_special_tokens_mask, + return_length=return_length, + verbose=verbose, + ) + else: + return self._encode_plus( + text=text, + add_special_tokens=add_special_tokens, + padding_strategy=padding_strategy, + truncation_strategy=truncation_strategy, + max_length=max_length, + stride=stride, + pad_to_multiple_of=pad_to_multiple_of, + padding_side=padding_side, + return_tensors=return_tensors, + return_attention_mask=return_attention_mask, + return_overflowing_tokens=return_overflowing_tokens, + return_special_tokens_mask=return_special_tokens_mask, + return_length=return_length, + verbose=verbose, + ) + + @classmethod + def from_pretrained( + cls, + pretrained_model_name_or_path: str | os.PathLike, + *init_inputs, + mode: Union[str, ValidationMode] = ValidationMode.test, + cache_dir: str | os.PathLike | None = None, + force_download: bool = False, + local_files_only: bool = False, + token: str | bool | None = None, + revision: str = "main", + model_max_length: int = VERY_LARGE_INTEGER, + padding_side: str = "left", + truncation_side: str = "right", + model_input_names: list[str] | None = None, + clean_up_tokenization_spaces: bool = False, + **kwargs, + ): + r""" + Instantiate a `MistralCommonBackend` from a predefined + tokenizer. + + Args: + pretrained_model_name_or_path (`str` or `os.PathLike`): + Can be either: + + - A string, the *model id* of a predefined tokenizer hosted inside a model repo on huggingface.co. + - A path to a *directory* containing the tokenizer config, for instance saved + using the [`MistralCommonBackend.tokenization_mistral_common.save_pretrained`] method, e.g., + `./my_model_directory/`. + mode (`Union[str, ValidationMode]`, *optional*, defaults to `ValidationMode.test`): + Validation mode for the `MistralTokenizer` tokenizer. Possible values are: + - `"finetuning"` or `ValidationMode.finetuning`: The finetuning mode. + - `"test"` or `ValidationMode.test`: The test mode. + It changes how the tokenizer validates the input and prepare the request to the model. + cache_dir (`str` or `os.PathLike`, *optional*): + Path to a directory in which a downloaded predefined tokenizer vocabulary files should be cached if the + standard cache should not be used. 
+ force_download (`bool`, *optional*, defaults to `False`): + Whether or not to force the (re-)download the vocabulary files and override the cached versions if they + exist. + token (`str` or *bool*, *optional*): + The token to use as HTTP bearer authorization for remote files. If `True`, will use the token generated + when running `hf auth login` (stored in `~/.huggingface`). + local_files_only (`bool`, *optional*, defaults to `False`): + Whether or not to only rely on local files and not to attempt to download any files. + revision (`str`, *optional*, defaults to `"main"`): + The specific model version to use. It can be a branch name, a tag name, or a commit id, since we use a + git-based system for storing models and other artifacts on huggingface.co, so `revision` can be any + identifier allowed by git. + max_length (`int`, *optional*): + Controls the maximum length to use by one of the truncation/padding parameters. + + If left unset or set to `None`, this will use the predefined model maximum length if a maximum length + is required by one of the truncation/padding parameters. If the model has no specific maximum input + length (like XLNet) truncation/padding to a maximum length will be deactivated. + padding_side (`str`, *optional*, defaults to `"left"`): + The side on which the model should have padding applied. Should be selected between ['right', 'left']. + Default value is picked from the class attribute of the same name. + truncation_side (`str`, *optional*, defaults to `"right"`): + The side on which the model should have truncation applied. Should be selected between ['right', 'left']. + model_input_names (`List[string]`, *optional*): + The list of inputs accepted by the forward pass of the model (like `"token_type_ids"` or + `"attention_mask"`). Default value is picked from the class attribute of the same name. + clean_up_tokenization_spaces (`bool`, *optional*, defaults to `False`): + Whether or not the model should cleanup the spaces that were added when splitting the input text during the + tokenization process. + kwargs (additional keyword arguments, *optional*): + Not supported by `MistralCommonBackend.from_pretrained`. + Will raise an error if used. 
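+
+        Example (illustrative sketch; the repo id is a placeholder for any repository or local directory
+        that ships a `tekken.json` or SentencePiece `*.model` tokenizer file):
+
+        ```python
+        tokenizer = MistralCommonBackend.from_pretrained("<org>/<mistral-model>", mode="test")
+        input_ids = tokenizer("Hello world")["input_ids"]
+        ```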
+ """ + if init_inputs: + raise ValueError("`init_inputs` are not supported by `MistralCommonBackend.from_pretrained`.") + + # Handle kwargs and AutoTokenizer/AutoProcessor case + if kwargs and not set(kwargs.keys()).issubset( + {"trust_remote_code", "_from_pipeline", "_commit_hash", "dtype", "_from_auto"} + ): + raise ValueError(f"Some kwargs in {kwargs} are not supported by `MistralCommonBackend.from_pretrained`.") + + mode = cls._get_validation_mode(mode) + + if not os.path.isdir(pretrained_model_name_or_path): + tokenizer_path = download_tokenizer_from_hf_hub( + repo_id=pretrained_model_name_or_path, + cache_dir=cache_dir, + token=token, + revision=revision, + force_download=force_download, + local_files_only=local_files_only, + ) + else: + valid_tokenizer_files = [] + tokenizer_file: str + + instruct_versions = list(TokenizerVersion.__members__) + mm_versions = list(MultiModalVersion.__members__) + [""] # allow no mm version + sentencepiece_suffixes = [f".model.{v}{m}" for v in instruct_versions for m in mm_versions] + [".model"] + + for path in os.listdir(pretrained_model_name_or_path): + pathlib_repo_file = Path(path) + file_name = pathlib_repo_file.name + suffix = "".join(pathlib_repo_file.suffixes) + if file_name == "tekken.json" or suffix in sentencepiece_suffixes: + valid_tokenizer_files.append(file_name) + + if len(valid_tokenizer_files) == 0: + raise ValueError(f"No tokenizer file found in directory: {pretrained_model_name_or_path}") + # If there are multiple tokenizer files, we use tekken.json if it exists, otherwise the versioned one. + if len(valid_tokenizer_files) > 1: + if "tekken.json" in valid_tokenizer_files: + tokenizer_file = "tekken.json" + else: + tokenizer_file = max(valid_tokenizer_files) + logger.warning( + f"Multiple tokenizer files found in directory: {pretrained_model_name_or_path}. Using {tokenizer_file}." + ) + else: + tokenizer_file = valid_tokenizer_files[0] + + tokenizer_path = os.path.join(pretrained_model_name_or_path, tokenizer_file) + + return cls( + tokenizer_path=tokenizer_path, + mode=mode, + model_max_length=model_max_length, + padding_side=padding_side, + truncation_side=truncation_side, + model_input_names=model_input_names, + clean_up_tokenization_spaces=clean_up_tokenization_spaces, + ) + + def save_pretrained( + self, + save_directory: str | os.PathLike | Path, + push_to_hub: bool = False, + token: str | bool | None = None, + commit_message: str | None = None, + repo_id: str | None = None, + private: bool | None = None, + **kwargs, + ) -> tuple[str, ...]: + """ + Save the full tokenizer state. + + + This method make sure the full tokenizer can then be re-loaded using the + [`~MistralCommonBackend.tokenization_mistral_common.from_pretrained`] class method. + + Args: + save_directory (`str` or `os.PathLike`): The path to a directory where the tokenizer will be saved. + push_to_hub (`bool`, *optional*, defaults to `False`): + Whether or not to push your model to the Hugging Face model hub after saving it. You can specify the + repository you want to push to with `repo_id` (will default to the name of `save_directory` in your + namespace). + token (`str` or *bool*, *optional*, defaults to `None`): + The token to use to push to the model hub. If `True`, will use the token in the `HF_TOKEN` environment + variable. + commit_message (`str`, *optional*): The commit message to use when pushing to the hub. + repo_id (`str`, *optional*): The name of the repository to which push to the Hub. 
+ private (`bool`, *optional*): Whether the model repository is private or not. + kwargs (`Dict[str, Any]`, *optional*): + Not supported by `MistralCommonBackend.save_pretrained`. + Will raise an error if used. + + Returns: + A tuple of `str`: The files saved. + """ + if kwargs: + raise ValueError( + f"Kwargs {list(kwargs.keys())} are not supported by `MistralCommonBackend.save_pretrained`." + ) + + save_directory = Path(save_directory) + save_directory.mkdir(parents=True, exist_ok=True) + + shutil.copy(self._tokenizer_path, save_directory) + + if push_to_hub: + repo_id = repo_id or str(save_directory).split(os.path.sep)[-1] + repo_id = create_repo(repo_id, token=token, private=private, exist_ok=True).repo_id + files_timestamps = self._get_files_timestamps(save_directory) + + self._upload_modified_files( + save_directory, + repo_id, + files_timestamps, + commit_message=commit_message, + token=token, + ) + + return (str(save_directory / self._tokenizer_path.name),) + + @staticmethod + def _get_validation_mode(mode: Union[str, ValidationMode]) -> ValidationMode: + """Get the validation mode from a string or a ValidationMode.""" + _invalid_mode_msg = ( + f"Invalid `mistral-common` tokenizer mode: {mode}. Possible values are 'finetuning' or 'test'." + ) + if isinstance(mode, str): + try: + mode = ValidationMode[mode] + except KeyError: + raise ValueError(_invalid_mode_msg) + elif not isinstance(mode, (str, ValidationMode)): + raise ValueError(_invalid_mode_msg) + + if mode not in [ValidationMode.finetuning, ValidationMode.test]: + raise ValueError(_invalid_mode_msg) + return mode + + +# Backward compatibility alias for codebases still importing the legacy name. +MistralCommonTokenizer = MistralCommonBackend diff --git a/nemo_automodel/components/datasets/llm/formatting_utils.py b/nemo_automodel/components/datasets/llm/formatting_utils.py index 665462ac9..dfbfe628c 100644 --- a/nemo_automodel/components/datasets/llm/formatting_utils.py +++ b/nemo_automodel/components/datasets/llm/formatting_utils.py @@ -146,7 +146,7 @@ def format_prompt_completion( if answer_only_loss_mask: # don't add eos token here. NOTE: this is only for calculating the length of the prompt. # we are not modifying the prompt to be returned here. - prompt_ids = [tokenizer.bos_token_id] if tokenizer.add_bos_token else [] + prompt_ids = [tokenizer.bos_token_id] if getattr(tokenizer, "add_bos_token", False) else [] prompt_ids += tokenizer(prompt, add_special_tokens=False)["input_ids"] len_prompt_ids = len(prompt_ids) else: diff --git a/nemo_automodel/components/models/mistral3/model.py b/nemo_automodel/components/models/mistral3/model.py index c50bdb5c7..3e63f3d15 100644 --- a/nemo_automodel/components/models/mistral3/model.py +++ b/nemo_automodel/components/models/mistral3/model.py @@ -12,14 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import sys -import types from dataclasses import dataclass from typing import Optional, Union import torch from torch import nn -from transformers import AutoConfig, AutoModel, AutoModelForCausalLM +from transformers import AutoConfig, AutoModel, AutoModelForCausalLM, AutoModelForImageTextToText from transformers.activations import ACT2FN from transformers.cache_utils import Cache, DynamicCache @@ -554,71 +552,61 @@ def forward( # ----------------------------------------------------------------------------- -# Register modules into transformers namespace for AutoModel +# Register Ministral3 with transformers Auto classes # ----------------------------------------------------------------------------- -mod_pkg = sys.modules.setdefault("transformers.models.ministral3", types.ModuleType("transformers.models.ministral3")) -config_mod = types.ModuleType("transformers.models.ministral3.configuration_ministral3") -modeling_mod = types.ModuleType("transformers.models.ministral3.modeling_ministral3") - -config_mod.Ministral3Config = Ministral3Config -config_mod.__all__ = ["Ministral3Config"] -modeling_mod.Ministral3Config = Ministral3Config -modeling_mod.Ministral3PreTrainedModel = Ministral3PreTrainedModel -modeling_mod.Ministral3Model = Ministral3Model -modeling_mod.Ministral3ForCausalLM = Ministral3ForCausalLM -modeling_mod.__all__ = [ - "Ministral3Config", - "Ministral3PreTrainedModel", - "Ministral3Model", - "Ministral3ForCausalLM", -] - -sys.modules["transformers.models.ministral3.configuration_ministral3"] = config_mod -sys.modules["transformers.models.ministral3.modeling_ministral3"] = modeling_mod -setattr(mod_pkg, "configuration_ministral3", config_mod) -setattr(mod_pkg, "modeling_ministral3", modeling_mod) - -# Monkeypatch AutoModel.from_config so text_config with model_type=ministral3 resolves here -_orig_auto_from_config = AutoModel.from_config - - -def _patched_from_config(cls, config, *model_args, **kwargs): - if getattr(config, "model_type", None) == "ministral3": - return Ministral3Model(config) - return _orig_auto_from_config.__func__(cls, config, *model_args, **kwargs) - - -AutoModel.from_config = classmethod(_patched_from_config) - - -# Register config with AutoConfig so CONFIG_MAPPING recognizes it -try: - AutoConfig.register("ministral3", Ministral3Config) -except Exception: - pass -# Also ensure CONFIG_MAPPING has the entry (older HF versions) -try: - CONFIG_MAPPING.register("ministral3", Ministral3Config) -except Exception: - pass -# Register model mappings so HF Auto classes can resolve Ministral3 and HF mistral3 -try: - AutoModel.register(Ministral3Config, Ministral3Model) -except Exception: - pass -try: - AutoModelForCausalLM.register(Ministral3Config, Ministral3ForCausalLM) -except Exception: - pass -# Register for HF mistral3 config to avoid AutoModelForCausalLM errors -try: - AutoModel.register(HFMistral3Config, Mistral3ForConditionalGeneration) -except Exception: - pass -try: - AutoModelForCausalLM.register(HFMistral3Config, Mistral3ForConditionalGeneration) -except Exception: - pass - +def _register_ministral3_with_transformers(): + """ + Register Ministral3Config and models with transformers Auto classes. -ModelClass = [Mistral3ForConditionalGeneration, Ministral3ForCausalLM] + This uses the official transformers registration API. Registration is idempotent + (re-registering the same config/model is a no-op in recent transformers versions). 
+ """ + import logging + + _logger = logging.getLogger(__name__) + + # Register config with AutoConfig + if "ministral3" not in CONFIG_MAPPING: + try: + AutoConfig.register("ministral3", Ministral3Config) + except ValueError as e: + # Already registered (can happen on reimport) + _logger.debug(f"Ministral3Config registration skipped: {e}") + + # Register models with Auto classes + # Note: AutoModel.register / AutoModelForCausalLM.register will raise if + # the config is already mapped to a different model class, but that's the + # desired behavior (fail loudly rather than silently override). + try: + AutoModel.register(Ministral3Config, Ministral3Model) + except ValueError as e: + _logger.debug(f"Ministral3Model registration skipped: {e}") + + try: + AutoModelForCausalLM.register(Ministral3Config, Ministral3ForCausalLM) + except ValueError as e: + _logger.debug(f"Ministral3ForCausalLM registration skipped: {e}") + + # Register HuggingFace's Mistral3ForConditionalGeneration with Auto classes. + # HF's _from_config uses torch.set_default_dtype() context manager, so dtype + # should be handled correctly when torch_dtype is passed to from_config(). + try: + AutoModelForCausalLM.register(HFMistral3Config, Mistral3ForConditionalGeneration) + except ValueError as e: + _logger.debug(f"Mistral3ForConditionalGeneration (CausalLM) registration skipped: {e}") + + try: + AutoModelForImageTextToText.register(HFMistral3Config, Mistral3ForConditionalGeneration) + except ValueError as e: + _logger.debug(f"Mistral3ForConditionalGeneration (ImageTextToText) registration skipped: {e}") + + +# Perform registration at module import time +_register_ministral3_with_transformers() + +# Export ModelClass for discovery by nemo_automodel's ModelRegistry +# Note: We intentionally do NOT export Mistral3ForConditionalGeneration here. +# If exported, ModelRegistry would bypass HF's _from_config which handles torch_dtype +# via torch.set_default_dtype() context. By not exporting it, the model goes through +# HF's Auto class path which properly respects torch_dtype. +ModelClass = Ministral3ForCausalLM diff --git a/nemo_automodel/recipes/base_recipe.py b/nemo_automodel/recipes/base_recipe.py index 846281aeb..b5d73a77e 100644 --- a/nemo_automodel/recipes/base_recipe.py +++ b/nemo_automodel/recipes/base_recipe.py @@ -31,6 +31,7 @@ from transformers.tokenization_utils import PreTrainedTokenizerBase from nemo_automodel._transformers.auto_tokenizer import NeMoAutoTokenizer +from nemo_automodel._transformers.tokenization.nemo_auto_tokenizer import AutoTokenizerWithBosEosEnforced from nemo_automodel.components.checkpoint.checkpointing import save_config from nemo_automodel.components.config.loader import ConfigNode from nemo_automodel.components.optim.scheduler import OptimizerParamScheduler @@ -76,7 +77,17 @@ def is_tokenizer(object): Returns: bool: returns True if object is a tokenizer or VLM processor. """ - return isinstance(object, (PreTrainedTokenizerBase, ProcessorMixin, NeMoAutoTokenizer)) + # Note: some NeMo flows wrap HF tokenizers (e.g., BOS/EOS enforcement wrapper). Those + # wrappers still implement `save_pretrained()` via delegation and should be checkpointed. 
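+    # `AutoTokenizerWithBosEosEnforced` is listed explicitly below because it is not guaranteed to
+    # subclass `PreTrainedTokenizerBase`, so it would otherwise not be recognized as a tokenizer here.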
+ return isinstance( + object, + ( + PreTrainedTokenizerBase, + ProcessorMixin, + NeMoAutoTokenizer, + AutoTokenizerWithBosEosEnforced, + ), + ) def is_lr_scheduler(object): diff --git a/pyproject.toml b/pyproject.toml index 482ec0f29..a8e5a0f2f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -172,6 +172,10 @@ omit = [ "nemo_automodel/components/_peft/lora_kernel.py", "nemo_automodel/components/loss/triton/te_cross_entropy.py", "nemo_automodel/components/moe/megatron/*.py", + "config.py", + "config-3.py", + # transformer v5 backports + "nemo_automodel/_transformers/tokenization/tokenization_mistral_common.py", ] [tool.ruff] diff --git a/tests/unit_tests/_transformers/test_auto_tokenizer.py b/tests/unit_tests/_transformers/test_auto_tokenizer.py index 7a6aaf311..813c08c9b 100644 --- a/tests/unit_tests/_transformers/test_auto_tokenizer.py +++ b/tests/unit_tests/_transformers/test_auto_tokenizer.py @@ -17,7 +17,8 @@ from transformers.tokenization_utils_base import BatchEncoding -from nemo_automodel._transformers.auto_tokenizer import NeMoAutoTokenizer, _add_token +from nemo_automodel._transformers.auto_tokenizer import NeMoAutoTokenizer +from nemo_automodel._transformers.tokenization.nemo_auto_tokenizer import _add_token class _StubHFTokenizer: diff --git a/tests/unit_tests/models/mistral3/test_mistral3_model.py b/tests/unit_tests/models/mistral3/test_mistral3_model.py index e3dfca025..a81d77a60 100644 --- a/tests/unit_tests/models/mistral3/test_mistral3_model.py +++ b/tests/unit_tests/models/mistral3/test_mistral3_model.py @@ -98,10 +98,3 @@ def test_forward_emits_logits(self): assert outputs.logits.shape == (batch, seq_len, cfg.vocab_size) mock_forward.assert_called_once() - -class TestModelClassExport: - def test_model_class_points_to_models(self): - assert hasattr(mistral_mod, "ModelClass") - assert mistral_mod.Ministral3ForCausalLM in mistral_mod.ModelClass - assert Mistral3ForConditionalGeneration in mistral_mod.ModelClass -