Skip to content

Commit 1255e70

Browse files
committed
Replace the Eclipse uploading Spark Job jar with Livy session helper
Signed-off-by: Wei Zhang <[email protected]>
1 parent ece0900 commit 1255e70

File tree

3 files changed

+72
-25
lines changed

3 files changed

+72
-25
lines changed

PluginsAndFeatures/azure-toolkit-for-eclipse/com.microsoft.azuretools.hdinsight/src/com/microsoft/azuretools/hdinsight/common2/HDInsightUtil.java

Lines changed: 62 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,36 @@
2424
import org.eclipse.ui.PlatformUI;
2525

2626
import com.microsoft.azuretools.azurecommons.helpers.NotNull;
27+
import com.microsoft.azuretools.azurecommons.helpers.Nullable;
2728
import com.microsoft.azuretools.hdinsight.Activator;
2829
import com.microsoft.azuretools.hdinsight.SparkSubmissionToolWindowView;
30+
31+
import rx.subjects.ReplaySubject;
32+
2933
import com.microsoft.azure.hdinsight.common.MessageInfoType;
3034
import static com.microsoft.azure.hdinsight.common.MessageInfoType.*;
3135

36+
import java.util.AbstractMap.SimpleImmutableEntry;
37+
import java.util.Optional;
38+
3239
public class HDInsightUtil {
3340
private static SparkSubmissionToolWindowView sparkSubmissionToolWindowView;
34-
41+
42+
// The replay subject for the message showed in HDInsight tool window
43+
// The replay subject will replay all notifications before the initialization is
44+
// done
45+
// The replay buffer size is 1MB.
46+
@NotNull
47+
@SuppressWarnings("null")
48+
private static ReplaySubject<SimpleImmutableEntry<MessageInfoType, String>> toolWindowMessageSubject =
49+
ReplaySubject.create(1024 * 1024);
50+
51+
@NotNull
52+
public static ReplaySubject<SimpleImmutableEntry<MessageInfoType, String>> getToolWindowMessageSubject() {
53+
return toolWindowMessageSubject;
54+
}
55+
56+
@Nullable
3557
public static SparkSubmissionToolWindowView getSparkSubmissionToolWindowView() {
3658
if (sparkSubmissionToolWindowView == null) {
3759
Display.getDefault().syncExec(new Runnable() {
@@ -41,6 +63,9 @@ public void run() {
4163
sparkSubmissionToolWindowView = (SparkSubmissionToolWindowView) PlatformUI.getWorkbench()
4264
.getActiveWorkbenchWindow().getActivePage()
4365
.showView(SparkSubmissionToolWindowView.class.getName());
66+
67+
Optional.ofNullable(sparkSubmissionToolWindowView)
68+
.ifPresent(view -> resetToolWindowMessageSubject(view));
4469
} catch (PartInitException e) {
4570
Activator.getDefault().log(e.getMessage(), e);
4671
}
@@ -54,15 +79,15 @@ public static synchronized void setHyperLinkWithText(final String text, final St
5479
Display.getDefault().syncExec(new Runnable() {
5580
@Override
5681
public void run() {
57-
getSparkSubmissionToolWindowView().setHyperLinkWithText(text, hyperlinkUrl, anchorText);
82+
SparkSubmissionToolWindowView view = getSparkSubmissionToolWindowView();
83+
84+
if (view != null) {
85+
view.setHyperLinkWithText(text, hyperlinkUrl, anchorText);
86+
}
5887
}
5988
});
6089
}
6190

62-
public static void showInfoOnSubmissionMessageWindow(/*@NotNull */final String message, boolean isNeedClear) {
63-
showInfoOnSubmissionMessageWindow(Info, message, isNeedClear);
64-
}
65-
6691
public static void showInfoOnSubmissionMessageWindow(@NotNull final String message) {
6792
showInfoOnSubmissionMessageWindow(Info, message, false);
6893
}
@@ -75,30 +100,52 @@ public static void showWarningMessageOnSubmissionMessageWindow(@NotNull final St
75100
showInfoOnSubmissionMessageWindow(Warning, message, false);
76101
}
77102

78-
private static void showInfoOnSubmissionMessageWindow(@NotNull final MessageInfoType type, @NotNull final String message, @NotNull final boolean isNeedClear) {
103+
private static void showInfoOnSubmissionMessageWindow(@NotNull final MessageInfoType type,
104+
@NotNull final String message,
105+
final boolean isNeedClear) {
79106
Display.getDefault().syncExec(new Runnable() {
80107
@Override
81108
public void run() {
82-
showSubmissionMessage(getSparkSubmissionToolWindowView(), message, type, isNeedClear);
109+
showSubmissionMessage(getSparkSubmissionToolWindowView(), message, type, isNeedClear);
83110
}
84111
});
85112
}
86113

87-
private static void showSubmissionMessage(@NotNull SparkSubmissionToolWindowView sparkSubmissionView, @NotNull String message, @NotNull MessageInfoType type, @NotNull final boolean isNeedClear) {
88-
if (isNeedClear) {
89-
sparkSubmissionView.clearAll();
114+
private static void resetToolWindowMessageSubject(@Nullable SparkSubmissionToolWindowView sparkSubmissionToolWindowView) {
115+
if (sparkSubmissionToolWindowView == null) {
116+
return;
90117
}
91118

92-
switch (type) {
119+
toolWindowMessageSubject = ReplaySubject.create(1024 * 1024);
120+
121+
getToolWindowMessageSubject().subscribe(entry -> {
122+
String message = entry.getValue();
123+
124+
switch (entry.getKey()) {
93125
case Error:
94-
sparkSubmissionView.setError(message);
126+
sparkSubmissionToolWindowView.setError(message);
95127
break;
96128
case Info:
97-
sparkSubmissionView.setInfo(message);
129+
sparkSubmissionToolWindowView.setInfo(message);
98130
break;
99131
case Warning:
100-
sparkSubmissionView.setWarning(message);
132+
sparkSubmissionToolWindowView.setWarning(message);
133+
break;
134+
default:
101135
break;
136+
}
137+
});
138+
}
139+
140+
private static void showSubmissionMessage(@Nullable SparkSubmissionToolWindowView sparkSubmissionView,
141+
@NotNull String message,
142+
@NotNull MessageInfoType type,
143+
final boolean isNeedClear) {
144+
if (isNeedClear && sparkSubmissionView != null) {
145+
sparkSubmissionView.clearAll();
146+
resetToolWindowMessageSubject(sparkSubmissionView);
102147
}
148+
149+
getToolWindowMessageSubject().onNext(new SimpleImmutableEntry<>(type, message));
103150
}
104151
}

PluginsAndFeatures/azure-toolkit-for-eclipse/com.microsoft.azuretools.hdinsight/src/com/microsoft/azuretools/hdinsight/spark/common2/SparkSubmitHelper.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import com.microsoft.azure.hdinsight.spark.common.SparkBatchSubmission;
5454
import com.microsoft.azure.hdinsight.spark.common.SparkJobLog;
5555
import com.microsoft.azure.hdinsight.spark.common.SparkSubmitResponse;
56+
import com.microsoft.azure.hdinsight.spark.jobs.JobUtils;
5657
import com.microsoft.tooling.msservices.helpers.CallableSingleArg;
5758
import com.microsoft.azuretools.azurecommons.helpers.NotNull;
5859
import com.microsoft.azuretools.azurecommons.helpers.StringHelper;
@@ -220,7 +221,8 @@ public void printRunningLogStreamingly(/* Project project, */ int id, IClusterDe
220221
}
221222
}
222223

223-
public String uploadFileToHDFS(/* Project project, */ String localFile, IHDIStorageAccount storageAccount,
224+
public String uploadFileToADL(/* Project project, */ String localFile,
225+
IHDIStorageAccount storageAccount,
224226
String defaultContainerName, String uploadFolderPath) throws Exception {
225227
final File file = new File(localFile);
226228
if (storageAccount.getAccountType() == StorageAccountTypeEnum.BLOB) {
@@ -383,15 +385,12 @@ public String sftpFileToEmulator(String localFile, String folderPath, IClusterDe
383385
}
384386
}
385387

386-
public static String uploadFileToHDFS(/* @NotNull Project project, */ @NotNull IClusterDetail selectedClusterDetail,
387-
@NotNull String buildJarPath) throws Exception {
388-
389-
HDInsightUtil.showInfoOnSubmissionMessageWindow(String.format("Info : Get target jar from %s.", buildJarPath));
390-
final String uploadShortPath = getFormatPathByDate();
388+
public static String uploadFileToHDFS(@NotNull IClusterDetail selectedClusterDetail,
389+
@NotNull String buildJarPath)
390+
throws Exception {
391391

392-
return SparkSubmitHelper.getInstance().uploadFileToHDFS(/* project, */ buildJarPath,
393-
selectedClusterDetail.getStorageAccount(),
394-
selectedClusterDetail.getStorageAccount().getDefaultContainerOrRootPath(), uploadShortPath);
392+
return JobUtils.uploadFileToHDFS(selectedClusterDetail, buildJarPath,
393+
HDInsightUtil.getToolWindowMessageSubject());
395394
}
396395

397396
public static String getLivyConnectionURL(IClusterDetail clusterDetail) {

PluginsAndFeatures/azure-toolkit-for-eclipse/com.microsoft.azuretools.hdinsight/src/com/microsoft/azuretools/hdinsight/spark/common2/SparkSubmitModel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,8 @@ private void writeJobLogToLocal() {
262262
}
263263
}
264264

265-
private IClusterDetail getClusterConfiguration(@NotNull final IClusterDetail selectedClusterDetail, @NotNull final boolean isFirstSubmit) {
265+
private IClusterDetail getClusterConfiguration(@NotNull final IClusterDetail selectedClusterDetail,
266+
final boolean isFirstSubmit) {
266267
try {
267268
if (!selectedClusterDetail.isConfigInfoAvailable()) {
268269
selectedClusterDetail.getConfigurationInfo();

0 commit comments

Comments
 (0)