Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2412,7 +2412,8 @@ steps:
run: upgrade-downtime
ci-builder: stable
agents:
queue: hetzner-aarch64-16cpu-32gb
# More stable results for recording benchmarks
queue: hetzner-x86-64-dedi-16cpu-64gb

- id: orchestratord-default-properties
label: "Orchestratord + defaults for properties"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from dataclasses import dataclass

from materialize import buildkite
from materialize.buildkite import BuildkiteEnvVar
from materialize.test_analytics.data.base_data_storage import BaseDataStorage
from materialize.test_analytics.util.mz_sql_util import as_sanitized_literal


@dataclass
class UpgradeDowntimeResultEntry:
    """One upgrade-downtime measurement, destined for the test-analytics DB."""

    # Name of the scenario that produced the measurement.
    scenario: str
    # Version of the scenario; bumped when the scenario changes in a way
    # that affects its recorded results.
    scenario_version: str
    # Downtime observed during the initial rollout, in seconds
    # (derived from time.time() deltas in the workflow).
    downtime_initial: float
    # Downtime observed during the upgrade rollout, in seconds.
    downtime_upgrade: float


class UpgradeDowntimeResultStorage(BaseDataStorage):
    """Stores upgrade-downtime benchmark results in the test-analytics database."""

    def add_result(
        self,
        framework_version: str,
        results: list[UpgradeDowntimeResultEntry],
    ) -> None:
        """Queue one INSERT per entry into ``upgrade_downtime_result``.

        The statements are only queued on the database connector; they are
        written out when the connector's updates are submitted.
        """
        # Tie every row to the Buildkite job that produced it.
        job_id = buildkite.get_var(BuildkiteEnvVar.BUILDKITE_JOB_ID)

        sql_statements = []

        for result_entry in results:
            # TODO: remove NULL castings when database-issues#8100 is resolved
            # String values are sanitized/quoted; the downtime values are
            # interpolated as plain numeric literals.
            sql_statements.append(
                f"""
                INSERT INTO upgrade_downtime_result
                (
                    build_job_id,
                    framework_version,
                    scenario,
                    scenario_version,
                    downtime_initial,
                    downtime_upgrade
                )
                SELECT
                  {as_sanitized_literal(job_id)},
                  {as_sanitized_literal(framework_version)},
                  {as_sanitized_literal(result_entry.scenario)},
                  {as_sanitized_literal(result_entry.scenario_version)},
                  {result_entry.downtime_initial},
                  {result_entry.downtime_upgrade}
                ;
                """
            )

        self.database_connector.add_update_statements(sql_statements)
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ DELETE FROM parallel_benchmark_result WHERE build_job_id IN (SELECT build_id FRO
DELETE FROM product_limits_result WHERE build_job_id IN (SELECT build_id FROM build_job WHERE build_id IN (%build-ids%));
DELETE FROM cluster_spec_sheet_result WHERE build_job_id IN (SELECT build_id FROM build_job WHERE build_id IN (%build-ids%));
DELETE FROM cluster_spec_sheet_environmentd_result WHERE build_job_id IN (SELECT build_id FROM build_job WHERE build_id IN (%build-ids%));
DELETE FROM upgrade_downtime_result WHERE build_job_id IN (SELECT build_id FROM build_job WHERE build_id IN (%build-ids%));
DELETE FROM build_annotation_error WHERE build_job_id IN (SELECT build_job_id FROM build_annotation WHERE build_id IN (%build-ids%));
DELETE FROM build_annotation WHERE build_id IN (%build-ids%);
DELETE FROM build_job WHERE build_id IN (%build-ids%);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
-- by the Apache License, Version 2.0.


-- result of individual product limits scenarios
CREATE TABLE cluster_spec_sheet_result (
build_job_id TEXT NOT NULL,
framework_version TEXT NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
-- by the Apache License, Version 2.0.


-- result of individual product limits scenarios
CREATE TABLE cluster_spec_sheet_environmentd_result (
build_job_id TEXT NOT NULL,
framework_version TEXT NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Copyright Materialize, Inc. and contributors. All rights reserved.
--
-- Use of this software is governed by the Business Source License
-- included in the LICENSE file at the root of this repository.
--
-- As of the Change Date specified in that file, in accordance with
-- the Business Source License, use of this software will be governed
-- by the Apache License, Version 2.0.


-- Downtime measurements of the orchestratord upgrade-downtime scenarios,
-- one row per scenario run (written by upgrade_downtime_result_storage.py).
CREATE TABLE upgrade_downtime_result (
   build_job_id TEXT NOT NULL,
   framework_version TEXT NOT NULL,
   scenario TEXT NOT NULL,
   scenario_version TEXT NOT NULL,
   -- Durations in seconds. The writer inserts these as unquoted numeric
   -- literals from Python floats, so they must be a numeric type, not TEXT.
   downtime_initial DOUBLE PRECISION NOT NULL,
   downtime_upgrade DOUBLE PRECISION NOT NULL
);

ALTER TABLE upgrade_downtime_result OWNER TO qa;
GRANT SELECT, INSERT, UPDATE ON TABLE upgrade_downtime_result TO "hetzner-ci";
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ CREATE OR REPLACE VIEW v_data_integrity (table_name, own_item_key, referenced_it
FROM cluster_spec_sheet_environmentd_result
WHERE build_job_id NOT IN (SELECT build_job_id FROM build_job)
UNION
SELECT 'upgrade_downtime_result', build_job_id, build_job_id, 'upgrade downtime result references missing build job'
FROM upgrade_downtime_result
WHERE build_job_id NOT IN (SELECT build_job_id FROM build_job)
UNION
SELECT 'build_annotation', build_job_id, build_job_id, 'build annotation references missing build job'
FROM build_annotation
WHERE build_job_id NOT IN (SELECT build_job_id FROM build_job)
Expand Down
6 changes: 6 additions & 0 deletions misc/python/materialize/test_analytics/test_analytics_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
from materialize.test_analytics.data.scalability_framework.scalability_framework_result_storage import (
ScalabilityFrameworkResultStorage,
)
from materialize.test_analytics.data.upgrade_downtime.upgrade_downtime_result_storage import (
UpgradeDowntimeResultStorage,
)

TEST_ANALYTICS_DATA_VERSION: int = 21

Expand Down Expand Up @@ -83,6 +86,9 @@ def __init__(self, config: MzDbConfig):
self.cluster_spec_sheet_environmentd_results = (
ClusterSpecSheetEnvironmentdResultStorage(self.database_connector)
)
self.upgrade_downtime_results = UpgradeDowntimeResultStorage(
self.database_connector
)

def _create_database_connector(self, config: MzDbConfig) -> DatabaseConnector:
if config.enabled:
Expand Down
76 changes: 70 additions & 6 deletions test/orchestratord/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import yaml
from semver.version import Version

from materialize import MZ_ROOT, ci_util, git, spawn
from materialize import MZ_ROOT, buildkite, ci_util, git, spawn
from materialize.mz_version import MzVersion
from materialize.mzcompose.composition import (
Composition,
Expand All @@ -41,9 +41,17 @@
from materialize.mzcompose.services.balancerd import Balancerd
from materialize.mzcompose.services.clusterd import Clusterd
from materialize.mzcompose.services.environmentd import Environmentd
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.mz_debug import MzDebug
from materialize.mzcompose.services.orchestratord import Orchestratord
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.test_analytics.config.test_analytics_db_config import (
create_test_analytics_config,
)
from materialize.test_analytics.data.upgrade_downtime import (
upgrade_downtime_result_storage,
)
from materialize.test_analytics.test_analytics_db import TestAnalyticsDb
from materialize.util import PropagatingThread, all_subclasses
from materialize.version_list import (
get_all_self_managed_versions,
Expand All @@ -57,6 +65,7 @@
Clusterd(),
Balancerd(),
MzDebug(),
Mz(app_password=""),
]


Expand Down Expand Up @@ -1594,6 +1603,46 @@ def make_mod_source(
raise ValueError(f"Unhandled properties: {properties}")


# Bump this version if the upgrade-downtime workflow is changed in a way that
# changes the results uploaded to test analytics.
UPGRADE_DOWNTIME_SCENARIO_VERSION = "1.0.0"
# Used for uploading test analytics results
ORCHESTRATORD_TEST_VERSION = "1.0.0"


def upload_upgrade_downtime_to_test_analytics(
    composition: Composition,
    downtime_initial: float,
    downtime_upgrade: float,
    was_successful: bool,
) -> None:
    """Upload the measured upgrade downtimes to the test-analytics database.

    No-op when not running in Buildkite. Any failure while talking to the
    analytics database is reported via ``on_upload_failed`` instead of being
    raised, so a broken upload can never fail the build.
    """
    if not buildkite.is_in_buildkite():
        return

    test_analytics = TestAnalyticsDb(create_test_analytics_config(composition))

    # Everything below interacts with the test-analytics database. None of it
    # may fail the build, so keep it all under one error handler (previously
    # only submit_updates() was guarded, and a failure in add_build_job() or
    # add_result() would have failed the build).
    try:
        test_analytics.builds.add_build_job(was_successful=was_successful)

        result_entries = [
            upgrade_downtime_result_storage.UpgradeDowntimeResultEntry(
                scenario="upgrade-downtime",
                scenario_version=UPGRADE_DOWNTIME_SCENARIO_VERSION,
                downtime_initial=downtime_initial,
                downtime_upgrade=downtime_upgrade,
            )
        ]

        test_analytics.upgrade_downtime_results.add_result(
            framework_version=ORCHESTRATORD_TEST_VERSION,
            results=result_entries,
        )

        test_analytics.submit_updates()
        print("Uploaded results.")
    except Exception as e:
        # An error during an upload must never cause the build to fail
        test_analytics.on_upload_failed(e)


def workflow_upgrade_downtime(c: Composition, parser: WorkflowArgumentParser) -> None:
parser.add_argument(
"--recreate-cluster",
Expand All @@ -1614,6 +1663,7 @@ def workflow_upgrade_downtime(c: Composition, parser: WorkflowArgumentParser) ->
args = parser.parse_args()

running = True
downtimes: list[float] = []

def measure_downtime() -> None:
port_forward_process = None
Expand Down Expand Up @@ -1650,6 +1700,7 @@ def measure_downtime() -> None:
preexec_fn=os.setpgrp,
)
connect_port_forward = False
time.sleep(1)
try:
with psycopg.connect(
"postgres://[email protected]:6875/materialize",
Expand All @@ -1660,11 +1711,10 @@ def measure_downtime() -> None:
except psycopg.OperationalError:
connect_port_forward = True
continue
runtime = time.time() - start_time
if runtime > 2:
print(f"Downtime: {runtime}s")
assert runtime < 15, f"SELECT 1 took more than 15s: {runtime}s"
time.sleep(10)
runtime = time.time() - start_time - 1
print(f"Time: {runtime}s")
if runtime > 1:
downtimes.append(runtime)
start_time = time.time()
finally:
if port_forward_process:
Expand All @@ -1684,6 +1734,20 @@ def measure_downtime() -> None:
running = False
thread.join()

assert len(downtimes) == 2, f"Wrong number of downtimes: {downtimes}"

test_failed = False
max_downtime = 15
for downtime in downtimes:
if downtime > max_downtime:
print(f"SELECT 1 took more than {max_downtime}s: {downtime}s")
test_failed = True

upload_upgrade_downtime_to_test_analytics(
c, downtimes[0], downtimes[1], not test_failed
)
assert not test_failed


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
parser.add_argument(
Expand Down