Skip to content

Commit 0116d3a

Browse files
Bring back sync functionality (#1278)
## Changes * Bring back sync functionality. Users can choose to manually start sync from the UI and not rely on deploy before file runs. This avoids breaking some existing user flows. * The files are still run only after a deploy. * There is possibility of race conditions between sync and deploy but the possibility is very small. Also, syncing is blocked for prod and staging targets. ## Tests <!-- How is this tested? -->
1 parent e68f2bf commit 0116d3a

File tree

16 files changed

+1192
-87
lines changed

16 files changed

+1192
-87
lines changed

packages/databricks-vscode/package.json

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,27 @@
322322
"title": "Force destroy bundle",
323323
"category": "Databricks",
324324
"enablement": "databricks.context.activated && databricks.context.bundle.isTargetSet && databricks.context.bundle.deploymentState == idle"
325+
},
326+
{
327+
"command": "databricks.sync.start",
328+
"title": "Start synchronization",
329+
"category": "Databricks",
330+
"enablement": "databricks.context.activated && databricks.context.loggedIn && databricks.context.bundle.isTargetSet && databricks.context.bundle.isDevTarget",
331+
"icon": "$(sync)"
332+
},
333+
{
334+
"command": "databricks.sync.startFull",
335+
"title": "Start synchronization (full sync)",
336+
"category": "Databricks",
337+
"enablement": "databricks.context.activated && databricks.context.loggedIn && databricks.context.bundle.isTargetSet && databricks.context.bundle.isDevTarget",
338+
"icon": "$(sync)"
339+
},
340+
{
341+
"command": "databricks.sync.stop",
342+
"title": "Stop synchronization",
343+
"category": "Databricks",
344+
"enablement": "databricks.context.activated && databricks.context.loggedIn && databricks.context.bundle.isTargetSet && databricks.context.bundle.isDevTarget",
345+
"icon": "$(sync-ignored)"
325346
}
326347
],
327348
"viewsContainers": {
@@ -538,6 +559,16 @@
538559
"when": "view == configurationView && viewItem =~ /^databricks.environment.checkEnvironmentDependencies.success$/",
539560
"group": "inline@0",
540561
"icon": "$(gear)"
562+
},
563+
{
564+
"command": "databricks.sync.start",
565+
"when": "view == configurationView && viewItem =~ /^databricks.*sync.*is-stopped.*$/ && databricks.context.bundle.isDevTarget",
566+
"group": "inline@0"
567+
},
568+
{
569+
"command": "databricks.sync.stop",
570+
"when": "view == configurationView && viewItem =~ /^databricks.*sync.*is-running.*$/ && databricks.context.bundle.isDevTarget",
571+
"group": "inline@0"
541572
}
542573
],
543574
"editor/title": [
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import {logging} from "@databricks/databricks-sdk";
2+
import {EventEmitter} from "vscode";
3+
import {Loggers} from "../logger";
4+
import {SyncState} from "../sync";
5+
6+
type EventBase = {
7+
timestamp: string;
8+
seq: number;
9+
type: string;
10+
};
11+
12+
type EventChanges = {
13+
put: Array<string>;
14+
delete: Array<string>;
15+
};
16+
17+
type EventStart = EventBase &
18+
EventChanges & {
19+
type: "start";
20+
};
21+
22+
type EventComplete = EventBase &
23+
EventChanges & {
24+
type: "complete";
25+
};
26+
27+
type EventProgress = EventBase & {
28+
type: "progress";
29+
30+
action: "put" | "delete";
31+
path: string;
32+
progress: number;
33+
};
34+
35+
type Event = EventStart | EventComplete | EventProgress;
36+
37+
export class DatabricksCliSyncParser {
38+
private state: SyncState = "STOPPED";
39+
40+
constructor(
41+
private syncStateCallback: (state: SyncState, reason?: string) => void,
42+
private writeEmitter: EventEmitter<string>
43+
) {}
44+
45+
private changeSize(ec: EventChanges): number {
46+
let size = 0;
47+
if (ec.put) {
48+
size += ec.put.length;
49+
}
50+
if (ec.delete) {
51+
size += ec.delete.length;
52+
}
53+
return size;
54+
}
55+
56+
private processLine(line: string) {
57+
const event = JSON.parse(line) as Event;
58+
switch (event.type) {
59+
case "start": {
60+
this.state = "IN_PROGRESS";
61+
this.writeEmitter.fire(
62+
"Starting synchronization (" +
63+
this.changeSize(event) +
64+
" files)\r\n"
65+
);
66+
break;
67+
}
68+
case "progress": {
69+
let action = "";
70+
switch (event.action) {
71+
case "put":
72+
action = "Uploaded";
73+
break;
74+
case "delete":
75+
action = "Deleted";
76+
break;
77+
}
78+
if (event.progress === 1.0) {
79+
this.writeEmitter.fire(action + " " + event.path + "\r\n");
80+
}
81+
break;
82+
}
83+
case "complete":
84+
this.state = "WATCHING_FOR_CHANGES";
85+
this.writeEmitter.fire("Completed synchronization\r\n");
86+
break;
87+
}
88+
}
89+
90+
public processStderr(data: string) {
91+
const logLines = data.split("\n");
92+
for (let i = 0; i < logLines.length; i++) {
93+
const line = logLines[i].trim();
94+
if (line.length === 0) {
95+
continue;
96+
}
97+
this.writeEmitter.fire(line.trim() + "\r\n");
98+
if (this.matchForErrors(line)) {
99+
return;
100+
}
101+
}
102+
}
103+
104+
private matchForErrors(line: string) {
105+
if (line.match(/^Error: .*Files in Workspace is disabled.*/) !== null) {
106+
this.syncStateCallback("FILES_IN_WORKSPACE_DISABLED");
107+
return true;
108+
}
109+
110+
if (line.match(/^Error: .*Files in Repos is disabled.*/) !== null) {
111+
this.syncStateCallback("FILES_IN_REPOS_DISABLED");
112+
return true;
113+
}
114+
115+
const match = line.match(/^Error: (.*)/);
116+
if (match !== null) {
117+
this.syncStateCallback("ERROR", match[1]);
118+
return true;
119+
}
120+
121+
return false;
122+
}
123+
124+
// This function processes the JSON output from databricks sync and parses it
125+
// to figure out if a synchronization step is in progress or has completed.
126+
public processStdout(data: string) {
127+
const logLines = data.split("\n");
128+
for (let i = 0; i < logLines.length; i++) {
129+
const line = logLines[i].trim();
130+
if (line.length === 0) {
131+
continue;
132+
}
133+
134+
try {
135+
this.processLine(line);
136+
} catch (error: any) {
137+
logging.NamedLogger.getOrCreate(Loggers.Extension).error(
138+
"Error parsing JSON line from databricks sync stdout: " +
139+
error
140+
);
141+
}
142+
143+
if (this.matchForErrors(line)) {
144+
return;
145+
}
146+
}
147+
148+
this.syncStateCallback(this.state);
149+
}
150+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import "@databricks/databricks-sdk";
2+
import assert from "assert";
3+
import {mock, instance, capture, reset} from "ts-mockito";
4+
import {EventEmitter} from "vscode";
5+
import {SyncState} from "../sync";
6+
import {DatabricksCliSyncParser} from "./DatabricksCliSyncParser";
7+
8+
describe("tests for SycnParser", () => {
9+
let syncState: SyncState = "STOPPED";
10+
let mockedOutput: EventEmitter<string>;
11+
let databricksSyncParser: DatabricksCliSyncParser;
12+
13+
const syncStateCallback = (state: SyncState) => {
14+
syncState = state;
15+
};
16+
17+
beforeEach(() => {
18+
syncState = "STOPPED";
19+
mockedOutput = mock(EventEmitter<string>);
20+
databricksSyncParser = new DatabricksCliSyncParser(
21+
syncStateCallback,
22+
instance(mockedOutput)
23+
);
24+
});
25+
26+
it("ignores empty lines", () => {
27+
assert.equal(syncState, "STOPPED");
28+
databricksSyncParser.processStdout("\n\n");
29+
assert.equal(syncState, "STOPPED");
30+
});
31+
32+
it("ignores non-JSON lines", () => {
33+
assert.equal(syncState, "STOPPED");
34+
databricksSyncParser.processStdout("foo\nbar\n");
35+
assert.equal(syncState, "STOPPED");
36+
});
37+
38+
it("transitions from STOPPED -> IN_PROGRESS -> WATCHING_FOR_CHANGES", () => {
39+
assert.equal(syncState, "STOPPED");
40+
databricksSyncParser.processStdout(`{"type": "start"}`);
41+
assert.equal(syncState, "IN_PROGRESS");
42+
databricksSyncParser.processStdout(`{"type": "complete"}`);
43+
assert.equal(syncState, "WATCHING_FOR_CHANGES");
44+
});
45+
46+
it("writes start events to the terminal", () => {
47+
databricksSyncParser.processStdout(
48+
`{"type": "start", "put": ["hello"], "delete": ["world"]}`
49+
);
50+
const arg = capture(mockedOutput.fire).first();
51+
assert.match(arg[0], /^Starting synchronization /);
52+
});
53+
54+
it("writes progress events to the terminal", () => {
55+
databricksSyncParser.processStdout(
56+
`{"type": "progress", "action": "put", "path": "hello", "progress": 0.0}`
57+
);
58+
databricksSyncParser.processStdout(
59+
`{"type": "progress", "action": "put", "path": "hello", "progress": 1.0}`
60+
);
61+
assert.match(
62+
capture(mockedOutput.fire).first()[0],
63+
/^Uploaded hello\r\n/
64+
);
65+
reset(mockedOutput);
66+
67+
databricksSyncParser.processStdout(
68+
`{"type": "progress", "action": "delete", "path": "hello", "progress": 0.0}`
69+
);
70+
databricksSyncParser.processStdout(
71+
`{"type": "progress", "action": "delete", "path": "hello", "progress": 1.0}`
72+
);
73+
assert.match(
74+
capture(mockedOutput.fire).first()[0],
75+
/^Deleted hello\r\n/
76+
);
77+
reset(mockedOutput);
78+
});
79+
80+
it("writes complete events to the terminal", () => {
81+
databricksSyncParser.processStdout(
82+
`{"type": "complete", "put": ["hello"], "delete": ["world"]}`
83+
);
84+
const arg = capture(mockedOutput.fire).first();
85+
assert.match(arg[0], /^Completed synchronization\r\n/);
86+
});
87+
});

0 commit comments

Comments
 (0)