Commit 29692e0

feat: managed spark wrapper
Merge commit 29692e0 (2 parents: 7d462dd + 400693a)

File tree

6 files changed (+723, -1 lines)


README.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -117,6 +117,7 @@ Check `examples` directory for more examples.
 | yandex.cloud.serverless.containers | serverless-containers |
 | yandex.cloud.serverless.functions | serverless-functions |
 | yandex.cloud.serverless.triggers | serverless-triggers |
+| yandex.cloud.spark | managed-spark |
 | yandex.cloud.storage | storage-api |
 | yandex.cloud.vpc | vpc |
 | yandex.cloud.ydb | ydb |
```

examples/spark/main.py

Lines changed: 102 additions & 0 deletions
New file:

```python
#!/usr/bin/env python3
import argparse
import grpc
import json
import logging
import os

import yandexcloud
from yandexcloud.operations import OperationError


def parse_cmd():
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawTextHelpFormatter)

    auth = parser.add_mutually_exclusive_group(required=True)
    auth.add_argument(
        "--sa-json-path",
        help="Path to the service account key JSON file.\nThis file can be created using YC CLI:\n"
        "yc iam key create --output sa.json --service-account-id <id>",
    )
    auth.add_argument("--token", help="OAuth token")

    parser.add_argument("--folder-id", help="Your Yandex.Cloud folder id", required=True)
    parser.add_argument("--service-account-id", type=str, default="")
    parser.add_argument("--subnet-id", type=str, action="extend", nargs="*", dest="subnet_ids")
    parser.add_argument("--security-group-id", type=str, action="extend", nargs="*", dest="security_group_ids")
    parser.add_argument(
        "--job-name",
        type=str, default="pi number",
    )
    parser.add_argument(
        "--job-script",
        type=str, default="local:///opt/bitnami/spark/examples/src/main/python/pi.py",
    )
    parser.add_argument(
        "--job-arg",
        type=str, action="extend", nargs="*", dest="job_args", default=["1000"],
    )
    return parser.parse_args()


def main():
    logging.basicConfig(level=logging.INFO)
    arguments = parse_cmd()

    if arguments.token:
        sdk = yandexcloud.SDK(token=arguments.token)
    else:
        with open(arguments.sa_json_path) as infile:
            sdk = yandexcloud.SDK(service_account_key=json.load(infile))

    spark_client = sdk.wrappers.Spark()

    cluster_spec = sdk.wrappers.SparkClusterParameters(
        folder_id=arguments.folder_id,
        description="created with python-sdk",
        service_account_id=arguments.service_account_id,
        subnet_ids=arguments.subnet_ids,
        security_group_ids=arguments.security_group_ids,
        driver_pool_resource_preset="c2-m8",
        driver_pool_size=1,
        executor_pool_resource_preset="c4-m16",
        executor_pool_min_size=1,
        executor_pool_max_size=2,
    )

    try:
        spark_client.create_cluster(cluster_spec)

        try:
            job_spec = sdk.wrappers.PysparkJobParameters(
                name=arguments.job_name,
                main_python_file_uri=arguments.job_script,
                args=arguments.job_args,
            )
            job_operation = spark_client.create_pyspark_job(job_spec)
            job_id = job_operation.response.id
            job_info = job_operation.response

        except OperationError as job_error:
            job_id = job_error.operation_result.meta.job_id
            job_info, _ = spark_client.get_job(job_id=job_id)
            raise

        finally:
            job_log = spark_client.get_job_log(job_id=job_id)
            for line in job_log:
                logging.info(line)

            logging.info("Job info: %s", job_info)

    except grpc.RpcError:
        logging.exception("GRPC Error:")
    except OperationError:
        logging.exception("Operation Error:")
    finally:
        if spark_client.cluster_id is not None:
            spark_client.delete_cluster()


if __name__ == "__main__":
    main()
```
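The example walks the full lifecycle: build a `SparkClusterParameters` spec, create the cluster, submit a PySpark job, dump the job log in the inner `finally`, and tear the cluster down in the outer `finally` even when the job or an RPC fails. A typical invocation (all argument values are placeholders) would look like `python3 examples/spark/main.py --sa-json-path sa.json --folder-id <folder-id> --service-account-id <sa-id> --subnet-id <subnet-id> --security-group-id <sg-id>`.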

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.

yandexcloud/_sdk.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -190,6 +190,7 @@ def _service_for_ctor(stub_ctor: Any) -> str:
         ("yandex.cloud.serverless.containers", "serverless-containers"),
         ("yandex.cloud.serverless.functions", "serverless-functions"),
         ("yandex.cloud.serverless.triggers", "serverless-triggers"),
+        ("yandex.cloud.spark", "managed-spark"),
         ("yandex.cloud.storage", "storage-api"),
         ("yandex.cloud.vpc", "vpc"),
         ("yandex.cloud.ydb", "ydb"),
```
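This table is how the SDK resolves a generated gRPC stub to the right API endpoint: `_service_for_ctor` matches the stub's proto package prefix against it and returns the endpoint ID. A minimal sketch of how the new entry would be exercised; the Spark stub module path below is an assumption by analogy with other `yandex.cloud.*` v1 packages, not something this commit confirms:

```python
import yandexcloud

# Assumed stub location; other yandex.cloud services follow this v1 layout.
from yandex.cloud.spark.v1.cluster_service_pb2_grpc import ClusterServiceStub

sdk = yandexcloud.SDK(token="<token>")

# sdk.client() resolves the stub's package prefix ("yandex.cloud.spark")
# via _service_for_ctor to "managed-spark" and opens a channel to that
# service's endpoint.
cluster_service = sdk.client(ClusterServiceStub)
```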

yandexcloud/_wrappers/__init__.py

Lines changed: 15 additions & 0 deletions
```diff
@@ -1,6 +1,12 @@
 from typing import TYPE_CHECKING
 
 from yandexcloud._wrappers.dataproc import Dataproc, InitializationAction
+from yandexcloud._wrappers.spark import (
+    PysparkJobParameters,
+    Spark,
+    SparkClusterParameters,
+    SparkJobParameters,
+)
 
 if TYPE_CHECKING:
     from yandexcloud._sdk import SDK
@@ -13,3 +19,12 @@ def __init__(self, sdk: "SDK"):
         self.Dataproc.sdk = sdk
         # pylint: disable-next=invalid-name
         self.InitializationAction = InitializationAction
+        # pylint: disable-next=invalid-name
+        self.Spark = Spark
+        # pylint: disable-next=invalid-name
+        self.SparkClusterParameters = SparkClusterParameters
+        # pylint: disable-next=invalid-name
+        self.SparkJobParameters = SparkJobParameters
+        # pylint: disable-next=invalid-name
+        self.PysparkJobParameters = PysparkJobParameters
+        self.Spark.sdk = sdk
```
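With these assignments in place, the Spark wrapper and its parameter classes hang off `sdk.wrappers` exactly like the existing Dataproc wrapper, and the final `self.Spark.sdk = sdk` line is what lets `Spark` instances reach the SDK without an explicit constructor argument. A minimal sketch of the resulting call pattern (the same one `examples/spark/main.py` uses):

```python
import yandexcloud

sdk = yandexcloud.SDK(token="<token>")  # or service_account_key=...

# Attributes registered in Wrappers.__init__ above; Spark.sdk was set
# there, so the wrapper can build its gRPC clients internally.
spark = sdk.wrappers.Spark()
cluster_params_cls = sdk.wrappers.SparkClusterParameters  # parameter class
```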
