-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathmodel_builder.py
More file actions
2812 lines (2497 loc) · 117 KB
/
model_builder.py
File metadata and controls
2812 lines (2497 loc) · 117 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import builtins
import importlib
import inspect
import os
import re
import shutil
import subprocess
import sys
import tarfile
import time
import uuid
from string import Template
from typing import Any, Dict, Literal, Optional
from unittest.mock import MagicMock
import yaml
from clarifai_grpc.grpc.api import resources_pb2, service_pb2
from clarifai_grpc.grpc.api.status import status_code_pb2
from google.protobuf import json_format
from clarifai.client import Model, Nodepool
from clarifai.client.base import BaseClient
from clarifai.client.user import User
from clarifai.errors import UserError
from clarifai.runners.models.agentic_class import AgenticModelClass
from clarifai.runners.models.model_class import ModelClass
from clarifai.runners.utils import code_script
from clarifai.runners.utils.const import (
AMD_PYTHON_BASE_IMAGE,
AMD_TORCH_BASE_IMAGE,
AMD_VLLM_BASE_IMAGE,
AVAILABLE_PYTHON_IMAGES,
AVAILABLE_TORCH_IMAGES,
CONCEPTS_REQUIRED_MODEL_TYPE,
DEFAULT_AMD_GPU_VERSION,
DEFAULT_AMD_TORCH_VERSION,
DEFAULT_DOWNLOAD_CHECKPOINT_WHEN,
DEFAULT_PYTHON_VERSION,
DEFAULT_RUNTIME_DOWNLOAD_PATH,
PYTHON_BASE_IMAGE,
TORCH_BASE_IMAGE,
)
from clarifai.runners.utils.loader import HuggingFaceLoader
from clarifai.runners.utils.method_signatures import signatures_to_yaml
from clarifai.urls.helper import ClarifaiUrlHelper
from clarifai.utils.logging import logger
from clarifai.versions import get_latest_version_from_pypi
# Latest published versions, fetched from PyPI once at import time; used to pin
# package versions in generated Dockerfiles.
CLARIFAI_LATEST_VERSION = get_latest_version_from_pypi()
CLARIFAI_PROTOCOL_LATEST_VERSION = get_latest_version_from_pypi('clarifai-protocol')

# Additional package installation if the model will be used w/ a streaming video runner:
# Dockerfile: Install ffmpeg and clarifai-protocol[auto-annotation].
#
# Our base images are distroless, so we do not have apt-get or other package managers
# available; however, we will also not be able to use those package repositories on-prem.
# As a result, we build our own static ffmpeg image to serve as the source of these deps.
# See: https://github.com/Clarifai/models-images/tree/main/static_streaming
#
# TODO: before we make this public, we need to figure out how to distribute the src;
# line to copy in src commented out because it's 500MB
STREAMING_VIDEO_ADDITIONAL_PACKAGE_INSTALLATION = f"""
COPY --from=public.ecr.aws/clarifai-models/static-streaming:5.1.8 /ffmpeg /usr/local/bin/
COPY --from=public.ecr.aws/clarifai-models/static-streaming:5.1.8 /ffprobe /usr/local/bin/
# COPY --from=public.ecr.aws/clarifai-models/static-streaming:5.1.8 /src /usr/local/src/
RUN uv pip install 'clarifai-protocol[auto-annotation]=={CLARIFAI_PROTOCOL_LATEST_VERSION}'
"""
def is_related(object_class, main_class):
    """Return True when ``object_class`` is a subclass of ``main_class``, or when
    any direct base of ``object_class`` itself lists ``main_class`` as a direct
    base; otherwise return False."""
    if issubclass(object_class, main_class):
        return True
    # Check whether any direct parent of object_class names main_class as one of
    # its own direct bases.
    return any(main_class in base.__bases__ for base in object_class.__bases__)
def get_user_input(prompt, required=True, default=None):
    """Prompt on stdin; empty input returns ``default`` when one is set.

    Re-prompts while the field is required and the user enters nothing.
    Returns the stripped user input (or the default).
    """
    suffix = f" [{default}]: " if default else ": "
    full_prompt = f"{prompt}{suffix}"
    while True:
        entered = input(full_prompt).strip()
        if entered:
            return entered
        if default:
            return default
        if required:
            print("❌ This field is required. Please enter a value.")
            continue
        # Optional field with no default: an empty answer is acceptable.
        return entered
def get_yes_no_input(prompt, default=None):
    """Ask a yes/no question on stdin; empty input returns the default if one is set."""
    if default is None:
        full_prompt = f"{prompt} [y/n]: "
    else:
        # Capitalize the default choice in the hint, e.g. [Y/n] when default=True.
        full_prompt = f"{prompt} [{'Y/n' if default else 'y/N'}]: "
    answers = {'y': True, 'yes': True, 'n': False, 'no': False}
    while True:
        reply = input(full_prompt).strip().lower()
        if not reply and default is not None:
            return default
        if reply in answers:
            return answers[reply]
        print("❌ Please enter 'y' or 'n'.")
def select_compute_option(user_id: str, pat: Optional[str] = None, base_url: Optional[str] = None):
    """
    Interactively pick one of `user_id`'s compute clusters, then one of that
    cluster's nodepools, and return a dict with nodepool_id, compute_cluster_id
    and cluster_user_id. Returns None when no cluster/nodepool is available.
    """

    def pick(options, prompt):
        # Print a numbered menu and loop until a valid 1-based index is typed.
        for idx, opt in enumerate(options, 1):
            description = getattr(opt, "description", "") or "No description"
            print(f"{idx}. {opt.id} – {description}")
        while True:
            try:
                choice = int(input(prompt)) - 1
            except ValueError:
                print("❌ Please enter a number.")
                continue
            if 0 <= choice < len(options):
                return options[choice]
            print("❌ Invalid selection.")

    user = User(
        user_id=user_id, pat=pat, base_url=base_url
    )  # PAT / BASE URL are picked from env-vars
    clusters = list(user.list_compute_clusters())
    if not clusters:
        print("❌ No compute clusters found for this user.")
        return None
    print("\n🖥️ Available Compute Clusters:")
    cluster = pick(clusters, "Select compute cluster (number): ")

    nodepools = list(cluster.list_nodepools())
    if not nodepools:
        print("❌ No nodepools in selected cluster.")
        return None
    print("\n📦 Available Nodepools:")
    nodepool = pick(nodepools, "Select nodepool (number): ")

    return {
        "nodepool_id": nodepool.id,
        "compute_cluster_id": cluster.id,
        "cluster_user_id": getattr(cluster, "user_id", user_id),
    }
class ModelBuilder:
    """Builds, validates and packages a Clarifai model folder (model.py,
    config.yaml, requirements.txt and optional checkpoints) for upload."""

    # Default size assumed for checkpoint storage budgeting: 50 GiB.
    DEFAULT_CHECKPOINT_SIZE = 50 * 1024**3  # 50 GiB
    def __init__(
        self,
        folder: str,
        validate_api_ids: bool = True,
        download_validation_only: bool = False,
        app_not_found_action: Literal["auto_create", "prompt", "error"] = "error",
        platform: Optional[str] = None,
        pat: Optional[str] = None,
        base_url: Optional[str] = None,
        user_id: Optional[str] = None,
        app_id: Optional[str] = None,
        compute_info_required: bool = False,
    ):
        """
        :param folder: The folder containing the model.py, config.yaml, requirements.txt and
          checkpoints.
        :param validate_api_ids: Whether to validate the user_id and app_id in the config file. TODO(zeiler):
          deprecate in favor of download_validation_only.
        :param download_validation_only: Whether to skip the API config validation. Set to True if
          just downloading a checkpoint.
        :param app_not_found_action: Defines how to handle the case when the app is not found.
          Options: 'auto_create' - create automatically, 'prompt' - ask user, 'error' - raise exception.
        :param platform: Target platform(s) for Docker image build (e.g., "linux/amd64" or "linux/amd64,linux/arm64"). This overrides the platform specified in config.yaml.
        :param pat: Personal access token for authentication. If None, will use environment variables.
        :param base_url: Base URL for the API. If None, will use environment variables.
        :param user_id: Optional user ID to inject into config if missing (for simplified configs).
        :param app_id: Optional app ID to inject into config if missing (for simplified configs).
        :param compute_info_required: Whether inference compute info is required. This affects certain validation and behavior.
        """
        # NOTE(review): assert is stripped under `python -O`; validation relies on it.
        assert app_not_found_action in ["auto_create", "prompt", "error"], ValueError(
            f"Expected one of {['auto_create', 'prompt', 'error']}, got {app_not_found_action=}"
        )
        self.app_not_found_action = app_not_found_action
        self._client = None  # API client, created lazily elsewhere
        self._pat = pat
        self._base_url = base_url
        self._cli_platform = platform
        if not validate_api_ids:  # for backwards compatibility
            download_validation_only = True
        self.download_validation_only = download_validation_only
        self.folder = self._validate_folder(folder)
        self.config = self._load_config(os.path.join(self.folder, 'config.yaml'))
        # Auto-resolve user_id if not provided and not in config
        config_had_user_id = 'user_id' in self.config.get('model', {})
        if not user_id and not config_had_user_id:
            # Local import to avoid a module-level dependency cycle — TODO confirm.
            from clarifai.utils.config import resolve_user_id

            user_id = resolve_user_id(pat=pat, base_url=base_url)
            if user_id:
                logger.info(f"Using user_id '{user_id}' (auto-resolved from CLI config/PAT).")
        # Expand simplified config into the full format before validating it.
        self.config = self.normalize_config(self.config, user_id=user_id, app_id=app_id)
        self._validate_config()
        self._validate_config_secrets()
        self._validate_stream_options()
        self.model_proto = self._get_model_proto()
        self.model_id = self.model_proto.id
        self.model_version_id = None
        self.inference_compute_info = self._get_inference_compute_info(
            compute_info_required=compute_info_required
        )
        self.is_v3 = True  # Do model build for v3
def create_model_instance(self, load_model=True, mocking=False) -> ModelClass:
"""
Create an instance of the model class, as specified in the config file.
"""
model_class = self.load_model_class(mocking=mocking)
# initialize the model
model = model_class()
if load_model:
model.load_model()
return model
def get_model_proto(self) -> resources_pb2.Model:
"""
Retrieve the model and model version proto using self.model_id and self.model_version_id.
Args:
None
Returns:
resources_pb2.Model: The retrieved model proto.
Raises:
UserError: If the model or model version cannot be retrieved.
"""
request = service_pb2.GetModelRequest(
user_app_id=self.client.user_app_id,
model_id=self.model_id,
)
# Add secrets to additional_fields to get request-type secrets
request.additional_fields.append("secrets")
if self.model_version_id is not None:
request.version_id = self.model_version_id
resp: service_pb2.SingleModelResponse = self.client.STUB.GetModel(request)
if resp.status.code != status_code_pb2.SUCCESS:
if self.model_version_id is None:
raise UserError(f"Failed to get model '{self.model_id}': {resp.status.details}")
else:
raise UserError(
f"Failed to get model '{self.model_id}'"
f" version '{self.model_version_id}': {resp.status.details}"
)
return resp.model
    def load_model_class(self, mocking=False):
        """
        Import the model class from the model.py file, dynamically handling missing dependencies.

        :param mocking: When True, replace every non-stdlib / non-clarifai import
            executed by model.py with a MagicMock so the file can be inspected
            without its third-party dependencies installed.
        :return: The single ModelClass subclass defined in model.py.
        :raises Exception: If model.py is missing, or it does not define exactly
            one class related to ModelClass.
        """
        # look for default model.py file location
        for loc in ["model.py", "1/model.py"]:
            model_file = os.path.join(self.folder, loc)
            if os.path.exists(model_file):
                break
        if not os.path.exists(model_file):
            raise Exception("Model file not found.")

        module_name = os.path.basename(model_file).replace(".py", "")

        spec = importlib.util.spec_from_file_location(module_name, model_file)
        module = importlib.util.module_from_spec(spec)
        # Register before exec so intra-module imports resolve during execution.
        sys.modules[module_name] = module

        original_import = builtins.__import__
        # Prevent __pycache__ folder generation during module execution
        original_dont_write_bytecode = sys.dont_write_bytecode

        def custom_import(name, globals=None, locals=None, fromlist=(), level=0):
            # Allow standard libraries and clarifai
            if self._is_standard_or_clarifai(name):
                return original_import(name, globals, locals, fromlist, level)
            # Mock all third-party imports to avoid ImportErrors or other issues
            return MagicMock()

        if mocking:
            # Replace the built-in __import__ function with our custom one
            builtins.__import__ = custom_import

        try:
            # Set sys.dont_write_bytecode to prevent __pycache__ folder generation
            sys.dont_write_bytecode = True
            spec.loader.exec_module(module)
        except Exception as e:
            logger.error(f"Error loading model.py: {e}")
            raise
        finally:
            # Restore the original __import__ function and bytecode setting
            builtins.__import__ = original_import
            sys.dont_write_bytecode = original_dont_write_bytecode

        # Find all classes in the model.py file that are subclasses of ModelClass
        classes = [
            cls
            for _, cls in inspect.getmembers(module, inspect.isclass)
            if is_related(cls, ModelClass) and cls.__module__ == module.__name__
        ]
        # Ensure there is exactly one subclass of BaseRunner in the model.py file
        if len(classes) != 1:
            # check for old inheritence structure, ModelRunner used to be a ModelClass
            runner_classes = [
                cls
                for _, cls in inspect.getmembers(module, inspect.isclass)
                if cls.__module__ == module.__name__
                and any(c.__name__ == 'ModelRunner' for c in cls.__bases__)
            ]
            if runner_classes and len(runner_classes) == 1:
                raise Exception(
                    f'Could not determine model class.'
                    f' Models should now inherit from {ModelClass.__module__}.ModelClass, not ModelRunner.'
                    f' Please update your model "{runner_classes[0].__name__}" to inherit from ModelClass.'
                )
            raise Exception(
                "Could not determine model class. There should be exactly one model inheriting from ModelClass defined in the model.py"
            )
        model_class = classes[0]
        return model_class
def _is_standard_or_clarifai(self, name):
"""Check if import is from standard library or clarifai"""
if name.startswith("clarifai"):
return True
# Handle Python <3.10 compatibility
stdlib_names = getattr(sys, "stdlib_module_names", sys.builtin_module_names)
if name in stdlib_names:
return True
# Handle submodules (e.g., os.path)
parts = name.split(".")
for i in range(1, len(parts)):
if ".".join(parts[:i]) in stdlib_names:
return True
return False
def _validate_folder(self, folder):
if folder == ".":
folder = "" # will getcwd() next which ends with /
if not os.path.isabs(folder):
folder = os.path.join(os.getcwd(), folder)
logger.debug(f"Validating folder: {folder}")
if not os.path.exists(folder):
raise FileNotFoundError(
f"Folder {folder} not found, please provide a valid folder path"
)
files = os.listdir(folder)
if "config.yaml" not in files:
raise UserError(f"config.yaml not found in {folder}")
# If just downloading we don't need requirements.txt or the python code, we do need the
# 1/ folder to put 1/checkpoints into.
if "1" not in files:
raise UserError(f"Subfolder '1' not found in {folder}")
if not self.download_validation_only:
if "requirements.txt" not in files:
raise UserError(f"requirements.txt not found in {folder}")
subfolder_files = os.listdir(os.path.join(folder, '1'))
if 'model.py' not in subfolder_files:
raise UserError(f"model.py not found in {folder}/1/")
return folder
@staticmethod
def _load_config(config_file: str):
with open(config_file, 'r') as file:
config = yaml.safe_load(file)
return config
@staticmethod
def _backup_config(config_file: str):
if not os.path.exists(config_file):
return
backup_file = config_file + ".bak"
if os.path.exists(backup_file):
raise FileExistsError(
f"Backup file {backup_file} already exists. Please remove it before proceeding."
)
shutil.copy(config_file, backup_file)
@staticmethod
def _save_config(config_file: str, config: dict):
with open(config_file, 'w') as f:
yaml.safe_dump(config, f)
def _validate_config_checkpoints(self):
"""
Validates the checkpoints section in the config file.
return loader_type, repo_id, hf_token, when, allowed_file_patterns, ignore_file_patterns
:return: loader_type the type of loader or None if no checkpoints.
:return: repo_id location of checkpoint.
:return: hf_token token to access checkpoint.
:return: when one of ['upload', 'build', 'runtime'] to download checkpoint
:return: allowed_file_patterns patterns to allow in downloaded checkpoint
:return: ignore_file_patterns patterns to ignore in downloaded checkpoint
"""
if "checkpoints" not in self.config:
return None, None, None, DEFAULT_DOWNLOAD_CHECKPOINT_WHEN, None, None
if "type" not in self.config.get("checkpoints"):
raise UserError("No loader type specified in checkpoints section of config.yaml")
loader_type = self.config.get("checkpoints").get("type")
if not loader_type:
logger.info("No loader type specified in the config file for checkpoints")
return None, None, None, DEFAULT_DOWNLOAD_CHECKPOINT_WHEN, None, None
checkpoints = self.config.get("checkpoints")
if 'when' not in checkpoints:
logger.warn(
f"No 'when' specified in the config file for checkpoints, defaulting to download at {DEFAULT_DOWNLOAD_CHECKPOINT_WHEN}"
)
when = checkpoints.get("when", DEFAULT_DOWNLOAD_CHECKPOINT_WHEN)
if when not in ["upload", "build", "runtime"]:
raise UserError(
f"Invalid value '{when}' for checkpoints.when. Must be one of: upload, build, runtime"
)
if loader_type != "huggingface":
raise UserError(
f"Unsupported checkpoint loader type '{loader_type}'. Only 'huggingface' is supported."
)
if loader_type == "huggingface":
if "repo_id" not in self.config.get("checkpoints"):
raise UserError("No repo_id specified in checkpoints section of config.yaml")
repo_id = self.config.get("checkpoints").get("repo_id")
# get from config.yaml otherwise fall back to HF_TOKEN env var.
hf_token = self.config.get("checkpoints").get(
"hf_token", os.environ.get("HF_TOKEN", None)
)
allowed_file_patterns = self.config.get("checkpoints").get(
'allowed_file_patterns', None
)
if isinstance(allowed_file_patterns, str):
allowed_file_patterns = [allowed_file_patterns]
ignore_file_patterns = self.config.get("checkpoints").get('ignore_file_patterns', None)
if isinstance(ignore_file_patterns, str):
ignore_file_patterns = [ignore_file_patterns]
return (
loader_type,
repo_id,
hf_token,
when,
allowed_file_patterns,
ignore_file_patterns,
)
    def _check_app_exists(self):
        """Return True when the configured user/app exists, creating the app when
        ``app_not_found_action`` permits ('prompt' or 'auto_create'); return False
        otherwise.
        """
        resp = self.client.STUB.GetApp(
            service_pb2.GetAppRequest(user_app_id=self.client.user_app_id)
        )
        if resp.status.code == status_code_pb2.SUCCESS:
            return True
        if resp.status.code == status_code_pb2.CONN_KEY_INVALID:
            # A bad PAT can never be fixed by creating the app; fail fast.
            logger.error(
                f"Invalid PAT provided for user {self.client.user_app_id.user_id}. Please check your PAT and try again."
            )
            return False
        user_id = self.client.user_app_id.user_id
        app_id = self.client.user_app_id.app_id
        if self.app_not_found_action == "error":
            logger.error(
                f"Error checking API {self._base_api} for user app `{user_id}/{app_id}`. Error code: {resp.status.code}"
            )
            logger.error(
                f"App `{app_id}` not found for user `{user_id}`. Please create the app first and try again."
            )
            return False
        else:
            user = User(
                user_id=user_id,
                pat=self.client.pat,
                token=self.client.token,
                base_url=self.client.base,
            )

            def create_app():
                # Create the app via the API; on failure log actionable guidance, then re-raise.
                logger.info(f"Creating App `{app_id}` for user `{user_id}`.")
                try:
                    user.create_app(app_id=app_id)
                except Exception as e:
                    error_msg = str(e)
                    if 'CONN_DOES_NOT_EXIST' in error_msg or 'not found' in error_msg:
                        logger.error(
                            f"Failed to create app '{app_id}' for user '{user_id}'. "
                            f"This usually means your PAT doesn't have access to this user account.\n"
                            f"To fix this, verify your PAT matches the user_id:\n"
                            f" - Run 'clarifai config list' to check your saved configs\n"
                            f" - Run 'clarifai config use <context>' to switch to the correct config\n"
                            f" - Or set 'user_id' explicitly in your config.yaml"
                        )
                    else:
                        logger.error(f"Failed to create app '{app_id}': {e}")
                    raise

            logger.info(f"App {app_id} not found for user {user_id}.")
            if self.app_not_found_action == "prompt":
                create_app_prompt = input(f"Do you want to create App `{app_id}`? (y/n): ")
                if create_app_prompt.lower() == 'y':
                    create_app()
                    return True
                else:
                    logger.error(
                        f"App `{app_id}` has not been created for user `{user_id}`. Please create the app first or switch to an existing one, then try again."
                    )
                    return False
            elif self.app_not_found_action == "auto_create":
                create_app()
                return True
    @staticmethod
    def normalize_config(config, user_id=None, app_id=None):
        """Expand simplified config format to full format.

        Handles:
        1. Inject user_id/app_id from CLI context if missing
        2. Expand compute.instance (or legacy compute.gpu) -> inference_compute_info
        3. Expand simplified checkpoints (infer type, default when)
        4. Set build_info defaults

        This is a no-op for configs that already have all fields.
        """
        # Shallow copy only: nested sections (e.g. 'compute') are still shared
        # with the caller's dict and may be mutated below.
        config = dict(config)
        # 1. Inject user_id/app_id into model section if missing
        model = dict(config.get('model', {}))
        if user_id and 'user_id' not in model:
            model['user_id'] = user_id
        if app_id and 'app_id' not in model:
            model['app_id'] = app_id
        # Default app_id to "main" if still missing (auto-created on deploy/upload)
        if 'app_id' not in model:
            model['app_id'] = 'main'
        # Default model_type_id to "any-to-any" if not specified
        if 'model_type_id' not in model:
            model['model_type_id'] = 'any-to-any'
        config['model'] = model
        # 2. Expand compute.instance (or legacy compute.gpu) -> inference_compute_info
        compute = config.get('compute')
        if compute and 'inference_compute_info' not in config:
            instance = compute.get('instance') or compute.get('gpu')
            if instance:
                from clarifai.utils.compute_presets import get_inference_compute_for_gpu

                try:
                    ici = get_inference_compute_for_gpu(instance)
                    # Always use wildcard accelerator_type so the model can be scheduled
                    # on any compatible GPU, not locked to a specific type.
                    if ici.get('num_accelerators', 0) > 0:
                        from clarifai.utils.compute_presets import get_accelerator_wildcard

                        ici['accelerator_type'] = [
                            get_accelerator_wildcard(
                                instance_type_id=instance,
                                accelerator_types=ici.get('accelerator_type'),
                            )
                        ]
                    config['inference_compute_info'] = ici
                except ValueError:
                    # Unknown instance name: leave inference_compute_info unset.
                    logger.debug(
                        f"Could not resolve compute instance '{instance}'. "
                        "Skipping inference_compute_info normalization."
                    )
                # Normalize to compute.instance
                compute['instance'] = instance
                compute.pop('gpu', None)
        # 3. Expand simplified checkpoints
        checkpoints = config.get('checkpoints')
        if checkpoints:
            checkpoints = dict(checkpoints)
            if 'type' not in checkpoints and 'repo_id' in checkpoints:
                checkpoints['type'] = 'huggingface'
            if 'when' not in checkpoints:
                checkpoints['when'] = 'runtime'
            config['checkpoints'] = checkpoints
        # 4. Build info defaults
        if 'build_info' not in config:
            config['build_info'] = {'python_version': '3.12'}
        return config
def _validate_config_model(self):
if "model" not in self.config:
raise UserError("'model' section not found in config.yaml")
model = self.config.get('model')
if "user_id" not in model:
raise UserError(
"user_id could not be resolved. Either:\n"
" - Add 'user_id' to the model section in config.yaml\n"
" - Run 'clarifai login' to set up your CLI config"
)
if "app_id" not in model:
raise UserError("app_id not found in config.yaml")
if "model_type_id" not in model:
model["model_type_id"] = "any-to-any"
if "id" not in model:
raise UserError("model id not found in the model section of config.yaml")
if '.' in model.get('id', ''):
raise UserError(
"Model ID cannot contain '.'. Please remove it from the model id in config.yaml."
)
if not model.get('user_id'):
raise UserError("user_id cannot be empty in config.yaml")
if not model.get('app_id'):
raise UserError("app_id cannot be empty in config.yaml")
if not model.get('id'):
raise UserError("model id cannot be empty in config.yaml")
if not self._check_app_exists():
sys.exit(1)
@staticmethod
def _set_local_runner_model(config, user_id, app_id, model_id, model_type_id):
"""
Sets the model configuration for local development.
This is used when running the model locally without uploading it to Clarifai.
"""
if 'model' not in config:
config['model'] = {}
config["model"]["user_id"] = user_id
config["model"]["app_id"] = app_id
config["model"]["id"] = model_id
config["model"]["model_type_id"] = model_type_id
return config
    def _validate_config(self):
        """Validate the loaded config: model section, compute info, concepts,
        checkpoints / HuggingFace repo access, num_threads and build_info flags.
        """
        if not self.download_validation_only:
            self._validate_config_model()

            if "inference_compute_info" not in self.config:
                logger.warning(
                    "inference_compute_info not found in config. "
                    "Set 'compute.instance' or 'inference_compute_info' in config.yaml for deployment."
                )

            if self.config.get("concepts"):
                model_type_id = self.config.get('model').get('model_type_id')
                if model_type_id not in CONCEPTS_REQUIRED_MODEL_TYPE:
                    raise UserError(f"Model type '{model_type_id}' not supported for concepts")

        if self.config.get("checkpoints"):
            loader_type, _, hf_token, when, _, _ = self._validate_config_checkpoints()

            if loader_type == "huggingface":
                is_valid_token = hf_token and HuggingFaceLoader.validate_hftoken(hf_token)
                if not is_valid_token and hf_token:
                    logger.info(
                        "Continuing without Hugging Face token for validating config in model builder."
                    )

                repo_id = self.config.get("checkpoints", {}).get("repo_id")
                config_hf_token = self.config.get("checkpoints", {}).get("hf_token")

                # First, check anonymous access (no cached login) to detect gated repos.
                has_access, reason = HuggingFaceLoader.validate_hf_repo_access(
                    repo_id=repo_id,
                    token=False,  # bypass cached huggingface-cli login
                )
                if has_access:
                    # Public repo — no token needed anywhere.
                    pass
                elif reason == "gated_no_token":
                    # Repo requires auth. Validate with the available token.
                    if is_valid_token:
                        has_access, reason = HuggingFaceLoader.validate_hf_repo_access(
                            repo_id=repo_id,
                            token=hf_token,
                        )
                    if not is_valid_token or not has_access:
                        if not is_valid_token:
                            reason = "gated_no_token"
                        self._raise_hf_access_error(repo_id, reason)
                    # Token works — for build/runtime, persist it to config so
                    # the container has it too.
                    if when in ("build", "runtime") and not config_hf_token:
                        self.config.setdefault("checkpoints", {})["hf_token"] = hf_token
                        config_path = os.path.join(self.folder, "config.yaml")
                        if os.path.exists(config_path):
                            with open(config_path, 'r') as f:
                                file_config = yaml.safe_load(f) or {}
                            file_config.setdefault("checkpoints", {})["hf_token"] = hf_token
                            with open(config_path, 'w') as f:
                                yaml.dump(file_config, f, sort_keys=False)
                            logger.info(
                                "Wrote HF_TOKEN from environment to config.yaml "
                                "so the build container can access the gated repo."
                            )
                else:
                    # not_found or gated_no_access
                    self._raise_hf_access_error(repo_id, reason)

        num_threads = self.config.get("num_threads")
        # An explicitly configured value (including 0) must be an int >= 1;
        # otherwise fall back to the CLARIFAI_NUM_THREADS env var (default 16).
        if num_threads or num_threads == 0:
            if not isinstance(num_threads, int) or num_threads < 1:
                raise UserError(f"num_threads must be an integer >= 1. Got: {num_threads!r}")
        else:
            num_threads = int(os.environ.get("CLARIFAI_NUM_THREADS", 16))
        self.config["num_threads"] = num_threads

        dereference_symlinks = self.config.get("build_info", {}).get("dereference_symlinks")
        if dereference_symlinks is not None and not isinstance(dereference_symlinks, bool):
            raise UserError(
                "build_info.dereference_symlinks must be a boolean when provided. "
                f"Got: {dereference_symlinks!r}"
            )

        # Validate AgenticModelClass requirements
        if not self.download_validation_only:
            self._validate_agentic_model_requirements()
def _should_dereference_symlinks(self):
"""Whether tar packaging should follow symlinks and embed target file contents."""
return self.config.get("build_info", {}).get("dereference_symlinks", False)
@staticmethod
def _raise_hf_access_error(repo_id, reason):
"""Raise UserError with actionable guidance for HuggingFace access failures."""
if reason == "gated_no_token":
raise UserError(
f"HuggingFace repo '{repo_id}' requires authentication.\n"
" Set HF_TOKEN in your environment:\n"
" export HF_TOKEN=hf_...\n"
" Or add to config.yaml:\n"
" checkpoints:\n"
" hf_token: hf_...\n"
f" Request access at: https://huggingface.co/{repo_id}"
)
elif reason == "gated_no_access":
raise UserError(
f"Your HF token does not have access to gated repo '{repo_id}'.\n"
f" Request access at: https://huggingface.co/{repo_id}\n"
" Then wait for approval and retry."
)
elif reason == "not_found":
raise UserError(
f"HuggingFace repo '{repo_id}' not found.\n"
" Check the repo_id in your config.yaml checkpoints section."
)
else:
raise UserError(f"Cannot access HuggingFace repo '{repo_id}'.")
def _validate_agentic_model_requirements(self):
"""
Validate that AgenticModelClass models have required dependencies (fastmcp and mcp) in requirements.txt.
"""
try:
# Load the model class with mocking to avoid import errors
model_class = self.load_model_class(mocking=True)
# Check if the model class is a subclass of AgenticModelClass
if issubclass(model_class, AgenticModelClass):
# Parse requirements.txt to check for required packages
dependencies = self._parse_requirements()
missing_packages = []
if 'fastmcp' not in dependencies:
missing_packages.append('fastmcp')
if 'mcp' not in dependencies:
missing_packages.append('mcp')
if missing_packages:
logger.error(
f"Model class '{model_class.__name__}' inherits from AgenticModelClass, "
f"but the following required packages are missing from requirements.txt: {', '.join(missing_packages)}, which are required for agentic models. "
f"Please add these packages to your requirements.txt file."
)
sys.exit(1)
except Exception as e:
# If we can't load the model class, log a warning but don't fail
# This could happen if there are import errors, but we don't want to block
# non-agentic models from being uploaded
logger.debug(f"Could not validate AgenticModelClass requirements: {e}")
def _validate_stream_options(self):
"""
Validate OpenAI streaming configuration for Clarifai models.
"""
if self.download_validation_only:
return
if not self._is_clarifai_internal():
return # Skip validation for non-clarifai models
# Parse all Python files once
all_python_content = self._get_all_python_content()
if self._uses_openai_streaming(all_python_content):
logger.info(
"Detected OpenAI chat completions for Clarifai model streaming - validating stream_options..."
)
if not self.has_proper_usage_tracking(all_python_content):
logger.warning(
"Missing configuration to track usage for OpenAI chat completion calls. "
"Go to your model scripts and make sure to set both: "
"1) stream_options={'include_usage': True}"
"2) set_output_context"
)
def _validate_config_secrets(self):
"""
Validate the secrets section in the config file.
"""
if "secrets" not in self.config:
return
secrets = self.config.get("secrets", [])
if not isinstance(secrets, list):
raise ValueError("The 'secrets' field must be an array.")
for i, secret in enumerate(secrets):
if not isinstance(secret, dict):
raise ValueError(f"Secret at index {i} must be a dictionary.")
# Validate required fields
if "id" not in secret or not secret["id"]:
raise ValueError(f"Secret at index {i} must have a non-empty 'id' field.")
if "type" not in secret or not secret["type"]:
secret["type"] = "env"
if "env_var" not in secret or not secret["env_var"]:
raise ValueError(f"Secret at index {i} must have a non-empty 'env_var' field.")
# Validate secret type
if secret["type"] != "env":
raise ValueError(
f"Secret at index {i} has invalid type '{secret['type']}'. Must be 'env'."
)
logger.info(f"Validated {len(secrets)} secrets in config file.")
def _process_secrets(self):
"""
Process secrets from config file and create/validate them using the User client.
Returns the processed secrets array for inclusion in ModelVersion.OutputInfo.Params.
"""
if "secrets" not in self.config:
return []
secrets = self.config.get("secrets", [])
if not secrets:
return []
# Get user client for secret operations
user = User(
user_id=self.config.get('model').get('user_id'),
pat=self.client.pat,
token=self.client.token,
base_url=self.client.base,
)
processed_secrets = []
secrets_to_create = []
for secret in secrets:
secret_id = secret["id"]
secret_type = secret.get("type", "env")
env_var = secret["env_var"]
secret_value = secret.get("value") # Optional for existing secrets
# Check if secret already exists
try:
existing_secret = user.get_secret(secret_id)
logger.info(f"Secret '{secret_id}' already exists, using existing secret.")
# Add to processed secrets without the value
processed_secret = {
"id": secret_id,
"type": secret_type,
"env_var": env_var,
}
processed_secrets.append(processed_secret)
except Exception:
# Secret doesn't exist, need to create it
if secret_value:
logger.info(f"Secret '{secret_id}' does not exist, will create it.")
secrets_to_create.append(
{
"id": secret_id,
"value": secret_value,
"description": secret.get("description", f"Secret for {env_var}"),
}
)
# Add to processed secrets
processed_secret = {
"id": secret_id,
"type": secret_type,
"env_var": env_var,
}
processed_secrets.append(processed_secret)
else:
raise ValueError(
f"Secret '{secret_id}' does not exist and no value provided for creation."
)
# Create new secrets if any
if secrets_to_create:
try:
created_secrets = user.create_secrets(secrets_to_create)
logger.info(f"Successfully created {len(created_secrets)} new secrets.")
except Exception as e:
logger.error(f"Failed to create secrets: {e}")
raise
return processed_secrets
def _is_clarifai_internal(self):
"""
Check if the current user is a Clarifai internal user based on email domain.
Returns:
bool: True if user is a Clarifai internal user, False otherwise
"""
try:
# Get user info from Clarifai API
user_client = User(
pat=self.client.pat, user_id=self.config.get('model').get('user_id')
)
user_response = user_client.get_user_info()
user = user_response.user
# Check primary email domain
if hasattr(user, 'primary_email') and user.primary_email:
return user.primary_email.endswith('@clarifai.com')
return False
except Exception as e:
# Gracefully handle insufficient scopes (dev environment) or any other errors
error_msg = str(e)
if "CONN_INSUFFICIENT_SCOPES" in error_msg:
logger.debug("Skipping user validation due to insufficient scopes")
else:
logger.debug(f"User validation failed (skip validation and continue): {e}")
return False
def _get_all_python_content(self):
"""
Parse and concatenate all Python files in the model's 1/ subfolder.
"""
model_folder = os.path.join(self.folder, '1')
if not os.path.exists(model_folder):
return ""
all_content = []
for root, _, files in os.walk(model_folder):
for file in files:
if file.endswith('.py'):
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8') as f:
all_content.append(f.read())
except Exception:
continue
return "\n".join(all_content)
def _uses_openai_streaming(self, python_content):
return 'chat.completions.create' in python_content and 'generate(' in python_content
def has_proper_usage_tracking(self, python_content):
include_usage_patterns = ["'include_usage': True", '"include_usage": True']
has_include_usage = any(pattern in python_content for pattern in include_usage_patterns)
has_set_output_context = 'set_output_context' in python_content