
Commit 89baaff

Author: Olga Annenkova
Commit message: feat: managed spark wrapper
Parent: 7d462dd

6 files changed (+718 −1 lines)


README.md

Lines changed: 1 addition & 0 deletions

@@ -117,6 +117,7 @@ Check `examples` directory for more examples.
 | yandex.cloud.serverless.containers | serverless-containers |
 | yandex.cloud.serverless.functions | serverless-functions |
 | yandex.cloud.serverless.triggers | serverless-triggers |
+| yandex.cloud.spark | managed-spark |
 | yandex.cloud.storage | storage-api |
 | yandex.cloud.vpc | vpc |
 | yandex.cloud.ydb | ydb |
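This table maps each generated gRPC package to the service endpoint id the SDK dials, so stubs under yandex.cloud.spark now resolve to the managed-spark endpoint. A minimal sketch of a raw client going through that mapping; the stub module path is an assumption for illustration, since the diff only confirms the package prefix:

import yandexcloud

# Hypothetical stub path: only the "yandex.cloud.spark" package prefix is
# confirmed by this diff, not the generated module layout.
from yandex.cloud.spark.v1.cluster_service_pb2_grpc import ClusterServiceStub

sdk = yandexcloud.SDK(token="<oauth-token>")

# sdk.client() inspects the stub's package ("yandex.cloud.spark") and, per
# the table above, connects to the "managed-spark" endpoint.
cluster_service = sdk.client(ClusterServiceStub)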

examples/spark/main.py

Lines changed: 104 additions & 0 deletions

@@ -0,0 +1,104 @@
#!/usr/bin/env python3
import argparse
import grpc
import json
import logging

import yandexcloud
from yandexcloud.operations import OperationError

USER_AGENT = "ycloud-python-sdk:spark"


def parse_cmd():
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawTextHelpFormatter)

    auth = parser.add_mutually_exclusive_group(required=True)
    auth.add_argument(
        "--sa-json-path",
        help="Path to the service account key JSON file.\nThis file can be created using YC CLI:\n"
        "yc iam key create --output sa.json --service-account-id <id>",
    )
    auth.add_argument("--token", help="OAuth token")

    parser.add_argument("--folder-id", help="Your Yandex.Cloud folder id", required=True)
    parser.add_argument("--service-account-id", type=str, default="")
    parser.add_argument("--subnet-id", type=str, action="extend", nargs="*", dest="subnet_ids")
    parser.add_argument("--security-group-id", type=str, action="extend", nargs="*", dest="security_group_ids")
    parser.add_argument(
        "--job-name",
        type=str, default="pi number",
    )
    parser.add_argument(
        "--job-script",
        type=str, default="local:///opt/bitnami/spark/examples/src/main/python/pi.py",
    )
    parser.add_argument(
        "--job-arg",
        type=str, action="extend", nargs="*", dest="job_args", default=["1000"],
    )
    return parser.parse_args()


def main():
    logging.basicConfig(level=logging.INFO)
    arguments = parse_cmd()

    # Authenticate with either an OAuth token or a service account key file.
    if arguments.token:
        sdk = yandexcloud.SDK(token=arguments.token, user_agent=USER_AGENT)
    else:
        with open(arguments.sa_json_path) as infile:
            sdk = yandexcloud.SDK(service_account_key=json.load(infile), user_agent=USER_AGENT)

    spark_client = sdk.wrappers.Spark()

    cluster_spec = sdk.wrappers.SparkClusterParameters(
        folder_id=arguments.folder_id,
        description="created with python-sdk",
        service_account_id=arguments.service_account_id,
        subnet_ids=arguments.subnet_ids,
        security_group_ids=arguments.security_group_ids,
        driver_pool_resource_preset="c2-m8",
        driver_pool_size=1,
        executor_pool_resource_preset="c4-m16",
        executor_pool_min_size=1,
        executor_pool_max_size=2,
    )

    job_id = None
    job_info = None
    try:
        spark_client.create_cluster(cluster_spec)

        try:
            job_spec = sdk.wrappers.PysparkJobParameters(
                name=arguments.job_name,
                main_python_file_uri=arguments.job_script,
                args=arguments.job_args,
            )
            job_operation = spark_client.create_pyspark_job(job_spec)
            job_id = job_operation.response.id
            job_info = job_operation.response

        except OperationError as job_error:
            # The job operation failed: recover the job id from the operation
            # metadata so the log can still be fetched below, then re-raise.
            job_id = job_error.operation_result.meta.job_id
            job_info, _ = spark_client.get_job(job_id=job_id)
            raise

        finally:
            # Stream the job log whether the job succeeded or failed; skip it
            # if the job was never created (e.g. a gRPC error on submission).
            if job_id is not None:
                job_log = spark_client.get_job_log(job_id=job_id)
                for line in job_log:
                    logging.info(line)

                logging.info("Job info: %s", job_info)

    except grpc.RpcError:
        logging.exception("GRPC Error:")
    except OperationError:
        logging.exception("Operation Error:")
    finally:
        # Always tear the cluster down, even after a failure.
        if spark_client.cluster_id is not None:
            spark_client.delete_cluster()


if __name__ == "__main__":
    main()
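An illustrative invocation, with placeholder ids (the script provisions the cluster, submits the Pi job, streams the job log, and deletes the cluster on exit): `python3 examples/spark/main.py --sa-json-path sa.json --folder-id <folder-id> --service-account-id <sa-id> --subnet-id <subnet-id>`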

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.

yandexcloud/_sdk.py

Lines changed: 1 addition & 0 deletions

@@ -190,6 +190,7 @@ def _service_for_ctor(stub_ctor: Any) -> str:
         ("yandex.cloud.serverless.containers", "serverless-containers"),
         ("yandex.cloud.serverless.functions", "serverless-functions"),
         ("yandex.cloud.serverless.triggers", "serverless-triggers"),
+        ("yandex.cloud.spark", "managed-spark"),
         ("yandex.cloud.storage", "storage-api"),
         ("yandex.cloud.vpc", "vpc"),
         ("yandex.cloud.ydb", "ydb"),

yandexcloud/_wrappers/__init__.py

Lines changed: 12 additions & 0 deletions

@@ -1,6 +1,9 @@
 from typing import TYPE_CHECKING

 from yandexcloud._wrappers.dataproc import Dataproc, InitializationAction
+from yandexcloud._wrappers.spark import (
+    Spark, SparkClusterParameters, SparkJobParameters, PysparkJobParameters
+)

 if TYPE_CHECKING:
     from yandexcloud._sdk import SDK

@@ -13,3 +16,12 @@ def __init__(self, sdk: "SDK"):
         self.Dataproc.sdk = sdk
         # pylint: disable-next=invalid-name
         self.InitializationAction = InitializationAction
+        # pylint: disable-next=invalid-name
+        self.Spark = Spark
+        # pylint: disable-next=invalid-name
+        self.SparkClusterParameters = SparkClusterParameters
+        # pylint: disable-next=invalid-name
+        self.SparkJobParameters = SparkJobParameters
+        # pylint: disable-next=invalid-name
+        self.PysparkJobParameters = PysparkJobParameters
+        self.Spark.sdk = sdk
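The registry exposes each wrapper class as an attribute of sdk.wrappers and injects the SDK instance as a class attribute (self.Spark.sdk = sdk), mirroring the existing self.Dataproc.sdk = sdk line; one consequence is that all Spark instances share the SDK of the most recently constructed SDK object. A minimal sketch of what this wiring gives the caller:

import yandexcloud

sdk = yandexcloud.SDK(token="<oauth-token>")

# Wrappers.__init__ ran during SDK construction, so the Spark wrapper class
# already carries a reference to this SDK instance.
spark = sdk.wrappers.Spark()
assert type(spark).sdk is sdk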
