forked from redhat-developer/rhdh
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinstall-dynamic-plugins.py
More file actions
executable file
·1175 lines (952 loc) · 54.8 KB
/
install-dynamic-plugins.py
File metadata and controls
executable file
·1175 lines (952 loc) · 54.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Copyright (c) 2023 Red Hat, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import copy
from enum import StrEnum
import hashlib
import json
import os
import sys
import tempfile
import yaml
import tarfile
import shutil
import subprocess
import base64
import binascii
import atexit
import time
import signal
import re
"""
Dynamic Plugin Installer for Backstage Application
This script is used to install dynamic plugins in the Backstage application, and is available in the container image to be called at container initialization, for example in an init container when using Kubernetes.
It expects, as the only argument, the path to the root directory where the dynamic plugins will be installed.
Environment Variables:
MAX_ENTRY_SIZE: Maximum size of a file in the archive (default: 20MB)
SKIP_INTEGRITY_CHECK: Set to "true" to skip integrity check of remote packages
CATALOG_INDEX_IMAGE: OCI image reference for the plugin catalog index (e.g., quay.io/rhdh/plugin-catalog-index:1.9)
Configuration:
The script expects the `dynamic-plugins.yaml` file to be present in the current directory and to contain the list of plugins to install along with their optional configuration.
The `dynamic-plugins.yaml` file must contain:
- a `plugins` list of objects with the following properties:
- `package`: the package to install (NPM package name, local path starting with './', or OCI image starting with 'oci://')
- For OCI packages ONLY, the tag or digest can be replaced by the `{{inherit}}` tag (requires the included configuration to contain a valid tag or digest to inherit from)
- If the OCI image contains only a single plugin, the plugin path can be omitted and will be auto-detected from the image metadata (normally specified by !<plugin-path>)
- When using `{{inherit}}`, the plugin path can also be omitted to inherit both version and path from a base configuration (only works if exactly one plugin from that image is defined in included files)
- `integrity`: a string containing the integrity hash of the package (required for remote NPM packages unless SKIP_INTEGRITY_CHECK is set, optional for local packages, not used for OCI packages)
- `pluginConfig`: an optional plugin-specific configuration fragment
- `disabled`: an optional boolean to disable the plugin (`false` by default)
- `pullPolicy`: download behavior control - 'IfNotPresent' (default) or 'Always' (OCI packages with ':latest!' default to 'Always')
- `forceDownload`: an optional boolean to force download for NPM packages even if already installed (`false` by default)
- an optional `includes` list of yaml files to include, each file containing a list of plugins
The plugins listed in the included files will be included in the main list of considered plugins and possibly overwritten by the plugins already listed in the main `plugins` list.
A simple empty example `dynamic-plugins.yaml` file:
```yaml
includes:
- dynamic-plugins.default.yaml
plugins: []
```
Package Types:
1. NPM packages: Standard package names (e.g., '@backstage/plugin-catalog')
2. Local packages: Paths starting with './' (e.g., './my-local-plugin') - automatically detects changes via package.json version, modification times, and lock files
3. OCI packages: Images starting with 'oci://' (e.g., 'oci://quay.io/user/plugin:v1.0!plugin-name')
Pull Policies:
- IfNotPresent: Only download if not already installed (default for most packages)
- Always: Always check for updates and download if different (default for OCI packages with ':latest!' tag)
Process:
For each enabled plugin mentioned in the main `plugins` list and the various included files, the script will:
- For NPM packages: call `npm pack` to get the package archive and extract it
- For OCI packages: use `skopeo` to download and extract the specified plugin from the container image
- For local packages: pack and extract from the local filesystem
- Verify package integrity (for remote NPM packages only, unless skipped)
- Track installation state using hash files to detect changes and avoid unnecessary re-downloads
- Merge the plugin-specific configuration fragment in a global configuration file named `app-config.dynamic-plugins.yaml`
"""
class PullPolicy(StrEnum):
IF_NOT_PRESENT = 'IfNotPresent'
ALWAYS = 'Always'
# NEVER = 'Never' not needed
class InstallException(Exception):
    """Common base for every error raised deliberately by this installer."""
# Digest algorithms listed in error messages about malformed OCI references.
# Refer to https://github.com/opencontainers/image-spec/blob/main/descriptor.md#registered-algorithms
RECOGNIZED_ALGORITHMS = ('sha512', 'sha256', 'blake3')

# Scheme substituted for `oci://` before an image reference is handed to skopeo.
DOCKER_PROTOCOL_PREFIX = 'docker://'
# Scheme that marks a `package` value as an OCI image reference.
OCI_PROTOCOL_PREFIX = 'oci://'
def merge(source, destination, prefix = ''):
    """
    Recursively merge the `source` mapping into `destination`, in place.

    Nested dicts are merged key by key. A non-dict value may be set again only
    to the same value; any other redefinition is reported as a conflict.

    Args:
        source: mapping whose entries are copied into `destination`.
        destination: mapping receiving the entries; it is mutated.
        prefix: dotted path of the parent keys, used only in error messages.
    Returns:
        The `destination` mapping.
    Raises:
        InstallException: if a key already holds a different value, or if one
            side holds a nested mapping where the other holds a plain value.
    """
    for key, value in source.items():
        # Full dotted path of the current key, so that conflicts found deep in
        # the tree are reported with their complete location.
        full_key = f"{prefix}{key}"
        if isinstance(value, dict):
            # get node or create one
            node = destination.setdefault(key, {})
            if not isinstance(node, dict):
                # A plain value cannot be merged with a nested mapping
                raise InstallException(f"Config key '{full_key}' defined differently for 2 dynamic plugins")
            merge(value, node, full_key + '.')
        else:
            # if key exists in destination trigger an error
            if key in destination and destination[key] != value:
                raise InstallException(f"Config key '{full_key}' defined differently for 2 dynamic plugins")
            destination[key] = value
    return destination
def maybe_merge_config(config, global_config):
    """
    Merge a plugin-specific configuration fragment into the global configuration.

    Anything that is not a dict (including None) is ignored and the global
    configuration is returned as it was given.
    """
    if not isinstance(config, dict):
        return global_config
    print('\t==> Merging plugin-specific configuration', flush=True)
    return merge(config, global_config)
def merge_plugin(plugin: dict, all_plugins: dict, dynamic_plugins_file: str, level: int):
    """
    Merge one plugin entry into `all_plugins` using the merger matching its package type.

    Args:
        plugin: plugin entry; its `package` field selects the merger.
        all_plugins: dict of the plugins merged so far, updated by the merger.
        dynamic_plugins_file: name of the file the entry comes from, used in error messages.
        level: passed through to the merger's `merge_plugin`.
    Raises:
        InstallException: if the `package` field is not a string.
    """
    package = plugin['package']
    if not isinstance(package, str):
        raise InstallException(f"content of the \'plugins.package\' field must be a string in {dynamic_plugins_file}")
    # OCI references have a dedicated merger; NPMPackageMerger takes every
    # other package type (NPM, git, local, tarball, etc.)
    merger_class = OciPackageMerger if package.startswith(OCI_PROTOCOL_PREFIX) else NPMPackageMerger
    return merger_class(plugin, dynamic_plugins_file, all_plugins).merge_plugin(level)
def get_oci_plugin_paths(image: str) -> list[str]:
    """
    Get list of plugin paths from OCI image via manifest annotation.

    Runs `skopeo inspect --raw` on the image and decodes the
    `io.backstage.dynamic-packages` manifest annotation (base64-encoded JSON).
    Every key of every object in the decoded list is returned as a plugin path.

    Args:
        image: OCI image reference (e.g., 'oci://registry/path:tag')
    Returns:
        List of plugin paths from the manifest annotation (empty when the
        annotation is missing or empty)
    Raises:
        InstallException: if skopeo is not found in PATH, or if the manifest
            cannot be fetched or decoded.
    """
    skopeo_path = shutil.which('skopeo')
    if not skopeo_path:
        raise InstallException('skopeo executable not found in PATH')
    try:
        image_url = image.replace(OCI_PROTOCOL_PREFIX, DOCKER_PROTOCOL_PREFIX)
        result = subprocess.run(
            [skopeo_path, 'inspect', '--raw', image_url],
            check=True,
            capture_output=True
        )
        manifest = json.loads(result.stdout)
        annotations = manifest.get('annotations', {})
        annotation_value = annotations.get('io.backstage.dynamic-packages')
        if not annotation_value:
            return []
        decoded = base64.b64decode(annotation_value).decode('utf-8')
        plugins_metadata = json.loads(decoded)
        plugin_paths = []
        for plugin_obj in plugins_metadata:
            if isinstance(plugin_obj, dict):
                plugin_paths.extend(plugin_obj.keys())
        return plugin_paths
    except subprocess.CalledProcessError as e:
        # The exception text only carries the exit status; skopeo's own
        # explanation is on stderr, so surface it when there is one.
        stderr = e.stderr.decode('utf-8', errors='replace').strip() if e.stderr else ''
        details = f"{e}: {stderr}" if stderr else f"{e}"
        raise InstallException(f"Failed to read plugin metadata from {image}: {details}") from e
    except Exception as e:
        # Keep the original exception as the cause for easier debugging
        raise InstallException(f"Failed to read plugin metadata from {image}: {e}") from e
class PackageMerger:
    """Merges a single plugin entry into the shared `all_plugins` dict, indexed by plugin key."""

    def __init__(self, plugin: dict, dynamic_plugins_file: str, all_plugins: dict):
        self.plugin = plugin
        self.dynamic_plugins_file = dynamic_plugins_file
        self.all_plugins = all_plugins

    def parse_plugin_key(self, package: str) -> str:
        """Returns the plugin key for the package. This default uses the package string unchanged; subclasses refine it."""
        return package

    def add_new_plugin(self, _version: str, _inherit_version: bool, plugin_key: str):
        """Registers the plugin config under `plugin_key` in the all_plugins dict."""
        self.all_plugins[plugin_key] = self.plugin

    def override_plugin(self, _version: str, _inherit_version: bool, plugin_key: str):
        """Copies every field of the new plugin config over the config already stored under `plugin_key`."""
        self.all_plugins[plugin_key].update(self.plugin)

    def merge_plugin(self, level: int):
        """
        Adds the plugin to all_plugins, or overrides the entry already stored under the same key.

        Raises InstallException when `package` is not a string, or when the
        stored entry was last modified at the same `level`.
        """
        package = self.plugin['package']
        if not isinstance(package, str):
            raise InstallException(f"content of the \'package\' field must be a string in {self.dynamic_plugins_file}")
        plugin_key = self.parse_plugin_key(package)

        if plugin_key in self.all_plugins:
            # Override the included plugins with fields in the main plugins list
            print('\n======= Overriding dynamic plugin configuration', plugin_key, flush=True)
            known = self.all_plugins[plugin_key]
            # Two definitions at the same level are duplicates (level = 0 for `includes` and 1 for the main config file)
            if known.get("last_modified_level") == level:
                raise InstallException(f"Duplicate plugin configuration for {self.plugin['package']} found in {self.dynamic_plugins_file}.")
            known["last_modified_level"] = level
            self.override_plugin("", False, plugin_key)
            return

        print(f'\n======= Adding new dynamic plugin configuration for {plugin_key}', flush=True)
        # Remember the level of this modification so later dupe conflicts between `includes` and main config files can be detected
        self.plugin["last_modified_level"] = level
        self.add_new_plugin("", False, plugin_key)
class NPMPackageMerger(PackageMerger):
    """Handles NPM package merging with version stripping for plugin keys."""
    # Ref: https://docs.npmjs.com/cli/v11/using-npm/package-spec
    # Pattern for standard NPM packages: [@scope/]package[@version|@tag|@version-range|] or [@scope/]package
    NPM_PACKAGE_PATTERN = (
        r'(@[^/]+/)?'  # Optional @scope
        r'([^@]+)'  # Package name
        r'(?:@(.+))?'  # Optional @version, @tag, or @version-range
        r'$'
    )
    STANDARD_NPM_PACKAGE_PATTERN = r'^' + NPM_PACKAGE_PATTERN
    # Pattern for NPM aliases: alias@npm:[@scope/]package[@version|@tag]
    NPM_ALIAS_PATTERN = r'^([^@]+)@npm:' + NPM_PACKAGE_PATTERN
    GITHUB_USERNAME_PATTERN = r'([^/@]+)/([^/#]+)'  # username/repo
    # File extensions that identify a tarball in the NPM package spec
    TARBALL_EXTENSIONS = ('.tgz', '.tar.gz', '.tar')
    # Pattern for git URLs to strip out the #ref part for the plugin key
    GIT_URL_PATTERNS = [
        # git+https://...[#ref]
        (
            r'^git\+https?://[^#]+'  # git+http(s)://<repo>
            r'(?:#(.+))?'  # Optional #ref
            r'$'
        ),
        # git+ssh://...[#ref]
        (
            r'^git\+ssh://[^#]+'
            r'(?:#(.+))?'
            r'$'
        ),
        # git://...[#ref]
        (
            r'^git://[^#]+'
            r'(?:#(.+))?'
            r'$'
        ),
        # https://github.com/user/repo(.git)?[#ref]
        (
            r'^https://github\.com/[^/]+/[^/#]+'
            r'(?:\.git)?'
            r'(?:#(.+))?'
            r'$'
        ),
        # git@github.com:user/repo(.git)?[#ref]
        (
            r'^git@github\.com:[^/]+/[^/#]+'
            r'(?:\.git)?'
            r'(?:#(.+))?'
            r'$'
        ),
        # github:user/repo[#ref]
        (
            r'^github:' + GITHUB_USERNAME_PATTERN +
            r'(?:#(.+))?' +
            r'$'
        ),
        # user/repo[#ref]
        (
            r'^' + GITHUB_USERNAME_PATTERN +
            r'(?:#(.+))?' +
            r'$'
        )
    ]

    def parse_plugin_key(self, package: str) -> str:
        """
        Parses NPM package specification and returns a version-stripped plugin key.

        Handles various NPM package formats specified in https://docs.npmjs.com/cli/v11/using-npm/package-spec:
        - Standard packages: [@scope/]package[@version] -> [@scope/]package
        - Aliases: alias@npm:package[@version] -> alias@npm:package
        - Git URLs: git+https://... -> git+https://... (without #ref)
        - GitHub shorthand: user/repo#ref -> user/repo
        - Local paths: ./path -> ./path (unchanged)
        - Tarballs (.tgz, .tar.gz, .tar): kept as-is since there is no standard format for them

        Args:
            package: the `package` value of the plugin entry.
        Returns:
            The plugin key used to index the plugin in `all_plugins`.
        """
        # Local packages don't need version stripping
        if package.startswith('./'):
            return package
        # Tarballs are kept as-is since there is no standard format for them.
        # Every extension allowed by the package spec is checked: otherwise a
        # tarball URL containing '@' would be cut at the '@' by the version
        # stripping below, and unrelated tarballs would share one key.
        if package.endswith(self.TARBALL_EXTENSIONS):
            return package
        # remove @version from NPM aliases: alias@npm:package[@version]
        alias_match = re.match(self.NPM_ALIAS_PATTERN, package)
        if alias_match:
            alias_name = alias_match.group(1)
            package_scope = alias_match.group(2) or ''
            npm_package = alias_match.group(3)
            # Parse the npm package part on its own to strip its version
            npm_key = self._strip_npm_package_version(package_scope + npm_package)
            return f"{alias_name}@npm:{npm_key}"
        # Check for git URLs
        for git_pattern in self.GIT_URL_PATTERNS:
            git_match = re.match(git_pattern, package)
            if git_match:
                # Remove the #ref part if present
                return package.split('#')[0]
        # Handle standard NPM packages
        return self._strip_npm_package_version(package)

    def _strip_npm_package_version(self, package: str) -> str:
        """Strip version from standard NPM package name."""
        npm_match = re.match(self.STANDARD_NPM_PACKAGE_PATTERN, package)
        if npm_match:
            scope = npm_match.group(1) or ''
            pkg_name = npm_match.group(2)
            return f"{scope}{pkg_name}"
        # If no pattern matches, return as-is (could be tarball URL or other format)
        return package
class PluginInstaller:
    """Shared behaviour of the plugin installers; concrete installers provide `install`."""

    def __init__(self, destination: str, skip_integrity_check: bool = False):
        self.destination = destination
        self.skip_integrity_check = skip_integrity_check

    def should_skip_installation(self, plugin: dict, plugin_path_by_hash: dict) -> tuple[bool, str]:
        """
        Decide from the pull policy and the installed hashes whether installation can be skipped.

        Returns a `(skip, reason)` tuple where reason is one of
        "not_installed", "force_download" or "already_installed".
        """
        if plugin['plugin_hash'] not in plugin_path_by_hash:
            return False, "not_installed"
        always_pull = plugin.get('pullPolicy', PullPolicy.IF_NOT_PRESENT) == PullPolicy.ALWAYS
        if always_pull or plugin.get('forceDownload', False):
            return False, "force_download"
        return True, "already_installed"

    def install(self, plugin: dict, plugin_path_by_hash: dict) -> str:
        """Installs a plugin and returns its path. Left to the subclasses."""
        raise NotImplementedError()
class OciPackageMerger(PackageMerger):
    """
    Handles OCI package merging.

    Plugin keys have the form `oci://<registry>:!<path>` so that they do not
    depend on the tag or digest. Also resolves the `{{inherit}}` tag and
    auto-detects the plugin path when the package omits it.
    """
    EXPECTED_OCI_PATTERN = (
        r'^(' + OCI_PROTOCOL_PREFIX + r'[^\s:@]+)'
        r'(?:'
        r':([^\s!@:]+)'  # tag only
        r'|'
        r'@((?:sha256|sha512|blake3):[^\s!@:]+)'  # digest only
        r')'
        r'(?:!([^\s]+))?$'  # plugin path is optional for single plugin packages
    )

    def parse_plugin_key(self, package: str) -> tuple[str, str, bool, str | None]:
        """
        Parses and validates OCI package name format.
        Generates a plugin key and version from the OCI package name.
        Also checks if the {{inherit}} tag is used correctly.

        Args:
            package: The OCI package name.
        Returns:
            plugin_key: plugin key generated from the OCI package name
                (only the registry part when `{{inherit}}` is used without a path)
            version: detected tag or digest of the plugin
            inherit_version: boolean indicating if the `{{inherit}}` tag is used
            resolved_path: the resolved plugin path (either explicit or auto-detected),
                or None when `{{inherit}}` is used without a path
        Raises:
            InstallException: if the package does not match the expected format, or if
                the path is omitted and the image does not contain exactly one plugin.
        """
        match = re.match(self.EXPECTED_OCI_PATTERN, package)
        if not match:
            raise InstallException(f"oci package \'{package}\' is not in the expected format \'{OCI_PROTOCOL_PREFIX}<registry>:<tag>\' or \'{OCI_PROTOCOL_PREFIX}<registry>@sha<algo>:<digest>\' (optionally followed by \'!<path>\') in {self.dynamic_plugins_file} where <algo> is one of {RECOGNIZED_ALGORITHMS}")

        # Strip away the version (tag or digest) from the package string, resulting in oci://<registry>:!<path>
        # This helps ensure keys used to identify OCI plugins are independent of the version of the plugin
        registry = match.group(1)
        tag_version = match.group(2)
        digest_version = match.group(3)
        version = tag_version if tag_version else digest_version
        path = match.group(4)

        # {{inherit}} tag indicates that the version should be inherited from the included configuration. Must NOT have a SHA digest included.
        inherit_version = (tag_version == "{{inherit}}" and digest_version is None)

        # If {{inherit}} without path, we'll use plugin name with registry as the plugin key
        if inherit_version and not path:
            # Return None for resolved_path - will be inherited during merge_plugin()
            return registry, version, inherit_version, None

        # If path is None, auto-detect from OCI manifest
        if not path:
            full_image = f"{registry}:{version}" if tag_version else f"{registry}@{version}"
            print(f"\n======= No plugin path specified for {full_image}, auto-detecting from OCI manifest", flush=True)
            plugin_paths = get_oci_plugin_paths(full_image)
            if len(plugin_paths) == 0:
                # Each sentence ends with a space so the joined message stays readable
                raise InstallException(
                    f"No plugins found in OCI image {full_image}. "
                    f"The image might not contain the 'io.backstage.dynamic-packages' annotation. "
                    f"Please ensure this was packaged correctly using the @red-hat-developer-hub/cli plugin package command."
                )
            if len(plugin_paths) > 1:
                plugins_list = '\n - '.join(plugin_paths)
                raise InstallException(
                    f"Multiple plugins found in OCI image {full_image}:\n - {plugins_list}\n"
                    f"Please specify which plugin to install using the syntax: {full_image}!<plugin-name>"
                )
            path = plugin_paths[0]
            print(f'\n======= Auto-resolving OCI package {full_image} to use plugin path: {path}', flush=True)

        # At this point, path always exists (either explicitly provided or auto-detected)
        plugin_key = f"{registry}:!{path}"
        return plugin_key, version, inherit_version, path

    def add_new_plugin(self, version: str, inherit_version: bool, plugin_key: str):
        """
        Adds a new plugin to the all_plugins dict.

        Raises InstallException when `inherit_version` is set, since a new entry has nothing to inherit from.
        """
        if inherit_version is True:
            # Cannot use {{inherit}} for the initial plugin configuration
            raise InstallException(f"ERROR: {{{{inherit}}}} tag is set and there is currently no resolved tag or digest for {self.plugin['package']} in {self.dynamic_plugins_file}.")
        else:
            self.plugin["version"] = version
        self.all_plugins[plugin_key] = self.plugin

    def override_plugin(self, version: str, inherit_version: bool, plugin_key: str):
        """
        Overrides an existing plugin config with a new plugin config in the all_plugins dict.
        If `inherit_version` is True, the package and version of the existing plugin config are kept.
        """
        if inherit_version is not True:
            self.all_plugins[plugin_key]['package'] = self.plugin['package']  # Override package since no version inheritance
            if self.all_plugins[plugin_key]['version'] != version:
                print(f"INFO: Overriding version for {plugin_key} from `{self.all_plugins[plugin_key]['version']}` to `{version}`", flush=True)
            self.all_plugins[plugin_key]["version"] = version
        for key in self.plugin:
            # `package` and `version` were handled above and must not be copied blindly
            if key == 'package':
                continue
            if key == "version":
                continue
            self.all_plugins[plugin_key][key] = self.plugin[key]

    def merge_plugin(self, level: int):
        """
        Adds the OCI plugin to all_plugins, or overrides the entry stored under the same plugin key.

        Args:
            level: level of the file being processed (0 for `includes`, 1 for the main config file).
        Raises:
            InstallException: if `package` is not a string or is malformed, if `{{inherit}}` cannot be
                resolved to exactly one existing plugin, or if the plugin is defined twice at the same level.
        """
        package = self.plugin['package']
        if not isinstance(package, str):
            raise InstallException(f"content of the \'package\' field must be a string in {self.dynamic_plugins_file}")
        plugin_key, version, inherit_version, resolved_path = self.parse_plugin_key(package)

        # Special case: {{inherit}} without explicit path - match on image only
        if inherit_version and resolved_path is None:
            # plugin_key is the registry (oci://registry/image) when path is omitted
            # Find plugins from same image (ignoring path component)
            matches = [key for key in self.all_plugins.keys()
                       if key.startswith(f"{plugin_key}:!")]
            if len(matches) == 0:
                raise InstallException(
                    f"Cannot use {{{{inherit}}}} for {plugin_key}: no existing plugin configuration found. "
                    f"Ensure a plugin from this image is defined in an included file with an explicit version."
                )
            if len(matches) > 1:
                full_packages = []
                for m in matches:
                    base_plugin = self.all_plugins[m]
                    base_version = base_plugin.get('version', '')
                    formatted = f"{m.split(':!')[0]}:{base_version}!{m.split(':!')[-1]}"
                    full_packages.append(formatted)
                paths_formatted = '\n - '.join(full_packages)
                raise InstallException(
                    f"Cannot use {{{{inherit}}}} for {plugin_key}: multiple plugins from this image are defined in the included files:\n - {paths_formatted}\n"
                    f"Please specify which plugin configuration to inherit from using: {plugin_key}:{{{{inherit}}}}!<plugin_path>"
                )
            # inherit both version AND path from the existing plugin configuration
            plugin_key = matches[0]
            base_plugin = self.all_plugins[plugin_key]
            version = base_plugin['version']
            resolved_path = plugin_key.split(':!')[-1]
            registry_part = plugin_key.split(':!')[0]
            self.plugin['package'] = f"{registry_part}:{version}!{resolved_path}"
            print(f'\n======= Inheriting version `{version}` and plugin path `{resolved_path}` for {plugin_key}', flush=True)
        # Update package with resolved path if it was auto-detected (package didn't originally contain !path)
        elif '!' not in package:
            self.plugin['package'] = f"{package}!{resolved_path}"

        # If package does not already exist, add it
        if plugin_key not in self.all_plugins:
            print(f'\n======= Adding new dynamic plugin configuration for version `{version}` of {plugin_key}', flush=True)
            # Keep track of the level of the plugin modification to know when dupe conflicts occur in `includes` and main config files
            self.plugin["last_modified_level"] = level
            self.add_new_plugin(version, inherit_version, plugin_key)
        else:
            # Override the included plugins with fields in the main plugins list
            print('\n======= Overriding dynamic plugin configuration', plugin_key, flush=True)
            # Check for duplicate plugin configurations defined at the same level (level = 0 for `includes` and 1 for the main config file)
            if self.all_plugins[plugin_key].get("last_modified_level") == level:
                raise InstallException(f"Duplicate plugin configuration for {self.plugin['package']} found in {self.dynamic_plugins_file}.")
            self.all_plugins[plugin_key]["last_modified_level"] = level
            self.override_plugin(version, inherit_version, plugin_key)
class OciDownloader:
"""Helper class for downloading and extracting plugins from OCI container images."""
def __init__(self, destination: str):
self._skopeo = shutil.which('skopeo')
if self._skopeo is None:
raise InstallException('skopeo executable not found in PATH')
self.tmp_dir_obj = tempfile.TemporaryDirectory()
self.tmp_dir = self.tmp_dir_obj.name
self.image_to_tarball = {}
self.destination = destination
self.max_entry_size = int(os.environ.get('MAX_ENTRY_SIZE', 20000000))
def skopeo(self, command):
rv = subprocess.run([self._skopeo] + command, check=True, capture_output=True)
if rv.returncode != 0:
raise InstallException(f'Error while running skopeo command: {rv.stderr}')
return rv.stdout
def get_plugin_tar(self, image: str) -> str:
if image not in self.image_to_tarball:
# run skopeo copy to copy the tar ball to the local filesystem
print(f'\t==> Copying image {image} to local filesystem', flush=True)
image_digest = hashlib.sha256(image.encode('utf-8'), usedforsecurity=False).hexdigest()
local_dir = os.path.join(self.tmp_dir, image_digest)
# replace oci:// prefix with docker://
image_url = image.replace(OCI_PROTOCOL_PREFIX, DOCKER_PROTOCOL_PREFIX)
self.skopeo(['copy', image_url, f'dir:{local_dir}'])
manifest_path = os.path.join(local_dir, 'manifest.json')
manifest = json.load(open(manifest_path))
# get the first layer of the image
layer = manifest['layers'][0]['digest']
(_sha, filename) = layer.split(':')
local_path = os.path.join(local_dir, filename)
self.image_to_tarball[image] = local_path
return self.image_to_tarball[image]
def extract_plugin(self, tar_file: str, plugin_path: str) -> None:
with tarfile.open(tar_file, 'r:*') as tar: # NOSONAR
# extract only the files in specified directory
files_to_extract = []
for member in tar.getmembers():
if not member.name.startswith(plugin_path):
continue
# zip bomb protection
if member.size > self.max_entry_size:
raise InstallException('Zip bomb detected in ' + member.name)
if member.islnk() or member.issym():
realpath = os.path.realpath(os.path.join(plugin_path, *os.path.split(member.linkname)))
if not realpath.startswith(plugin_path):
print(f'\t==> WARNING: skipping file containing link outside of the archive: {member.name} -> {member.linkpath}', flush=True)
continue
files_to_extract.append(member)
tar.extractall(os.path.abspath(self.destination), members=files_to_extract, filter='tar')
def download(self, package: str) -> str:
# At this point, package always contains ! since parse_plugin_key resolved it
(image, plugin_path) = package.split('!')
tar_file = self.get_plugin_tar(image)
plugin_directory = os.path.join(self.destination, plugin_path)
if os.path.exists(plugin_directory):
print('\t==> Removing previous plugin directory', plugin_directory, flush=True)
shutil.rmtree(plugin_directory, ignore_errors=True, onerror=None)
self.extract_plugin(tar_file=tar_file, plugin_path=plugin_path)
return plugin_path
def digest(self, package: str) -> str:
# Extract image reference (before the ! if present)
if '!' in package:
(image, _) = package.split('!')
else:
image = package
image_url = image.replace(OCI_PROTOCOL_PREFIX, DOCKER_PROTOCOL_PREFIX)
output = self.skopeo(['inspect', image_url])
data = json.loads(output)
# OCI artifact digest field is defined as "hash method" ":" "hash"
digest = data['Digest'].split(':')[1]
return f"{digest}"
class OciPluginInstaller(PluginInstaller):
    """Handles OCI container-based plugin installation using skopeo."""

    def __init__(self, destination: str, skip_integrity_check: bool = False):
        super().__init__(destination, skip_integrity_check)
        self.downloader = OciDownloader(destination)

    def should_skip_installation(self, plugin: dict, plugin_path_by_hash: dict) -> tuple[bool, str]:
        """
        OCI packages have special digest-based checking for ALWAYS pull policy.

        Returns a `(skip, reason)` tuple where reason is one of "not_installed",
        "already_installed", "digest_unchanged" or "force_download".
        """
        package = plugin['package']
        plugin_hash = plugin['plugin_hash']
        # Packages using the ':latest!' tag default to ALWAYS, every other package to IF_NOT_PRESENT
        pull_policy = plugin.get('pullPolicy', PullPolicy.ALWAYS if ':latest!' in package else PullPolicy.IF_NOT_PRESENT)
        if plugin_hash not in plugin_path_by_hash:
            return False, "not_installed"
        if pull_policy == PullPolicy.IF_NOT_PRESENT:
            return True, "already_installed"
        if pull_policy == PullPolicy.ALWAYS:
            # Check if digest has changed
            installed_path = plugin_path_by_hash[plugin_hash]
            digest_file_path = os.path.join(self.destination, installed_path, 'dynamic-plugin-image.hash')
            local_digest = None
            if os.path.isfile(digest_file_path):
                with open(digest_file_path, 'r') as f:
                    local_digest = f.read().strip()
            remote_digest = self.downloader.digest(package)
            if remote_digest == local_digest:
                return True, "digest_unchanged"
        # Changed digest, or a pull policy that is neither of the two above
        return False, "force_download"

    def install(self, plugin: dict, plugin_path_by_hash: dict) -> str:
        """
        Install an OCI plugin package.

        Downloads and extracts the plugin, stores the image digest next to it for
        later comparisons, and removes from `plugin_path_by_hash` every entry that
        points to the installed path.

        Returns:
            The plugin path, relative to the destination directory.
        Raises:
            InstallException: if the plugin has no version, or if any step of the installation fails.
        """
        package = plugin['package']
        if plugin.get('version') is None:
            raise InstallException(f"Tag or Digest is not set for {package}. Please ensure there is at least one plugin configurations contains a valid tag or digest.")
        try:
            plugin_path = self.downloader.download(package)
            # Save digest for future comparison
            plugin_directory = os.path.join(self.destination, plugin_path)
            os.makedirs(plugin_directory, exist_ok=True)  # Ensure directory exists
            digest_file_path = os.path.join(plugin_directory, 'dynamic-plugin-image.hash')
            with open(digest_file_path, 'w') as f:
                f.write(self.downloader.digest(package))
            # Clean up duplicate hashes
            for key in [k for k, v in plugin_path_by_hash.items() if v == plugin_path]:
                plugin_path_by_hash.pop(key)
            return plugin_path
        except Exception as e:
            # Chain the original exception so its traceback is not lost
            raise InstallException(f"Error while installing OCI plugin {package}: {e}") from e
class NpmPluginInstaller(PluginInstaller):
"""Handles NPM and local package installation using npm pack."""
def __init__(self, destination: str, skip_integrity_check: bool = False):
    """Sets up the installer; the per-entry size limit comes from MAX_ENTRY_SIZE (20000000 when unset)."""
    super().__init__(destination, skip_integrity_check)
    configured_limit = os.environ.get('MAX_ENTRY_SIZE', 20000000)
    self.max_entry_size = int(configured_limit)
def install(self, plugin: dict, plugin_path_by_hash: dict) -> str:
    """
    Install an NPM or local plugin package through `npm pack`.

    Packages starting with './' are local: they are resolved against the
    current working directory and are exempt from integrity verification.

    Returns:
        The plugin path returned by the archive extraction.
    Raises:
        InstallException: if a remote package has no integrity hash (unless the
            check is skipped), or if `npm pack` fails.
    """
    package = plugin['package']
    is_local = package.startswith('./')
    if is_local:
        package = os.path.join(os.getcwd(), package[2:])

    # Integrity only applies to remote packages, and only when not disabled
    needs_integrity = not is_local and not self.skip_integrity_check
    if needs_integrity and 'integrity' not in plugin:
        raise InstallException(f"No integrity hash provided for Package {package}")

    print('\t==> Grabbing package archive through `npm pack`', flush=True)
    pack_result = subprocess.run(['npm', 'pack', package], capture_output=True, cwd=self.destination)
    if pack_result.returncode != 0:
        raise InstallException(f'Error while installing plugin {package} with \'npm pack\' : {pack_result.stderr.decode("utf-8")}')
    # The archive name is read from the output of `npm pack`
    archive_name = pack_result.stdout.decode('utf-8').strip()
    archive = os.path.join(self.destination, archive_name)

    if needs_integrity:
        print('\t==> Verifying package integrity', flush=True)
        verify_package_integrity(plugin, archive)

    return self._extract_npm_package(archive)
def _extract_npm_package(self, archive: str) -> str:
"""Extract NPM package archive with security protections."""
PACKAGE_DIRECTORY_PREFIX = 'package/'
directory = archive.replace('.tgz', '')
directory_realpath = os.path.realpath(directory)
plugin_path = os.path.basename(directory_realpath)
if os.path.exists(directory):
print('\t==> Removing previous plugin directory', directory, flush=True)
shutil.rmtree(directory, ignore_errors=True)
os.mkdir(directory)
print('\t==> Extracting package archive', archive, flush=True)
with tarfile.open(archive, 'r:*') as tar: # NOSONAR
for member in tar.getmembers():
if member.isreg():
if not member.name.startswith(PACKAGE_DIRECTORY_PREFIX):
raise InstallException(f"NPM package archive does not start with 'package/' as it should: {member.name}")
if member.size > self.max_entry_size:
raise InstallException(f'Zip bomb detected in {member.name}')
member.name = member.name.removeprefix(PACKAGE_DIRECTORY_PREFIX)
tar.extract(member, path=directory, filter='data')
elif member.isdir():
print('\t\tSkipping directory entry', member.name, flush=True)
elif member.islnk() or member.issym():
if not member.linkpath.startswith(PACKAGE_DIRECTORY_PREFIX):
raise InstallException(f'NPM package archive contains a link outside of the archive: {member.name} -> {member.linkpath}')
member.name = member.name.removeprefix(PACKAGE_DIRECTORY_PREFIX)
member.linkpath = member.linkpath.removeprefix(PACKAGE_DIRECTORY_PREFIX)
realpath = os.path.realpath(os.path.join(directory, *os.path.split(member.linkname)))
if not realpath.startswith(directory_realpath):
raise InstallException(f'NPM package archive contains a link outside of the archive: {member.name} -> {member.linkpath}')
tar.extract(member, path=directory, filter='data')
else:
type_mapping = {
tarfile.CHRTYPE: "character device",
tarfile.BLKTYPE: "block device",
tarfile.FIFOTYPE: "FIFO"
}
type_str = type_mapping.get(member.type, "unknown")
raise InstallException(f'NPM package archive contains a non regular file: {member.name} - {type_str}')
print('\t==> Removing package archive', archive, flush=True)
os.remove(archive)
return plugin_path
def create_plugin_installer(package: str, destination: str, skip_integrity_check: bool = False) -> PluginInstaller:
    """Build the installer that matches the package reference.

    A package whose reference starts with ``OCI_PROTOCOL_PREFIX`` is handled
    by ``OciPluginInstaller``; every other package goes to
    ``NpmPluginInstaller``. Both receive ``destination`` and
    ``skip_integrity_check`` unchanged.
    """
    is_oci_package = package.startswith(OCI_PROTOCOL_PREFIX)
    installer_class = OciPluginInstaller if is_oci_package else NpmPluginInstaller
    return installer_class(destination, skip_integrity_check)
def install_plugin(plugin: dict, plugin_path_by_hash: dict, destination: str, skip_integrity_check: bool = False) -> tuple[str, dict]:
    """Install one plugin and return its path together with its config.

    Returns:
        ``(None, {})`` when the plugin is disabled;
        ``(None, pluginConfig)`` when the installer decides the download can
        be skipped (the plugin hash is then removed from
        ``plugin_path_by_hash``);
        ``(plugin_path, pluginConfig)`` after a fresh installation, in which
        case the plugin hash is also written to
        ``dynamic-plugin-config.hash`` inside the plugin directory.
    """
    package = plugin['package']

    # Disabled plugins are neither installed nor configured.
    if plugin.get('disabled', False):
        print(f'\n======= Skipping disabled dynamic plugin {package}', flush=True)
        return None, {}

    installer = create_plugin_installer(package, destination, skip_integrity_check)
    should_skip, reason = installer.should_skip_installation(plugin, plugin_path_by_hash)

    if not should_skip:
        print(f'\n======= Installing dynamic plugin {package}', flush=True)
        plugin_path = installer.install(plugin, plugin_path_by_hash)

        # Record the configuration hash next to the installed plugin.
        hash_file_path = os.path.join(destination, plugin_path, 'dynamic-plugin-config.hash')
        with open(hash_file_path, 'w') as hash_file:
            hash_file.write(plugin['plugin_hash'])

        print(f'\t==> Successfully installed dynamic plugin {package}', flush=True)
        return plugin_path, plugin.get('pluginConfig', {})

    print(f'\n======= Skipping download of already installed dynamic plugin {package} ({reason})', flush=True)
    # Drop the entry from the tracking dict so the plugin is not deleted later.
    plugin_path_by_hash.pop(plugin['plugin_hash'], None)
    return None, plugin.get('pluginConfig', {})
# Hash algorithms accepted in a plugin's `integrity` value (`<algorithm>-<hash>`).
RECOGNIZED_ALGORITHMS = ('sha512', 'sha384', 'sha256')
def get_local_package_info(package_path: str) -> dict:
    """Get package information from a local package to include in hash calculation.

    Args:
        package_path: Path of the local package directory. A leading ``./``
            is resolved against the current working directory.

    Returns:
        One of the following dictionaries:

        * ``{'_not_found': True}`` when the path is not a directory and has
          no ``package.json``;
        * ``{'_directory_mtime': <mtime>}`` when the directory exists but
          has no ``package.json``;
        * ``{'_package_json': ..., '_package_json_mtime': ...}`` plus one
          ``_<lock file>_mtime`` entry per ``package-lock.json`` /
          ``yarn.lock`` that exists;
        * ``{'_error': <message>}`` when the package information cannot be
          read or parsed.
    """
    try:
        if package_path.startswith('./'):
            abs_package_path = os.path.join(os.getcwd(), package_path[2:])
        else:
            abs_package_path = package_path

        package_json_path = os.path.join(abs_package_path, 'package.json')
        if not os.path.isfile(package_json_path):
            # If no package.json, fall back to directory modification time
            if os.path.isdir(abs_package_path):
                return {'_directory_mtime': os.path.getmtime(abs_package_path)}
            return {'_not_found': True}

        # JSON files are UTF-8; do not depend on the process locale.
        with open(package_json_path, 'r', encoding='utf-8') as f:
            package_json = json.load(f)

        info = {
            # The whole manifest takes part in change detection.
            '_package_json': package_json,
            # Also include package.json modification time as additional change detection
            '_package_json_mtime': os.path.getmtime(package_json_path),
        }

        # Include package-lock.json or yarn.lock modification time if present
        for lock_file in ('package-lock.json', 'yarn.lock'):
            lock_path = os.path.join(abs_package_path, lock_file)
            if os.path.isfile(lock_path):
                info[f'_{lock_file}_mtime'] = os.path.getmtime(lock_path)

        return info
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError as well as UnicodeDecodeError
        # (undecodable bytes), which previously escaped this handler.
        # If we can't read the package info, include the error in hash.
        # This ensures we'll try to reinstall if there are permission issues, etc.
        return {'_error': str(e)}
def verify_package_integrity(plugin: dict, archive: str) -> None:
    """Check a downloaded archive against the plugin's ``integrity`` value.

    The integrity value has the form ``<algorithm>-<base64 digest>`` where
    the algorithm is one of ``RECOGNIZED_ALGORITHMS``. The digest of the
    archive is computed in-process and compared with the expected one.

    Args:
        plugin: Plugin configuration; ``package`` and ``integrity`` are read.
        archive: Path of the archive file to hash.

    Raises:
        InstallException: If the integrity value is missing or malformed,
            uses an unsupported algorithm, is not valid base64, if the
            archive cannot be read, or if the digests differ.
    """
    # Function-scope import: hashlib replaces the former
    # `cat | openssl dgst | openssl base64` pipeline, whose child processes
    # were never waited on and whose pipes were never closed.
    import hashlib

    package = plugin['package']
    if 'integrity' not in plugin:
        raise InstallException(f'Package integrity for {package} is missing')

    integrity = plugin['integrity']
    if not isinstance(integrity, str):
        raise InstallException(f'Package integrity for {package} must be a string')

    integrity = integrity.split('-')
    if len(integrity) != 2:
        raise InstallException(f'Package integrity for {package} must be a string of the form <algorithm>-<hash>')

    algorithm = integrity[0]
    if algorithm not in RECOGNIZED_ALGORITHMS:
        raise InstallException(f'{package}: Provided Package integrity algorithm {algorithm} is not supported, please use one of following algorithms {RECOGNIZED_ALGORITHMS} instead')

    hash_digest = integrity[1]
    try:
        base64.b64decode(hash_digest, validate=True)
    except binascii.Error as err:
        raise InstallException(f'{package}: Provided Package integrity hash {hash_digest} is not a valid base64 encoding') from err

    # Hash the archive in chunks so large packages are not loaded at once.
    hasher = hashlib.new(algorithm)
    try:
        with open(archive, 'rb') as archive_file:
            for chunk in iter(lambda: archive_file.read(1024 * 1024), b''):
                hasher.update(chunk)
    except OSError as err:
        # Previously an unreadable archive silently hashed empty input.
        raise InstallException(f'{package}: Unable to read package archive {archive}: {err}') from err

    computed_digest = base64.b64encode(hasher.digest()).decode('utf-8')
    if hash_digest != computed_digest:
        raise InstallException(f'{package}: The hash of the downloaded package {computed_digest} does not match the provided integrity hash {hash_digest} provided in the configuration file')
# Create the lock file, so that other instances of the script will wait for this one to finish
def create_lock(lock_file_path):
    """Create the lock file, waiting for a previous holder if necessary.

    The file is opened in exclusive-creation mode; when it already exists,
    the function waits for it to disappear and then tries again.
    """
    acquired = False
    while not acquired:
        try:
            lock_handle = open(lock_file_path, 'x')
        except FileExistsError:
            # Somebody else holds the lock: wait, then retry the creation.
            wait_for_lock_release(lock_file_path)
        else:
            with lock_handle:
                print(f"======= Created lock file: {lock_file_path}")
            acquired = True
# Remove the lock file
def remove_lock(lock_file_path):
    """Delete the lock file and report the removal on stdout."""
    os.remove(lock_file_path)
    removal_message = f"======= Removed lock file: {lock_file_path}"
    print(removal_message)
# Wait for the lock file to be released
def wait_for_lock_release(lock_file_path):
    """Block until the lock file no longer exists, polling once per second."""
    print(f"======= Waiting for lock release (file: {lock_file_path})...", flush=True)
    while os.path.exists(lock_file_path):
        time.sleep(1)
    print("======= Lock released.")
# Clean up temporary catalog index directory
def cleanup_catalog_index_temp_dir(dynamic_plugins_root):
    """Remove the `.catalog-index-temp` directory below the plugins root.

    Nothing happens when the directory does not exist; errors raised while
    deleting it are ignored.
    """
    temp_dir = os.path.join(dynamic_plugins_root, '.catalog-index-temp')
    if not os.path.exists(temp_dir):
        return
    print('\n======= Cleaning up temporary catalog index directory', flush=True)
    shutil.rmtree(temp_dir, ignore_errors=True)
def _extract_catalog_index_layers(manifest: dict, local_dir: str, catalog_index_temp_dir: str) -> None:
"""Extract layers from the catalog index OCI image."""
max_entry_size = int(os.environ.get('MAX_ENTRY_SIZE', 20000000))
for layer in manifest.get('layers', []):
layer_digest = layer.get('digest', '')
if not layer_digest:
continue
(_sha, filename) = layer_digest.split(':')
layer_file = os.path.join(local_dir, filename)
if not os.path.isfile(layer_file):
print(f"\t==> WARNING: Layer file {filename} not found", flush=True)
continue
print(f"\t==> Extracting layer {filename}", flush=True)
_extract_layer_tarball(layer_file, catalog_index_temp_dir, max_entry_size)
def _extract_layer_tarball(layer_file: str, catalog_index_temp_dir: str, max_entry_size: int) -> None:
"""Extract a single layer tarball with security checks."""
with tarfile.open(layer_file, 'r:*') as tar: # NOSONAR
for member in tar.getmembers():
# Security checks
if member.size > max_entry_size:
print(f"\t==> WARNING: Skipping large file {member.name} in catalog index", flush=True)
continue
if member.islnk() or member.issym():
realpath = os.path.realpath(os.path.join(catalog_index_temp_dir, *os.path.split(member.linkname)))
if not realpath.startswith(catalog_index_temp_dir):
print(f"\t==> WARNING: Skipping link outside archive: {member.name}", flush=True)
continue
tar.extract(member, path=catalog_index_temp_dir, filter='data')
def extract_catalog_index(catalog_index_image: str, catalog_index_mount: str, catalog_entities_parent_dir: str) -> str:
"""Extract the catalog index OCI image and return the path to dynamic-plugins.default.yaml if found."""
print(f"\n======= Extracting catalog index from {catalog_index_image}", flush=True)
skopeo_path = shutil.which('skopeo')
if skopeo_path is None:
raise InstallException("CATALOG_INDEX_IMAGE is set but skopeo executable not found in PATH. Cannot extract catalog index.")
catalog_index_temp_dir = os.path.join(catalog_index_mount, '.catalog-index-temp')
os.makedirs(catalog_index_temp_dir, exist_ok=True)
with tempfile.TemporaryDirectory() as tmp_dir:
image_url = catalog_index_image
if not image_url.startswith(DOCKER_PROTOCOL_PREFIX):
image_url = f'{DOCKER_PROTOCOL_PREFIX}{image_url}'
print("\t==> Copying catalog index image to local filesystem", flush=True)
local_dir = os.path.join(tmp_dir, 'catalog-index-oci')
# Download the OCI image using skopeo
result = subprocess.run(
[skopeo_path, 'copy', image_url, f'dir:{local_dir}'],
capture_output=True,
text=True
)
if result.returncode != 0:
raise InstallException(f"Failed to download catalog index image {catalog_index_image}: {result.stderr}")
manifest_path = os.path.join(local_dir, 'manifest.json')
if not os.path.isfile(manifest_path):
raise InstallException(f"manifest.json not found in catalog index image {catalog_index_image}")
with open(manifest_path, 'r') as f:
manifest = json.load(f)
print("\t==> Extracting catalog index layers", flush=True)
_extract_catalog_index_layers(manifest, local_dir, catalog_index_temp_dir)
default_plugins_file = os.path.join(catalog_index_temp_dir, 'dynamic-plugins.default.yaml')
if not os.path.isfile(default_plugins_file):