Skip to content

Commit a9f79d2

Browse files
authored
Dbconnect progress (#1355)
## Changes Show progress bar for spark tasks using DB Connect progress API. * Visual progress in notebooks * Text-based progress when running Python files with DB Connect * Setting to toggle progress bars ![Screenshot 2024-09-13 at 17 46 15](https://github.com/user-attachments/assets/a20fc282-8ea3-47e7-b028-6d58b157b200) ![Screenshot 2024-09-17 at 10 25 31](https://github.com/user-attachments/assets/cf91e188-34f5-456f-a7c1-b098592d1492) ![Screenshot 2024-09-17 at 10 26 28](https://github.com/user-attachments/assets/eb502d26-9061-4698-837d-87be33264364) ## Tests Manually
1 parent d7cbb50 commit a9f79d2

File tree

8 files changed

+172
-11
lines changed

8 files changed

+172
-11
lines changed

databricks-vscode.code-workspace

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
},
2323
"[typescript]": {
2424
"editor.defaultFormatter": "esbenp.prettier-vscode"
25-
}
25+
},
26+
"jupyter.interactiveWindow.cellMarker.codeRegex": "^# COMMAND ----------|^# Databricks notebook source|^(#\\s*%%|#\\s*\\<codecell\\>|#\\s*In\\[\\d*?\\]|#\\s*In\\[ \\])",
27+
"jupyter.interactiveWindow.cellMarker.default": "# COMMAND ----------"
2628
}
2729
}

packages/databricks-vscode/package.json

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -856,7 +856,6 @@
856856
"views.workspace"
857857
],
858858
"enumDescriptions": [
859-
"Limited local notebook support using DB Connect v2.",
860859
"Show cluster view in the explorer.",
861860
"Show workspace browser in the explorer."
862861
],
@@ -870,6 +869,11 @@
870869
"default": true,
871870
"description": "Enable/disable rearranging cells in wrapper files created when using `workspace` as the sync destination. **Note:** It is recommended to NOT disable this setting. If you do disable it, you will need to manually handle sys.path for local imports in your notebooks."
872871
},
872+
"databricks.connect.progress": {
873+
"type": "boolean",
874+
"default": true,
875+
"description": "Show PySpark progress bar when using Databricks Connect."
876+
},
873877
"databricks.ipythonDir": {
874878
"type": "string",
875879
"description": "Absolute path to a directory for storing IPython files. Defaults to IPYTHONDIR environment variable (if set) or ~/.ipython."
@@ -1008,4 +1012,4 @@
10081012
],
10091013
"report-dir": "coverage"
10101014
}
1011-
}
1015+
}

packages/databricks-vscode/resources/python/00-databricks-init.py

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
from typing import Any, Union, List
55
import os
6+
import time
67
import shlex
78
import warnings
89
import tempfile
@@ -24,6 +25,7 @@ def logError(function_name: str, e: Union[str, Exception]):
2425

2526
try:
2627
from IPython import get_ipython
28+
from IPython.display import display
2729
from IPython.core.magic import magics_class, Magics, line_magic, needs_local_scope
2830
except Exception as e:
2931
logError("Ipython Imports", e)
@@ -101,7 +103,13 @@ def __init__(self, env_name: str, default: any = None, required: bool = False):
101103

102104
def __get__(self, instance, owner):
103105
if self.env_name in os.environ:
104-
return self.transform(os.environ[self.env_name])
106+
if self.transform is not bool:
107+
return self.transform(os.environ[self.env_name])
108+
109+
if os.environ[self.env_name].lower() == "true" or os.environ[self.env_name] == "1":
110+
return True
111+
elif os.environ[self.env_name].lower() == "false" or os.environ[self.env_name] == "0":
112+
return False
105113

106114
if self.required:
107115
raise AttributeError(
@@ -117,6 +125,7 @@ def __set__(self, instance, value):
117125
class LocalDatabricksNotebookConfig:
118126
project_root: str = EnvLoader("DATABRICKS_PROJECT_ROOT", required=True)
119127
dataframe_display_limit: int = EnvLoader("DATABRICKS_DF_DISPLAY_LIMIT", 20)
128+
show_progress: bool = EnvLoader("SPARK_CONNECT_PROGRESS_BAR_ENABLED", default=False)
120129

121130
def __new__(cls):
122131
annotations = cls.__dict__['__annotations__']
@@ -357,6 +366,99 @@ def df_html(df):
357366
html_formatter.for_type(SparkConnectDataframe, df_html)
358367
html_formatter.for_type(DataFrame, df_html)
359368

369+
@logErrorAndContinue
@disposable
def register_spark_progress(spark, show_progress: bool):
    """Attach a DB Connect progress handler that renders an ipywidgets progress bar.

    Clears any previously registered handlers first so re-running this init
    script does not stack duplicate bars. Silently does nothing when
    ipywidgets is not installed.

    :param spark: active Spark Connect session exposing
        clearProgressHandlers()/registerProgressHandler().
    :param show_progress: when False the handler still tracks state but the
        widget is never displayed.
    """
    try:
        import ipywidgets as widgets
    except Exception:
        # ipywidgets is an optional dependency; without it we skip the UI.
        return

    class Progress:
        # Descending SI byte thresholds and their matching display suffixes.
        SI_BYTE_SIZES = (1 << 60, 1 << 50, 1 << 40, 1 << 30, 1 << 20, 1 << 10, 1)
        SI_BYTE_SUFFIXES = ("EiB", "PiB", "TiB", "GiB", "MiB", "KiB", "B")

        def __init__(
            self
        ) -> None:
            self._ticks = None      # total tasks for the current operation
            self._tick = None       # completed tasks so far
            self._started = time.time()
            self._bytes_read = 0
            self._running = 0
            self.init_ui()

        def init_ui(self):
            """Build the progress bar and status label; display only when enabled."""
            self.w_progress = widgets.IntProgress(
                value=0,
                min=0,
                max=100,
                bar_style='success',
                orientation='horizontal'
            )
            self.w_status = widgets.Label(value="")
            if show_progress:
                display(widgets.HBox([self.w_progress, self.w_status]))

        def update_ticks(
            self,
            stages,
            inflight_tasks: int
        ) -> None:
            """Recompute task totals from the reported stages and refresh the UI."""
            total_tasks = sum(stage.num_tasks for stage in stages)
            completed_tasks = sum(stage.num_completed_tasks for stage in stages)
            if total_tasks > 0:
                self._ticks = total_tasks
                self._tick = completed_tasks
                self._bytes_read = sum(stage.num_bytes_read for stage in stages)
                if self._tick is not None and self._tick >= 0:
                    self.output()
                self._running = inflight_tasks

        def output(self) -> None:
            """Render percent complete, running tasks, elapsed time and bytes scanned."""
            if self._tick is not None and self._ticks is not None:
                percent_complete = (self._tick / self._ticks) * 100
                elapsed = int(time.time() - self._started)
                scanned = self._bytes_to_string(self._bytes_read)
                running = self._running
                self.w_progress.value = percent_complete
                self.w_status.value = f"{percent_complete:.2f}% Complete ({running} Tasks running, {elapsed}s, Scanned {scanned})"

        @staticmethod
        def _bytes_to_string(size: int) -> str:
            """Helper method to convert a numeric bytes value into a human-readable representation"""
            i = 0
            while i < len(Progress.SI_BYTE_SIZES) - 1 and size < 2 * Progress.SI_BYTE_SIZES[i]:
                i += 1
            result = float(size) / Progress.SI_BYTE_SIZES[i]
            return f"{result:.1f} {Progress.SI_BYTE_SUFFIXES[i]}"


    class ProgressHandler:
        def __init__(self):
            self.op_id = ""
            # Created lazily on the first reported operation. Initializing it
            # here guards against an AttributeError if the very first callback
            # carries an operation_id equal to the "" sentinel.
            self.p = None

        def reset(self):
            self.p = Progress()

        def __call__(self,
                     stages,
                     inflight_tasks: int,
                     operation_id,
                     done: bool
        ):
            if len(stages) == 0:
                return

            # Start a fresh progress bar whenever a new operation begins,
            # or when none has been created yet.
            if self.op_id != operation_id or self.p is None:
                self.op_id = operation_id
                self.reset()

            self.p.update_ticks(stages, inflight_tasks)

    # Drop any handlers registered by a previous run of this script before
    # installing ours, so each session has exactly one progress handler.
    spark.clearProgressHandlers()
    spark.registerProgressHandler(ProgressHandler())
461+
360462

361463
@logErrorAndContinue
362464
@disposable
@@ -382,9 +484,16 @@ def make_matplotlib_inline():
382484
if not load_env_from_leaf(os.getcwd()):
383485
sys.exit(1)
384486
cfg = LocalDatabricksNotebookConfig()
487+
488+
# disable the built-in progress bar (we register our own handler instead)
489+
show_progress = cfg.show_progress
490+
if "SPARK_CONNECT_PROGRESS_BAR_ENABLED" in os.environ:
491+
del os.environ["SPARK_CONNECT_PROGRESS_BAR_ENABLED"]
492+
385493
create_and_register_databricks_globals()
386494
register_magics(cfg)
387495
register_formatters(cfg)
496+
register_spark_progress(globals()["spark"], show_progress)
388497
update_sys_path(cfg)
389498
make_matplotlib_inline()
390499

packages/databricks-vscode/src/file-managers/DatabricksEnvFileManager.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ export class DatabricksEnvFileManager implements Disposable {
1919
private userEnvFileWatcherDisposables: Disposable[] = [];
2020
private mutex = new Mutex();
2121
private userEnvPath?: Uri;
22+
private showDatabricksConnectProgess = true;
2223

2324
get databricksEnvPath() {
2425
return Uri.joinPath(
@@ -44,7 +45,23 @@ export class DatabricksEnvFileManager implements Disposable {
4445
private readonly featureManager: FeatureManager,
4546
private readonly connectionManager: ConnectionManager,
4647
private readonly configModel: ConfigModel
47-
) {}
48+
) {
49+
this.showDatabricksConnectProgess =
50+
workspaceConfigs.showDatabricksConnectProgress;
51+
}
52+
53+
/**
 * Re-reads the `databricks.connect.progress` setting and, when its value
 * actually changed, caches it and rewrites the env file so running
 * interpreters pick up the new SPARK_CONNECT_PROGRESS_BAR_ENABLED value.
 * (Registered for every configuration change; the equality check makes
 * unrelated changes a no-op.)
 */
private async updateShowDatabricksConnectProgessWatcher() {
    const current = workspaceConfigs.showDatabricksConnectProgress;
    if (current === this.showDatabricksConnectProgess) {
        return;
    }

    this.showDatabricksConnectProgess = current;
    await this.writeFile();
}
4865

4966
private updateUserEnvFileWatcher() {
5067
const userEnvPath = workspaceConfigs.msPythonEnvFile
@@ -111,6 +128,11 @@ export class DatabricksEnvFileManager implements Disposable {
111128
this,
112129
this.disposables
113130
),
131+
workspace.onDidChangeConfiguration(
132+
this.updateShowDatabricksConnectProgessWatcher,
133+
this,
134+
this.disposables
135+
),
114136
this.featureManager.onDidChangeState(
115137
"environment.dependencies",
116138
() => {
@@ -160,7 +182,8 @@ export class DatabricksEnvFileManager implements Disposable {
160182
...(this.getDatabrickseEnvVars() || {}),
161183
...((await EnvVarGenerators.getDbConnectEnvVars(
162184
this.connectionManager,
163-
this.workspacePath
185+
this.workspacePath,
186+
this.showDatabricksConnectProgess
164187
)) || {}),
165188
...this.getIdeEnvVars(),
166189
...((await this.getUserEnvVars()) || {}),

packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/JobRunStateUtils.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,6 @@ export function getSimplifiedRunState(run?: Run): SimplifiedRunState {
4343
}
4444
return "Terminated";
4545
}
46+
47+
return "Unknown";
4648
}

packages/databricks-vscode/src/utils/envVarGenerators.test.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,14 @@ describe(__filename, () => {
103103

104104
const actual = await getDbConnectEnvVars(
105105
instance(mockConnectionManager),
106-
mockWorkspacePath
106+
mockWorkspacePath,
107+
true
107108
);
108109

109110
assert.deepEqual(actual, {
110111
SPARK_CONNECT_USER_AGENT: "test/0.0.1",
111112
DATABRICKS_PROJECT_ROOT: mockWorkspacePath.fsPath,
113+
SPARK_CONNECT_PROGRESS_BAR_ENABLED: "1",
112114
});
113115
});
114116

@@ -118,12 +120,14 @@ describe(__filename, () => {
118120

119121
const actual = await getDbConnectEnvVars(
120122
instance(mockConnectionManager),
121-
mockWorkspacePath
123+
mockWorkspacePath,
124+
true
122125
);
123126

124127
assert.deepEqual(actual, {
125128
SPARK_CONNECT_USER_AGENT: "existing test/0.0.1",
126129
DATABRICKS_PROJECT_ROOT: mockWorkspacePath.fsPath,
130+
SPARK_CONNECT_PROGRESS_BAR_ENABLED: "1",
127131
});
128132
});
129133

@@ -139,12 +143,14 @@ describe(__filename, () => {
139143

140144
const actual = await getDbConnectEnvVars(
141145
instance(mockConnectionManager),
142-
mockWorkspacePath
146+
mockWorkspacePath,
147+
true
143148
);
144149

145150
assert.deepEqual(actual, {
146151
SPARK_CONNECT_USER_AGENT: "test/0.0.1",
147152
DATABRICKS_PROJECT_ROOT: mockWorkspacePath.fsPath,
153+
SPARK_CONNECT_PROGRESS_BAR_ENABLED: "1",
148154
SPARK_REMOTE: `sc://${
149155
Uri.parse(mockHost).authority
150156
}:443/;token=token;use_ssl=true;x-databricks-cluster-id=${mockClusterId}`,
@@ -163,12 +169,14 @@ describe(__filename, () => {
163169

164170
const actual = await getDbConnectEnvVars(
165171
instance(mockConnectionManager),
166-
mockWorkspacePath
172+
mockWorkspacePath,
173+
true
167174
);
168175

169176
assert.deepEqual(actual, {
170177
SPARK_CONNECT_USER_AGENT: "test/0.0.1",
171178
DATABRICKS_PROJECT_ROOT: mockWorkspacePath.fsPath,
179+
SPARK_CONNECT_PROGRESS_BAR_ENABLED: "1",
172180
});
173181
});
174182
});

packages/databricks-vscode/src/utils/envVarGenerators.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,15 +111,20 @@ async function getSparkRemoteEnvVar(connectionManager: ConnectionManager) {
111111

112112
export async function getDbConnectEnvVars(
113113
connectionManager: ConnectionManager,
114-
workspacePath: Uri
114+
workspacePath: Uri,
115+
showDatabricksConnectProgess: boolean
115116
) {
116117
const userAgent = getUserAgent(connectionManager);
117118
const existingSparkUa = process.env.SPARK_CONNECT_USER_AGENT ?? "";
119+
118120
/* eslint-disable @typescript-eslint/naming-convention */
119121
return {
120122
//We append our user agent to any existing SPARK_CONNECT_USER_AGENT defined in the
121123
//environment of the parent process of VS Code.
122124
SPARK_CONNECT_USER_AGENT: [existingSparkUa, userAgent].join(" ").trim(),
125+
SPARK_CONNECT_PROGRESS_BAR_ENABLED: showDatabricksConnectProgess
126+
? "1"
127+
: "0",
123128
DATABRICKS_PROJECT_ROOT: workspacePath.fsPath,
124129
...((await getSparkRemoteEnvVar(connectionManager)) || {}),
125130
};

packages/databricks-vscode/src/vscode-objs/WorkspaceConfigs.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ export const workspaceConfigs = {
105105
);
106106
},
107107

108+
/** User preference for DB Connect progress bars; enabled when unset. */
get showDatabricksConnectProgress(): boolean {
    const configured = workspace
        .getConfiguration("databricks")
        .get<boolean>("connect.progress");
    return configured ?? true;
},
115+
108116
get ipythonDir(): string | undefined {
109117
const dir = workspace
110118
.getConfiguration("databricks")

0 commit comments

Comments
 (0)