Skip to content

Commit aadc150

Browse files
ethanwharrislantiga
authored andcommitted
[App] Content for plugins (#17243)
Co-authored-by: Yurij Mikhalevich <[email protected]> Co-authored-by: Luca Antiga <[email protected]> (cherry picked from commit 2c3dfc0)
1 parent 7827f66 commit aadc150

File tree

5 files changed

+132
-81
lines changed

5 files changed

+132
-81
lines changed

src/lightning/app/plugin/plugin.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ def run_job(self, name: str, app_entrypoint: str, env_vars: Dict[str, str] = {})
6161
"""
6262
from lightning.app.runners.cloud import CloudRuntime
6363

64+
logger.info(f"Processing job run request. name: {name}, app_entrypoint: {app_entrypoint}, env_vars: {env_vars}")
65+
6466
# Dispatch the job
6567
_set_flow_context()
6668

@@ -123,6 +125,8 @@ def _run_plugin(run: _Run) -> Dict[str, Any]:
123125

124126
# Download the tarball
125127
try:
128+
logger.info(f"Downloading plugin source: {run.source_code_url}")
129+
126130
# Sometimes the URL gets encoded, so we parse it here
127131
source_code_url = urlparse(run.source_code_url).geturl()
128132

@@ -141,6 +145,8 @@ def _run_plugin(run: _Run) -> Dict[str, Any]:
141145

142146
# Extract
143147
try:
148+
logger.info("Extracting plugin source.")
149+
144150
with tarfile.open(download_path, "r:gz") as tf:
145151
tf.extractall(source_path)
146152
except Exception as ex:
@@ -151,6 +157,8 @@ def _run_plugin(run: _Run) -> Dict[str, Any]:
151157

152158
# Import the plugin
153159
try:
160+
logger.info(f"Importing plugin: {run.plugin_entrypoint}")
161+
154162
plugin = _load_plugin_from_file(os.path.join(source_path, run.plugin_entrypoint))
155163
except Exception as ex:
156164
raise HTTPException(
@@ -163,6 +171,11 @@ def _run_plugin(run: _Run) -> Dict[str, Any]:
163171

164172
# Setup and run the plugin
165173
try:
174+
logger.info(
175+
"Running plugin. "
176+
f"project_id: {run.project_id}, cloudspace_id: {run.cloudspace_id}, cluster_id: {run.cluster_id}."
177+
)
178+
166179
plugin._setup(
167180
project_id=run.project_id,
168181
cloudspace_id=run.cloudspace_id,

src/lightning/app/runners/cloud.py

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -218,16 +218,22 @@ def cloudspace_dispatch(
218218
# Dispatch in four phases: resolution, validation, spec creation, API transactions
219219
# Resolution
220220
root = self._resolve_root()
221-
repo = self._resolve_repo(root)
221+
# If the root will already be there, we don't need to upload and preserve the absolute entrypoint
222+
absolute_entrypoint = str(root).startswith("/project")
223+
# If system customization files found, it will set their location path
224+
sys_customizations_root = self._resolve_env_root()
225+
repo = self._resolve_repo(
226+
root,
227+
default_ignore=False,
228+
package_source=not absolute_entrypoint,
229+
sys_customizations_root=sys_customizations_root,
230+
)
222231
project = self._resolve_project(project_id=project_id)
223232
existing_instances = self._resolve_run_instances_by_name(project_id, name)
224233
name = self._resolve_run_name(name, existing_instances)
225234
cloudspace = self._resolve_cloudspace(project_id, cloudspace_id)
226235
queue_server_type = self._resolve_queue_server_type()
227236

228-
# If system customization files found, it will set their location path
229-
sys_customizations_sync_root = self._resolve_env_root()
230-
231237
self.app._update_index_file()
232238

233239
# Validation
@@ -241,17 +247,26 @@ def cloudspace_dispatch(
241247
flow_servers = self._get_flow_servers()
242248
network_configs = self._get_network_configs(flow_servers)
243249
works = self._get_works(cloudspace=cloudspace)
244-
run_body = self._get_run_body(cluster_id, flow_servers, network_configs, works, False, root, True)
250+
run_body = self._get_run_body(
251+
cluster_id,
252+
flow_servers,
253+
network_configs,
254+
works,
255+
False,
256+
root,
257+
True,
258+
True,
259+
absolute_entrypoint,
260+
)
245261
env_vars = self._get_env_vars(self.env_vars, self.secrets, self.run_app_comment_commands)
246262

247-
# If the system customization root is set, prepare files for environment synchronization
248-
if sys_customizations_sync_root is not None:
249-
repo.prepare_sys_customizations_sync(sys_customizations_sync_root)
250-
251263
# API transactions
264+
logger.info(f"Creating cloudspace run. run_body: {run_body}")
252265
run = self._api_create_run(project_id, cloudspace_id, run_body)
266+
253267
self._api_package_and_upload_repo(repo, run)
254268

269+
logger.info(f"Creating cloudspace run instance. name: {name}")
255270
run_instance = self._api_create_run_instance(
256271
cluster_id,
257272
project_id,
@@ -454,6 +469,9 @@ def _resolve_repo(
454469
self,
455470
root: Path,
456471
ignore_functions: Optional[List[_IGNORE_FUNCTION]] = None,
472+
default_ignore: bool = True,
473+
package_source: bool = True,
474+
sys_customizations_root: Optional[Path] = None,
457475
) -> LocalSourceCodeDir:
458476
"""Gather and merge all lightningignores from the app children and create the ``LocalSourceCodeDir``
459477
object."""
@@ -470,7 +488,13 @@ def _resolve_repo(
470488
patterns = _parse_lightningignore(merged)
471489
ignore_functions = [*ignore_functions, partial(_filter_ignored, root, patterns)]
472490

473-
return LocalSourceCodeDir(path=root, ignore_functions=ignore_functions)
491+
return LocalSourceCodeDir(
492+
path=root,
493+
ignore_functions=ignore_functions,
494+
default_ignore=default_ignore,
495+
package_source=package_source,
496+
sys_customizations_root=sys_customizations_root,
497+
)
474498

475499
def _resolve_project(self, project_id: Optional[str] = None) -> V1Membership:
476500
"""Determine the project to run on, choosing a default if multiple projects are found."""
@@ -785,7 +809,7 @@ def _get_works(self, cloudspace: Optional[V1CloudSpace] = None) -> List[V1Work]:
785809
network_config=[V1NetworkConfig(name=random_name, port=work.port)],
786810
data_connection_mounts=data_connection_mounts,
787811
)
788-
works.append(V1Work(name=work.name, spec=work_spec))
812+
works.append(V1Work(name=work.name, display_name=work.display_name, spec=work_spec))
789813

790814
return works
791815

@@ -798,12 +822,18 @@ def _get_run_body(
798822
no_cache: bool,
799823
root: Path,
800824
start_server: bool,
825+
should_mount_cloudspace_content: bool = False,
826+
absolute_entrypoint: bool = False,
801827
) -> CloudspaceIdRunsBody:
802828
"""Get the specification of the run creation request."""
803-
# The entry point file needs to be relative to the root of the uploaded source file directory,
804-
# because the backend will invoke the lightning commands relative said source directory
805-
# TODO: we shouldn't set this if the entrypoint isn't a file but the backend gives an error if we don't
806-
app_entrypoint_file = Path(self.entrypoint).absolute().relative_to(root)
829+
if absolute_entrypoint:
830+
# If the entrypoint will already exist in the cloud then we can choose to keep it as an absolute path.
831+
app_entrypoint_file = Path(self.entrypoint).absolute()
832+
else:
833+
# The entry point file needs to be relative to the root of the uploaded source file directory,
834+
# because the backend will invoke the lightning commands relative said source directory
835+
# TODO: we shouldn't set this if the entrypoint isn't a file but the backend gives an error if we don't
836+
app_entrypoint_file = Path(self.entrypoint).absolute().relative_to(root)
807837

808838
run_body = CloudspaceIdRunsBody(
809839
cluster_id=cluster_id,
@@ -813,6 +843,7 @@ def _get_run_body(
813843
network_config=network_configs,
814844
works=works,
815845
local_source=True,
846+
should_mount_cloudspace_content=should_mount_cloudspace_content,
816847
)
817848

818849
if self.app is not None:
@@ -827,9 +858,10 @@ def _get_run_body(
827858
# if requirements file at the root of the repository is present,
828859
# we pass just the file name to the backend, so backend can find it in the relative path
829860
requirements_file = root / "requirements.txt"
830-
if requirements_file.is_file():
861+
if requirements_file.is_file() and requirements_file.exists():
862+
requirements_path = requirements_file if absolute_entrypoint else "requirements.txt"
831863
run_body.image_spec = Gridv1ImageSpec(
832-
dependency_file_info=V1DependencyFileInfo(package_manager=V1PackageManager.PIP, path="requirements.txt")
864+
dependency_file_info=V1DependencyFileInfo(package_manager=V1PackageManager.PIP, path=requirements_path)
833865
)
834866
if not DISABLE_DEPENDENCY_CACHE and not no_cache:
835867
# hash used for caching the dependencies
@@ -997,7 +1029,10 @@ def _api_create_run_instance(
9971029
)
9981030

9991031
@staticmethod
1000-
def _api_package_and_upload_repo(repo: LocalSourceCodeDir, run: V1LightningRun) -> None:
1032+
def _api_package_and_upload_repo(
1033+
repo: LocalSourceCodeDir,
1034+
run: V1LightningRun,
1035+
) -> None:
10011036
"""Package and upload the provided local source code directory to the provided run."""
10021037
if run.source_upload_url == "":
10031038
raise RuntimeError("The source upload url is empty.")

src/lightning/app/source_code/local.py

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@
1313
# limitations under the License.
1414

1515
import os
16+
import uuid
1617
from contextlib import contextmanager
1718
from pathlib import Path
1819
from shutil import copytree, rmtree
1920
from typing import List, Optional
2021

2122
from lightning.app.core.constants import DOT_IGNORE_FILENAME, SYS_CUSTOMIZATIONS_SYNC_PATH
2223
from lightning.app.source_code.copytree import _copytree, _IGNORE_FUNCTION
23-
from lightning.app.source_code.hashing import _get_hash
2424
from lightning.app.source_code.tar import _tar_path
2525
from lightning.app.source_code.uploader import FileUploader
2626

@@ -30,20 +30,35 @@ class LocalSourceCodeDir:
3030

3131
cache_location: Path = Path.home() / ".lightning" / "cache" / "repositories"
3232

33-
def __init__(self, path: Path, ignore_functions: Optional[List[_IGNORE_FUNCTION]] = None) -> None:
33+
def __init__(
34+
self,
35+
path: Path,
36+
ignore_functions: Optional[List[_IGNORE_FUNCTION]] = None,
37+
default_ignore: bool = True,
38+
package_source: bool = True,
39+
sys_customizations_root: Optional[Path] = None,
40+
) -> None:
41+
if "LIGHTNING_VSCODE_WORKSPACE" in os.environ:
42+
# Don't use home to store the tar ball. This won't play nice with symlinks
43+
self.cache_location: Path = Path("/tmp", ".lightning", "cache", "repositories") # noqa: S108
44+
else:
45+
self.cache_location: Path = Path.home() / ".lightning" / "cache" / "repositories"
46+
3447
self.path = path
3548
self.ignore_functions = ignore_functions
49+
self.package_source = package_source
50+
self.sys_customizations_root = sys_customizations_root
3651

37-
# cache checksum version
52+
# cache version
3853
self._version: Optional[str] = None
3954
self._non_ignored_files: Optional[List[str]] = None
4055

4156
# create global cache location if it doesn't exist
4257
if not self.cache_location.exists():
4358
self.cache_location.mkdir(parents=True, exist_ok=True)
4459

45-
# Create a default dotignore if it doesn't exist
46-
if not (path / DOT_IGNORE_FILENAME).is_file():
60+
# Create a default dotignore if requested and it doesn't exist
61+
if default_ignore and not (path / DOT_IGNORE_FILENAME).is_file():
4762
with open(path / DOT_IGNORE_FILENAME, "w") as f:
4863
f.write("venv/\n")
4964
if (path / "bin" / "activate").is_file() or (path / "pyvenv.cfg").is_file():
@@ -57,7 +72,10 @@ def __init__(self, path: Path, ignore_functions: Optional[List[_IGNORE_FUNCTION]
5772
def files(self) -> List[str]:
5873
"""Returns a set of files that are not ignored by .lightningignore."""
5974
if self._non_ignored_files is None:
60-
self._non_ignored_files = _copytree(self.path, "", ignore_functions=self.ignore_functions, dry_run=True)
75+
if self.package_source:
76+
self._non_ignored_files = _copytree(self.path, "", ignore_functions=self.ignore_functions, dry_run=True)
77+
else:
78+
self._non_ignored_files = []
6179
return self._non_ignored_files
6280

6381
@property
@@ -67,8 +85,8 @@ def version(self):
6785
if self._version is not None:
6886
return self._version
6987

70-
# stores both version and a set with the files used to generate the checksum
71-
self._version = _get_hash(files=self.files, algorithm="blake2")
88+
# create a random version ID and store it
89+
self._version = uuid.uuid4().hex
7290
return self._version
7391

7492
@property
@@ -83,7 +101,11 @@ def packaging_session(self) -> Path:
83101
session_path = self.cache_location / "packaging_sessions" / self.version
84102
try:
85103
rmtree(session_path, ignore_errors=True)
86-
_copytree(self.path, session_path, ignore_functions=self.ignore_functions)
104+
if self.package_source:
105+
_copytree(self.path, session_path, ignore_functions=self.ignore_functions)
106+
if self.sys_customizations_root is not None:
107+
path_to_sync = Path(session_path, SYS_CUSTOMIZATIONS_SYNC_PATH)
108+
copytree(self.sys_customizations_root, path_to_sync, dirs_exist_ok=True)
87109
yield session_path
88110
finally:
89111
rmtree(session_path, ignore_errors=True)
@@ -104,12 +126,6 @@ def package(self) -> Path:
104126
_tar_path(source_path=session_path, target_file=str(self.package_path), compression=True)
105127
return self.package_path
106128

107-
def prepare_sys_customizations_sync(self, sys_customizations_root: Path) -> None:
108-
"""Prepares files for system environment customization setup by copying conda and system environment files
109-
to an app files directory."""
110-
path_to_sync = Path(self.path, SYS_CUSTOMIZATIONS_SYNC_PATH)
111-
copytree(sys_customizations_root, path_to_sync, dirs_exist_ok=True)
112-
113129
def upload(self, url: str) -> None:
114130
"""Uploads package to URL, usually pre-signed URL.
115131

0 commit comments

Comments
 (0)