diff --git a/cloudinit/stages.py b/cloudinit/stages.py index 50fbd8e463c..9f1ed6b9cb5 100644 --- a/cloudinit/stages.py +++ b/cloudinit/stages.py @@ -9,7 +9,6 @@ import logging import os import sys -from collections import namedtuple from contextlib import suppress from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union @@ -55,6 +54,12 @@ NO_PREVIOUS_INSTANCE_ID = "NO_PREVIOUS_INSTANCE_ID" +class _Semaphore: + def __init__(self, semaphore, args): + self.semaphore = semaphore + self.args = args + + COMBINED_CLOUD_CONFIG_DOC = ( "Aggregated cloud-config created by merging merged_system_cfg" " (/etc/cloud/cloud.cfg and /etc/cloud/cloud.cfg.d), metadata," @@ -318,8 +323,6 @@ def _restore_from_cache(self): return sources.pkl_load(self.paths.get_ipath_cur("obj_pkl")) def _write_to_cache(self): - if self.datasource is None: - return False if util.get_cfg_option_bool(self.cfg, "manual_cache_clean", False): # The empty file in instance/ dir indicates manual cleaning, # and can be read by ds-identify. @@ -328,9 +331,7 @@ def _write_to_cache(self): omode="w", content="", ) - return sources.pkl_store( - self.datasource, self.paths.get_ipath_cur("obj_pkl") - ) + return sources.pkl_store(self.ds, self.paths.get_ipath_cur("obj_pkl")) def _get_datasources(self): # Any config provided??? @@ -480,7 +481,7 @@ def _reflect_cur_instance(self): dp = self.paths.get_cpath("data") # Write what the datasource was and is.. - ds = "%s: %s" % (type_utils.obj_name(self.datasource), self.datasource) + ds = "%s: %s" % (type_utils.obj_name(self.ds), self.ds) previous_ds = None ds_fn = os.path.join(idir, "datasource") try: @@ -495,7 +496,7 @@ def _reflect_cur_instance(self): ) # What the instance id was and is... - iid = self.datasource.get_instance_id() + iid = self.ds.get_instance_id() iid_fn = os.path.join(dp, "instance-id") previous_iid = self.previous_iid() @@ -518,6 +519,19 @@ def _reflect_cur_instance(self): self._reset() return iid + @property + def ds(self) -> sources.DataSource: + """ds has a reference to the DataSource object + + self.datasource can contain a DataSource or None + self.ds is preferred for any code path that is known to require an + initialized DataSource, basically any code besides the initial + bootstrapping of various objects. + """ + if not self.datasource: + raise RuntimeError("Datasource is not initialized.") + return self.datasource + def previous_iid(self): if self._previous_iid is not None: return self._previous_iid @@ -539,11 +553,10 @@ def is_new_instance(self): even on first boot. """ previous = self.previous_iid() - ret = ( + return ( previous == NO_PREVIOUS_INSTANCE_ID - or previous != self.datasource.get_instance_id() + or previous != self.ds.get_instance_id() ) - return ret def fetch(self, existing="check"): """optionally load datasource from cache, otherwise discover @@ -557,7 +570,7 @@ def instancify(self): def cloudify(self): # Form the needed options to cloudify our members return cloud.Cloud( - self.datasource, + self.ds, self.paths, self.cfg, self.distro, @@ -566,28 +579,20 @@ def cloudify(self): ) def update(self): - self._store_rawdata(self.datasource.get_userdata_raw(), "userdata") - self._store_processeddata(self.datasource.get_userdata(), "userdata") + self._store_rawdata(self.ds.get_userdata_raw(), "userdata") + self._store_processeddata(self.ds.get_userdata(), "userdata") + self._store_raw_vendordata(self.ds.get_vendordata_raw(), "vendordata") + self._store_processeddata(self.ds.get_vendordata(), "vendordata") self._store_raw_vendordata( - self.datasource.get_vendordata_raw(), "vendordata" - ) - self._store_processeddata( - self.datasource.get_vendordata(), "vendordata" - ) - self._store_raw_vendordata( - self.datasource.get_vendordata2_raw(), "vendordata2" - ) - self._store_processeddata( - self.datasource.get_vendordata2(), "vendordata2" + self.ds.get_vendordata2_raw(), "vendordata2" ) + self._store_processeddata(self.ds.get_vendordata2(), "vendordata2") def setup_datasource(self): with events.ReportEventStack( "setup-datasource", "setting up datasource", parent=self.reporter ): - if self.datasource is None: - raise RuntimeError("Datasource is None, cannot setup.") - self.datasource.setup(is_new_instance=self.is_new_instance()) + self.ds.setup(is_new_instance=self.is_new_instance()) def activate_datasource(self): with events.ReportEventStack( @@ -595,9 +600,7 @@ def activate_datasource(self): "activating datasource", parent=self.reporter, ): - if self.datasource is None: - raise RuntimeError("Datasource is None, cannot activate.") - self.datasource.activate( + self.ds.activate( cfg=self.cfg, is_new_instance=self.is_new_instance() ) self._write_to_cache() @@ -634,7 +637,7 @@ def _default_handlers(self, opts=None) -> List[handlers.Handler]: opts.update( { "paths": self.paths, - "datasource": self.datasource, + "datasource": self.ds, } ) # TODO(harlowja) Hmmm, should we dynamically import these?? @@ -883,12 +886,12 @@ def _consume_vendordata(self, vendor_source, frequency=PER_INSTANCE): # vendor provided), and check whether or not we should consume # vendor data at all. That gives user or system a chance to override. if vendor_source == "vendordata": - if not self.datasource.get_vendordata_raw(): + if not self.ds.get_vendordata_raw(): LOG.debug("no vendordata from datasource") return cfg_name = "vendor_data" elif vendor_source == "vendordata2": - if not self.datasource.get_vendordata2_raw(): + if not self.ds.get_vendordata2_raw(): LOG.debug("no vendordata2 from datasource") return cfg_name = "vendor_data2" @@ -900,7 +903,7 @@ def _consume_vendordata(self, vendor_source, frequency=PER_INSTANCE): _cc_merger = helpers.ConfigMerger( paths=self._paths, - datasource=self.datasource, + datasource=self.ds, additional_fns=[], base_cfg=self.cfg, include_vendor=False, @@ -940,10 +943,10 @@ def _consume_vendordata(self, vendor_source, frequency=PER_INSTANCE): # excluding what the users doesn't want run, i.e. boot_hook, # cloud_config, shell_script if vendor_source == "vendordata": - vendor_data_msg = self.datasource.get_vendordata() + vendor_data_msg = self.ds.get_vendordata() c_handlers_list = self._default_vendordata_handlers() else: - vendor_data_msg = self.datasource.get_vendordata2() + vendor_data_msg = self.ds.get_vendordata2() c_handlers_list = self._default_vendordata2_handlers() # Run the handlers @@ -957,7 +960,7 @@ def _consume_userdata(self, frequency=PER_INSTANCE): """ # Ensure datasource fetched before activation (just in case) - user_data_msg = self.datasource.get_userdata(True) + user_data_msg = self.ds.get_userdata(True) # This keeps track of all the active handlers c_handlers_list = self._default_handlers() @@ -990,15 +993,10 @@ def _find_networking_config( NetworkConfigSource.SYSTEM_CFG: self.cfg.get("network"), } - if self.datasource and hasattr(self.datasource, "network_config"): - available_cfgs[NetworkConfigSource.DS] = ( - self.datasource.network_config - ) + if hasattr(self.ds, "network_config"): + available_cfgs[NetworkConfigSource.DS] = self.ds.network_config - if self.datasource: - order = self.datasource.network_config_sources - else: - order = sources.DataSource.network_config_sources + order = self.ds.network_config_sources for cfg_source in order: if not isinstance(cfg_source, NetworkConfigSource): # This won't happen in the cloud-init codebase, but out-of-tree @@ -1035,8 +1033,8 @@ def _apply_netcfg_names(self, netcfg): except Exception as e: LOG.warning("Failed to rename devices: %s", e) - def _get_per_boot_network_semaphore(self): - return namedtuple("Semaphore", "semaphore args")( + def _get_per_boot_network_semaphore(self) -> _Semaphore: + return _Semaphore( helpers.FileSemaphores(self.paths.get_runpath("sem")), ("apply_network_config", PER_ONCE), ) @@ -1063,11 +1061,11 @@ def apply_network_config(self, bring_up): def event_enabled_and_metadata_updated(event_type): return update_event_enabled( - datasource=self.datasource, + datasource=self.ds, cfg=self.cfg, event_source_type=event_type, scope=EventScope.NETWORK, - ) and self.datasource.update_metadata_if_supported([event_type]) + ) and self.ds.update_metadata_if_supported([event_type]) def should_run_on_boot_event(): return ( @@ -1091,9 +1089,8 @@ def should_run_on_boot_event(): # refresh netcfg after update netcfg, src = self._find_networking_config() - self._write_network_config_json(netcfg) - if netcfg: + self._write_network_config_json(netcfg) validate_cloudconfig_schema( config=netcfg, schema_type=SchemaType.NETWORK_CONFIG, diff --git a/pyproject.toml b/pyproject.toml index f625ae0883b..df492b8ed0c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -93,7 +93,6 @@ module = [ "cloudinit.sources.helpers.vmware.imc.config_nic", "cloudinit.sources.helpers.vultr", "cloudinit.ssh_util", - "cloudinit.stages", "cloudinit.temp_utils", "cloudinit.templater", "cloudinit.user_data",