
Commit a7253dc

Stop with savepoint if query has been removed (#19)
1 parent 80346c7 commit a7253dc

10 files changed: +455 −225 lines changed


README.md

Lines changed: 1 addition & 3 deletions
@@ -179,9 +179,7 @@ _Decision_: Flink jobs are deployed on a shared Flink Session Cluster.
 - All necessary Flink dependencies are available on the classpath on the EMR master node.
 - Python virtualenv with all necessary python dependencies is created on each EMR node.
   ```bash
-  python3 -m pip install \
-    -r flink-sql-runner/deployment-scripts/jobs-deployment/requirements.txt \
-    -r flink-sql-runner/python/requirements.txt
+  python3 -m pip install flink-sql-runner
   ```
 - An S3 bucket for storing YAML manifests file is created. See [Job manifest](#job-manifest) section.

flink_sql_runner/deploy.py

Lines changed: 115 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@
22
import argparse
33
import logging
44
import os
5-
import tempfile
65
from typing import List
76

87
import yaml
98

10-
from flink_sql_runner.deploy_job import EmrJobRunner, JinjaTemplateResolver
11-
from flink_sql_runner.flink_clients import (FlinkStandaloneClusterRunner,
9+
from flink_sql_runner.deploy_job import FlinkJobRunner, JinjaTemplateResolver
10+
from flink_sql_runner.flink_clients import (FlinkCli,
11+
FlinkStandaloneClusterRunner,
1212
FlinkYarnRunner)
13+
from flink_sql_runner.job_configuration import JobConfiguration
14+
from flink_sql_runner.manifest import ManifestManager
1315

1416
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
1517

@@ -58,58 +60,117 @@ def parse_args():
5860
return parser.parse_known_args()
5961

6062

61-
def list_query_files(base_path: str) -> List[str]:
62-
result = []
63-
for root, dirs, files in os.walk(base_path):
64-
for f in files:
65-
if f.endswith(".yaml"):
66-
result.append(os.path.abspath(os.path.join(root, f)))
67-
return result
68-
69-
70-
def read_config(query_file, template_file):
71-
# FIXME: refactor variables resolutions and yaml merge
72-
with open(query_file) as qf:
73-
query_specification = yaml.load(qf, yaml.FullLoader)
74-
if "sql" in query_specification:
75-
query_specification["sql"] = query_specification["sql"].replace("\n", " ")
76-
with open(template_file) as tf:
77-
raw_defaults = tf.read().format(job_name=query_specification["name"])
78-
default_config = yaml.safe_load(raw_defaults)
79-
final_flink_props = {
80-
**default_config["flinkProperties"],
81-
**query_specification["flinkProperties"],
82-
}
83-
final_config = {**default_config, **query_specification}
84-
final_config["flinkProperties"] = final_flink_props
85-
final_config["flinkProperties"]["pipeline.name"] = query_specification["name"]
86-
logging.info(f"Final configuration:\n{final_config}")
87-
return final_config
63+
class FlinkRunner:
64+
def __init__(self,
65+
queries_base_path: str,
66+
table_definition_path: str,
67+
pyflink_runner_dir: str,
68+
template_file: str,
69+
pyexec_path: str,
70+
flink_cli_runner: FlinkCli,
71+
manifest_manager: ManifestManager,
72+
jinja_template_resolver: JinjaTemplateResolver,
73+
passthrough_args,
74+
):
75+
self.queries_base_path = queries_base_path
76+
self.table_definition_path = table_definition_path
77+
self.pyflink_runner_dir = pyflink_runner_dir
78+
self.template_file = template_file
79+
self.pyexec_path = pyexec_path
80+
self.flink_cli_runner = flink_cli_runner
81+
self.manifest_manager = manifest_manager
82+
self.jinja_template_resolver = jinja_template_resolver
83+
self.passthrough_args = passthrough_args
84+
85+
def run(self):
86+
query_files = self.__list_query_files(self.queries_base_path)
87+
existing_query_manifests = self.manifest_manager.list_manifests()
88+
queries_to_cancel = self.__get_queries_to_cancel(query_files, existing_query_manifests)
89+
90+
logging.info("Queries to start or update: {}", query_files)
91+
logging.info("Queries to cancel: {}", queries_to_cancel)
92+
93+
self.__add_or_update_jobs(query_files)
94+
self.__cancel_deleted_jobs(queries_to_cancel)
95+
96+
def __get_queries_to_cancel(self, query_files: List[str], existing_query_manifests: List[str]) -> List[str]:
97+
running_query_names = [n.split("/")[-1] for n in existing_query_manifests]
98+
current_query_names = [n.split("/")[-1] for n in query_files]
99+
difference = set(running_query_names).difference(set(current_query_names))
100+
return [q.replace(".yaml", "") for q in difference]
101+
102+
def __add_or_update_jobs(self, query_files):
103+
for query_file in query_files:
104+
final_config = self.__read_config(query_file, self.template_file)
105+
new_job_conf = JobConfiguration(final_config)
106+
FlinkJobRunner(
107+
job_name=new_job_conf.get_name(),
108+
new_job_conf=new_job_conf,
109+
pyflink_runner_dir=self.pyflink_runner_dir,
110+
table_definition_paths=self.table_definition_path,
111+
pyexec_path=self.pyexec_path,
112+
flink_cli_runner=self.flink_cli_runner,
113+
jinja_template_resolver=self.jinja_template_resolver,
114+
manifest_manager=self.manifest_manager,
115+
passthrough_args=self.passthrough_args,
116+
).run()
117+
118+
def __cancel_deleted_jobs(self, queries_to_cancel):
119+
for query_to_remove in queries_to_cancel:
120+
FlinkJobRunner(
121+
job_name=query_to_remove,
122+
new_job_conf=None,
123+
pyflink_runner_dir=self.pyflink_runner_dir,
124+
table_definition_paths=self.table_definition_path,
125+
pyexec_path=self.pyexec_path,
126+
flink_cli_runner=self.flink_cli_runner,
127+
jinja_template_resolver=self.jinja_template_resolver,
128+
manifest_manager=self.manifest_manager,
129+
passthrough_args=self.passthrough_args,
130+
).run()
131+
132+
def __list_query_files(self, base_path: str) -> List[str]:
133+
result = []
134+
for root, dirs, files in os.walk(base_path):
135+
for f in files:
136+
if f.endswith(".yaml"):
137+
result.append(os.path.abspath(os.path.join(root, f)))
138+
return result
139+
140+
def __read_config(self, query_file, template_file):
141+
# FIXME: refactor variables resolutions and yaml merge
142+
with open(query_file) as qf:
143+
query_specification = yaml.load(qf, yaml.FullLoader)
144+
if "sql" in query_specification:
145+
query_specification["sql"] = query_specification["sql"].replace("\n", " ")
146+
with open(template_file) as tf:
147+
raw_defaults = tf.read().format(job_name=query_specification["name"])
148+
default_config = yaml.safe_load(raw_defaults)
149+
final_flink_props = {
150+
**default_config["flinkProperties"],
151+
**query_specification["flinkProperties"],
152+
}
153+
final_config = {**default_config, **query_specification}
154+
final_config["flinkProperties"] = final_flink_props
155+
logging.info(f"Final configuration:\n{final_config}")
156+
return final_config
88157

89158

90159
if __name__ == "__main__":
91160
args, passthrough_args = parse_args()
92-
query_files = list_query_files(args.path)
93-
94-
for query_file in query_files:
95-
final_config = read_config(query_file, args.template_file)
96-
query_name = final_config["name"]
97-
with tempfile.NamedTemporaryFile(mode="w+t", prefix=query_name, suffix=".yaml") as tmp:
98-
yaml.dump(final_config, tmp)
99-
flink_cli_runner = (
100-
FlinkYarnRunner()
101-
if args.deployment_target == "yarn"
102-
else FlinkStandaloneClusterRunner(args.jobmanager_address)
103-
)
104-
jinja_template_resolver = JinjaTemplateResolver()
105-
EmrJobRunner(
106-
tmp.name,
107-
args.pyflink_runner_dir,
108-
args.external_job_config_bucket,
109-
args.external_job_config_prefix,
110-
args.table_definition_path,
111-
args.pyexec_path,
112-
flink_cli_runner,
113-
jinja_template_resolver,
114-
passthrough_args,
115-
).run()
161+
162+
FlinkRunner(
163+
queries_base_path=args.path,
164+
table_definition_path=args.table_definition_path,
165+
pyflink_runner_dir=args.pyflink_runner_dir,
166+
template_file=args.template_file,
167+
pyexec_path=args.pyexec_path,
168+
flink_cli_runner=(
169+
FlinkYarnRunner()
170+
if args.deployment_target == "yarn"
171+
else FlinkStandaloneClusterRunner(args.jobmanager_address)
172+
),
173+
manifest_manager=ManifestManager(args.external_job_config_bucket, args.external_job_config_prefix),
174+
jinja_template_resolver=JinjaTemplateResolver(),
175+
passthrough_args=passthrough_args,
176+
).run()
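
The refactored entry point reconciles repository state against the manifests stored in S3: `__get_queries_to_cancel` compares YAML file names only, so a job is scheduled for cancellation exactly when a manifest still exists but its query file has been removed from the repository. A minimal standalone sketch of that rule (the S3 keys and local paths below are invented for illustration):

```python
# Sketch of the name-based reconciliation in FlinkRunner.__get_queries_to_cancel.
manifests = ["manifests/orders.yaml", "manifests/clicks.yaml"]  # from ManifestManager.list_manifests()
query_files = ["/repo/queries/orders.yaml"]                     # query YAMLs still in the repository

running = {key.split("/")[-1] for key in manifests}      # basenames of deployed manifests
current = {path.split("/")[-1] for path in query_files}  # basenames of current queries
to_cancel = sorted(name.replace(".yaml", "") for name in running - current)

print(to_cancel)  # ['clicks'] -- orders is kept, clicks gets stopped with a savepoint
```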

flink_sql_runner/deploy_job.py

Lines changed: 44 additions & 68 deletions
@@ -9,13 +9,14 @@
 from typing import Any, Dict, List, Optional, Tuple
 
 import yaml
-from jinja2 import Environment, FileSystemLoader
 
 from flink_sql_runner.flink_clients import (FlinkCli,
                                             FlinkStandaloneClusterRunner,
                                             FlinkYarnRunner)
+from flink_sql_runner.jinja import JinjaTemplateResolver
 from flink_sql_runner.job_configuration import JobConfiguration
-from flink_sql_runner.s3 import get_content, get_latest_object, upload_content
+from flink_sql_runner.manifest import ManifestManager
+from flink_sql_runner.s3 import get_latest_object
 
 logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
 
@@ -66,87 +67,69 @@ def parse_args():
     return parser.parse_known_args()
 
 
-class JinjaTemplateResolver(object):
-    def resolve(
-        self,
-        template_dir: str,
-        template_file: str,
-        vars: Dict[str, str],
-        output_file_path: str,
-    ) -> None:
-        environment = Environment(loader=FileSystemLoader(template_dir))
-        template = environment.get_template(template_file)
-        content = template.render(**vars)
-        with open(output_file_path, mode="w", encoding="utf-8") as run_file:
-            run_file.truncate()
-            run_file.write(content)
-
-
-class EmrJobRunner(object):
+class FlinkJobRunner(object):
     def __init__(
         self,
-        job_config_path: str,
+        job_name: str,
+        new_job_conf: Optional[JobConfiguration],
         pyflink_runner_dir: str,
-        external_job_config_bucket: str,
-        external_job_config_prefix: str,
         table_definition_paths: str,
         pyexec_path: str,
         flink_cli_runner: FlinkCli,
         jinja_template_resolver: JinjaTemplateResolver,
+        manifest_manager: ManifestManager,
        passthrough_args: List[str],
     ):
-        self.job_config_path = job_config_path
+        self.job_name = job_name
+        self.new_job_conf = new_job_conf
         self.pyflink_runner_dir = pyflink_runner_dir
-        self.external_job_config_bucket = external_job_config_bucket
-        self.external_job_config_prefix = external_job_config_prefix
         self.table_definition_paths = table_definition_paths
         self.pyexec_path = pyexec_path
         self.pyclientexec_path = pyexec_path
         self.flink_cli_runner = flink_cli_runner
         self.jinja_template_resolver = jinja_template_resolver
+        self.manifest_manager = manifest_manager
         self.passthrough_args = passthrough_args
-        self.new_job_conf = JobConfiguration(self.__read_config(job_config_path))
 
     def run(self) -> None:
-        logging.info(f"Deploying '{self.new_job_conf.get_name()}'.")
+        if self.new_job_conf is None:
+            logging.info(f"Deleting job '{self.job_name}'.")
+            job_manifest = self.manifest_manager.fetch_job_manifest(self.job_name)
+            if job_manifest is None:
+                raise ValueError(f"Job manifest for {self.job_name} not found.")
+            self.__stop_with_savepoint(job_manifest)
+            return
+
+        logging.info(f"Deploying '{self.job_name}'.")
         if self.new_job_conf.is_sql():
             logging.info(f"Deploying query: |{self.new_job_conf.get_sql()}|")
         else:
             logging.info(f"Deploying code:\n{self.new_job_conf.get_code()}")
 
-        external_config = self.__fetch_job_manifest(
-            self.external_job_config_bucket,
-            self.external_job_config_prefix,
-            self.new_job_conf.get_name(),
-        )
+        external_config = self.manifest_manager.fetch_job_manifest(self.job_name)
         logging.info(f"External config:\n{external_config}")
 
         if not external_config:
             # The job manifest did not exist. Starting a newly created job.
             self.__start_new_job(self.new_job_conf)
-            self.__upload_job_manifest(self.new_job_conf)
+            self.manifest_manager.upload_job_manifest(self.new_job_conf)
         elif external_config and not self.__has_job_manifest_changed(external_config, self.new_job_conf):
             # The job manifest has not been modified. There is no need to restart the job. Just ensure it's running.
-            if self.__is_job_running(self.new_job_conf.get_name()):
+            if self.__is_job_running(self.job_name):
                 logging.info("Job manifest has not changed. Skipping job restart.")
             else:
                 self.__start_job_with_unchanged_query(external_config, self.new_job_conf)
         else:
             # The job manifest has been modified. Job needs to be restarted.
-            if self.__is_job_running(self.new_job_conf.get_name()):
+            if self.__is_job_running(self.job_name):
                 # Stop the job using the old config (query-version in particular).
                 self.__stop_with_savepoint(external_config)
 
             if external_config and not self.__has_job_definition_changed(external_config, self.new_job_conf):
                 self.__start_job_with_unchanged_query(external_config, self.new_job_conf)
             else:
                 self.__start_new_job_with_changed_query(external_config, self.new_job_conf)
-            self.__upload_job_manifest(self.new_job_conf)
-
-    @staticmethod
-    def __read_config(config_file: str):
-        with open(config_file) as qf:
-            return yaml.load(qf, yaml.FullLoader)
+            self.manifest_manager.upload_job_manifest(self.new_job_conf)
 
     def __is_job_running(self, job_name: str) -> bool:
         return self.flink_cli_runner.is_job_running(job_name)
 
@@ -169,12 +152,6 @@ def __start_new_job_with_changed_query(self, external_config, job_conf):
         job_conf.set_meta_query_create_timestamp(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
         self.__start_with_clean_state(job_conf)
 
-    def __upload_job_manifest(self, job_conf):
-        upload_path = os.path.join(self.external_job_config_prefix, f"{job_conf.get_name()}.yaml")
-        logging.info(f"Uploading the new config file to 's3://{self.external_job_config_bucket}/{upload_path}'.")
-        upload_content(yaml.dump(job_conf.to_dict()), self.external_job_config_bucket, upload_path)
-        logging.info("The config file has been uploaded.")
-
     def __stop_with_savepoint(self, job_conf: JobConfiguration) -> None:
         job_id = self.flink_cli_runner.get_job_id(job_conf.get_name())
         savepoint_path = os.path.join(job_conf.get_flink_savepoints_dir(), job_conf.get_meta_query_version_str())
 
@@ -316,12 +293,6 @@ def __find_latest_state_internal(
         logging.info(f"State found at '{state_path}'.")
         return state_path, last_created_ts
 
-    def __fetch_job_manifest(self, bucket_name: str, prefix: str, job_name: str) -> Optional[JobConfiguration]:
-        object_key = os.path.join(prefix, f"{job_name}.yaml")
-        logging.info(f"Looking for config at s3://{bucket_name}/{object_key}.")
-        raw_manifest = get_content(bucket_name, object_key)
-        return JobConfiguration(yaml.safe_load(raw_manifest)) if raw_manifest else None
-
     def __has_job_manifest_changed(self, old_job_conf: JobConfiguration, new_job_conf: JobConfiguration) -> bool:
         return self.__has_job_definition_changed(old_job_conf, new_job_conf) or self.__have_flink_properties_changed(
             old_job_conf, new_job_conf
 
@@ -352,21 +323,26 @@ def __escape_query(query: str) -> str:
         return query.replace("`", "\\`")
 
 
+def read_config(config_file: str):
+    with open(config_file) as qf:
+        return yaml.load(qf, yaml.FullLoader)
+
+
 if __name__ == "__main__":
     args, passthrough_args = parse_args()
-    flink_cli_runner = (
-        FlinkYarnRunner() if args.deployment_target == "yarn" else FlinkStandaloneClusterRunner(args.jobmanager_address)
-    )
-    jinja_template_resolver = JinjaTemplateResolver()
-
-    EmrJobRunner(
-        args.job_config_path,
-        args.pyflink_runner_dir,
-        args.external_job_config_bucket,
-        args.external_job_config_prefix,
-        args.base_output_path,
-        args.pyexec_path,
-        flink_cli_runner,
-        jinja_template_resolver,
-        passthrough_args,
+    configuration = JobConfiguration(read_config(args.job_config_path))
+    FlinkJobRunner(
+        job_name=configuration.get_name(),
+        new_job_conf=configuration,
+        pyflink_runner_dir=args.pyflink_runner_dir,
+        table_definition_paths=args.base_output_path,
+        pyexec_path=args.pyexec_path,
+        flink_cli_runner=(
+            FlinkYarnRunner()
+            if args.deployment_target == "yarn"
+            else FlinkStandaloneClusterRunner(args.jobmanager_address)
+        ),
+        jinja_template_resolver=JinjaTemplateResolver(),
+        manifest_manager=ManifestManager(args.external_job_config_bucket, args.external_job_config_prefix),
+        passthrough_args=passthrough_args,
+    ).run()
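
This hunk contains the removal path that gives the commit its title: `__cancel_deleted_jobs` constructs a `FlinkJobRunner` with `new_job_conf=None`, which routes `run()` into the stop-with-savepoint branch after fetching the job's old manifest from S3. A sketch of that invocation, assuming the constructor shown in the diff; every argument value below is a placeholder, not taken from the repository:

```python
from flink_sql_runner.deploy_job import FlinkJobRunner
from flink_sql_runner.flink_clients import FlinkYarnRunner
from flink_sql_runner.jinja import JinjaTemplateResolver
from flink_sql_runner.manifest import ManifestManager

# new_job_conf=None marks the job as removed: run() fetches the stored manifest,
# raises ValueError if it is missing, and otherwise stops the job with a savepoint.
FlinkJobRunner(
    job_name="orders",                                   # hypothetical removed query
    new_job_conf=None,                                   # None => delete, not deploy
    pyflink_runner_dir="/opt/flink-sql-runner/pyflink",  # placeholder path
    table_definition_paths="s3://my-bucket/tables/",     # placeholder path
    pyexec_path="/usr/bin/python3",                      # placeholder interpreter
    flink_cli_runner=FlinkYarnRunner(),
    jinja_template_resolver=JinjaTemplateResolver(),
    manifest_manager=ManifestManager("my-config-bucket", "manifests"),  # placeholder bucket/prefix
    passthrough_args=[],
).run()
```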
