99 changes: 48 additions & 51 deletions cloudinit/stages.py
@@ -9,7 +9,6 @@
 import logging
 import os
 import sys
-from collections import namedtuple
 from contextlib import suppress
 from pathlib import Path
 from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
@@ -55,6 +54,12 @@
 NO_PREVIOUS_INSTANCE_ID = "NO_PREVIOUS_INSTANCE_ID"
 
 
+class _Semaphore:
+    def __init__(self, semaphore, args):
+        self.semaphore = semaphore
+        self.args = args
+
+
 COMBINED_CLOUD_CONFIG_DOC = (
     "Aggregated cloud-config created by merging merged_system_cfg"
     " (/etc/cloud/cloud.cfg and /etc/cloud/cloud.cfg.d), metadata,"
@@ -318,8 +323,6 @@ def _restore_from_cache(self):
         return sources.pkl_load(self.paths.get_ipath_cur("obj_pkl"))
 
     def _write_to_cache(self):
-        if self.datasource is None:
-            return False
         if util.get_cfg_option_bool(self.cfg, "manual_cache_clean", False):
             # The empty file in instance/ dir indicates manual cleaning,
             # and can be read by ds-identify.
@@ -328,9 +331,7 @@
                 omode="w",
                 content="",
             )
-        return sources.pkl_store(
-            self.datasource, self.paths.get_ipath_cur("obj_pkl")
-        )
+        return sources.pkl_store(self.ds, self.paths.get_ipath_cur("obj_pkl"))
 
     def _get_datasources(self):
         # Any config provided???
@@ -480,7 +481,7 @@ def _reflect_cur_instance(self):
         dp = self.paths.get_cpath("data")
 
         # Write what the datasource was and is..
-        ds = "%s: %s" % (type_utils.obj_name(self.datasource), self.datasource)
+        ds = "%s: %s" % (type_utils.obj_name(self.ds), self.ds)
         previous_ds = None
         ds_fn = os.path.join(idir, "datasource")
         try:
@@ -495,7 +496,7 @@
             )
 
         # What the instance id was and is...
-        iid = self.datasource.get_instance_id()
+        iid = self.ds.get_instance_id()
         iid_fn = os.path.join(dp, "instance-id")
 
         previous_iid = self.previous_iid()
@@ -518,6 +519,19 @@
         self._reset()
         return iid
 
+    @property
+    def ds(self) -> sources.DataSource:
+        """ds has a reference to the DataSource object
+
+        self.datasource can contain a DataSource or None
+        self.ds is preferred for any code path that is known to require an
+        initialized DataSource, basically any code besides the initial
+        bootstrapping of various objects.
+        """
+        if not self.datasource:
+            raise RuntimeError("Datasource is not initialized.")
+        return self.datasource
+
     def previous_iid(self):
         if self._previous_iid is not None:
             return self._previous_iid
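The `ds` property is the heart of this change: it narrows the Optional `self.datasource` attribute into a guaranteed `DataSource`, so every later call site type-checks without its own None guard. A minimal standalone sketch of the pattern, with stub classes standing in for cloud-init's real ones:

```python
from typing import Optional


class DataSource:
    """Stand-in for cloudinit.sources.DataSource."""

    def get_instance_id(self) -> str:
        return "iid-example"


class Init:
    def __init__(self) -> None:
        # None until datasource discovery has run during boot.
        self.datasource: Optional[DataSource] = None

    @property
    def ds(self) -> DataSource:
        # Raising here lets the property promise a non-Optional return
        # type, so callers need no `is None` checks to satisfy mypy.
        if not self.datasource:
            raise RuntimeError("Datasource is not initialized.")
        return self.datasource


init = Init()
init.datasource = DataSource()
print(init.ds.get_instance_id())  # type-checks without a None guard
```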
@@ -539,11 +553,10 @@ def is_new_instance(self):
         even on first boot.
         """
         previous = self.previous_iid()
-        ret = (
+        return (
             previous == NO_PREVIOUS_INSTANCE_ID
-            or previous != self.datasource.get_instance_id()
+            or previous != self.ds.get_instance_id()
         )
-        return ret
 
     def fetch(self, existing="check"):
         """optionally load datasource from cache, otherwise discover
@@ -557,7 +570,7 @@ def instancify(self):
     def cloudify(self):
         # Form the needed options to cloudify our members
         return cloud.Cloud(
-            self.datasource,
+            self.ds,
             self.paths,
             self.cfg,
             self.distro,
@@ -566,38 +579,28 @@
         )
 
     def update(self):
-        self._store_rawdata(self.datasource.get_userdata_raw(), "userdata")
-        self._store_processeddata(self.datasource.get_userdata(), "userdata")
+        self._store_rawdata(self.ds.get_userdata_raw(), "userdata")
+        self._store_processeddata(self.ds.get_userdata(), "userdata")
+        self._store_raw_vendordata(self.ds.get_vendordata_raw(), "vendordata")
+        self._store_processeddata(self.ds.get_vendordata(), "vendordata")
         self._store_raw_vendordata(
-            self.datasource.get_vendordata_raw(), "vendordata"
-        )
-        self._store_processeddata(
-            self.datasource.get_vendordata(), "vendordata"
-        )
-        self._store_raw_vendordata(
-            self.datasource.get_vendordata2_raw(), "vendordata2"
-        )
-        self._store_processeddata(
-            self.datasource.get_vendordata2(), "vendordata2"
+            self.ds.get_vendordata2_raw(), "vendordata2"
         )
+        self._store_processeddata(self.ds.get_vendordata2(), "vendordata2")
 
     def setup_datasource(self):
         with events.ReportEventStack(
             "setup-datasource", "setting up datasource", parent=self.reporter
         ):
-            if self.datasource is None:
-                raise RuntimeError("Datasource is None, cannot setup.")
-            self.datasource.setup(is_new_instance=self.is_new_instance())
+            self.ds.setup(is_new_instance=self.is_new_instance())
 
     def activate_datasource(self):
         with events.ReportEventStack(
             "activate-datasource",
             "activating datasource",
             parent=self.reporter,
         ):
-            if self.datasource is None:
-                raise RuntimeError("Datasource is None, cannot activate.")
-            self.datasource.activate(
+            self.ds.activate(
                 cfg=self.cfg, is_new_instance=self.is_new_instance()
             )
             self._write_to_cache()
@@ -634,7 +637,7 @@ def _default_handlers(self, opts=None) -> List[handlers.Handler]:
         opts.update(
             {
                 "paths": self.paths,
-                "datasource": self.datasource,
+                "datasource": self.ds,
             }
         )
         # TODO(harlowja) Hmmm, should we dynamically import these??
@@ -883,12 +886,12 @@ def _consume_vendordata(self, vendor_source, frequency=PER_INSTANCE):
         # vendor provided), and check whether or not we should consume
         # vendor data at all. That gives user or system a chance to override.
         if vendor_source == "vendordata":
-            if not self.datasource.get_vendordata_raw():
+            if not self.ds.get_vendordata_raw():
                 LOG.debug("no vendordata from datasource")
                 return
             cfg_name = "vendor_data"
         elif vendor_source == "vendordata2":
-            if not self.datasource.get_vendordata2_raw():
+            if not self.ds.get_vendordata2_raw():
                 LOG.debug("no vendordata2 from datasource")
                 return
             cfg_name = "vendor_data2"
@@ -900,7 +903,7 @@
 
         _cc_merger = helpers.ConfigMerger(
             paths=self._paths,
-            datasource=self.datasource,
+            datasource=self.ds,
             additional_fns=[],
             base_cfg=self.cfg,
             include_vendor=False,
@@ -940,10 +943,10 @@ def _consume_vendordata(self, vendor_source, frequency=PER_INSTANCE):
         # excluding what the users doesn't want run, i.e. boot_hook,
         # cloud_config, shell_script
         if vendor_source == "vendordata":
-            vendor_data_msg = self.datasource.get_vendordata()
+            vendor_data_msg = self.ds.get_vendordata()
             c_handlers_list = self._default_vendordata_handlers()
         else:
-            vendor_data_msg = self.datasource.get_vendordata2()
+            vendor_data_msg = self.ds.get_vendordata2()
             c_handlers_list = self._default_vendordata2_handlers()
 
         # Run the handlers
@@ -957,7 +960,7 @@ def _consume_userdata(self, frequency=PER_INSTANCE):
         """
 
         # Ensure datasource fetched before activation (just in case)
-        user_data_msg = self.datasource.get_userdata(True)
+        user_data_msg = self.ds.get_userdata(True)
 
         # This keeps track of all the active handlers
         c_handlers_list = self._default_handlers()
@@ -990,15 +993,10 @@ def _find_networking_config(
             NetworkConfigSource.SYSTEM_CFG: self.cfg.get("network"),
         }
 
-        if self.datasource and hasattr(self.datasource, "network_config"):
-            available_cfgs[NetworkConfigSource.DS] = (
-                self.datasource.network_config
-            )
+        if hasattr(self.ds, "network_config"):
+            available_cfgs[NetworkConfigSource.DS] = self.ds.network_config
 
-        if self.datasource:
-            order = self.datasource.network_config_sources
-        else:
-            order = sources.DataSource.network_config_sources
+        order = self.ds.network_config_sources
         for cfg_source in order:
             if not isinstance(cfg_source, NetworkConfigSource):
                 # This won't happen in the cloud-init codebase, but out-of-tree
@@ -1035,8 +1033,8 @@ def _apply_netcfg_names(self, netcfg):
         except Exception as e:
             LOG.warning("Failed to rename devices: %s", e)
 
-    def _get_per_boot_network_semaphore(self):
-        return namedtuple("Semaphore", "semaphore args")(
+    def _get_per_boot_network_semaphore(self) -> _Semaphore:
+        return _Semaphore(
             helpers.FileSemaphores(self.paths.get_runpath("sem")),
             ("apply_network_config", PER_ONCE),
         )
@@ -1063,11 +1061,11 @@ def apply_network_config(self, bring_up):
 
         def event_enabled_and_metadata_updated(event_type):
             return update_event_enabled(
-                datasource=self.datasource,
+                datasource=self.ds,
                 cfg=self.cfg,
                 event_source_type=event_type,
                 scope=EventScope.NETWORK,
-            ) and self.datasource.update_metadata_if_supported([event_type])
+            ) and self.ds.update_metadata_if_supported([event_type])
 
         def should_run_on_boot_event():
             return (
@@ -1091,9 +1089,8 @@ def should_run_on_boot_event():
 
         # refresh netcfg after update
         netcfg, src = self._find_networking_config()
-        self._write_network_config_json(netcfg)
-
         if netcfg:
+            self._write_network_config_json(netcfg)
Collaborator commented:

This change is unrelated to typing changes, but I agree we don't want to attempt to persist netcfg if it is empty or None; that'd cause a traceback in JSON marshaling within _write_network_config_json.

Member (Author) commented:

> This change is unrelated to typing changes

That's not true. Undo this change and mypy spots the error:

cloudinit/stages.py:1092: error: Argument 1 to "_write_network_config_json" of "Init" has incompatible type "dict[Any, Any] | None"; expected "dict[Any, Any]"  [arg-type]

Collaborator commented:

Ahh excellent. Thank you mypy! :)
             validate_cloudconfig_schema(
                 config=netcfg,
                 schema_type=SchemaType.NETWORK_CONFIG,
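The error quoted in the thread above is ordinary Optional narrowing: `_find_networking_config` may return no config, and only the `if netcfg:` guard proves to mypy that a real dict reaches `_write_network_config_json`. A simplified reproduction with assumed stub signatures (not cloud-init's actual ones):

```python
from typing import Any, Dict, Optional, Tuple


def _find_networking_config() -> Tuple[Optional[Dict[Any, Any]], str]:
    # A boot may legitimately discover no network configuration.
    return None, "fallback"


def _write_network_config_json(netcfg: Dict[Any, Any]) -> None:
    print(netcfg)


netcfg, src = _find_networking_config()
# _write_network_config_json(netcfg)  # mypy arg-type error: netcfg may be None
if netcfg:
    _write_network_config_json(netcfg)  # OK: the truthiness check narrows the type
```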
1 change: 0 additions & 1 deletion pyproject.toml
@@ -93,7 +93,6 @@ module = [
     "cloudinit.sources.helpers.vmware.imc.config_nic",
     "cloudinit.sources.helpers.vultr",
     "cloudinit.ssh_util",
-    "cloudinit.stages",
     "cloudinit.temp_utils",
     "cloudinit.templater",
     "cloudinit.user_data",
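The pyproject.toml half of the PR is the payoff: this `module = [...]` list feeds a mypy overrides table that relaxes checking for modules not yet fully typed (the exact relaxed flags are outside this diff). Deleting "cloudinit.stages" from the list means stages.py now passes under the stricter default settings and will stay enforced going forward; it is the same checking that caught the `dict[Any, Any] | None` argument error discussed in the review thread.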