Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 5 additions & 24 deletions snuba/clickhouse/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import logging
import queue
import random
import re
import time
from contextlib import contextmanager
Expand All @@ -16,7 +15,6 @@
Mapping,
Optional,
Sequence,
Tuple,
TypedDict,
Union,
cast,
Expand Down Expand Up @@ -117,19 +115,6 @@ def __init__(
def fallback_pool_enabled(self) -> bool:
return state.get_config("use_fallback_host_in_native_connection_pool", 0) == 1

def get_fallback_host(self) -> Tuple[str, int]:
config_hosts_str = state.get_config(f"fallback_hosts:{self.host}:{self.port}", None)
assert config_hosts_str, f"no fallback hosts found for {self.host}:{self.port}"

config_hosts = cast(str, config_hosts_str).split(",")
selected_host_port = random.choice(config_hosts).split(":")

assert (
len(selected_host_port) == 2
), f"expected host:port format in fallback hosts for {self.host}:{self.port}"

return (selected_host_port[0], int(selected_host_port[1]))

# This will actually return an int if an INSERT query is run, but we never capture the
# output of INSERT queries so I left this as a Sequence.
def execute(
Expand All @@ -152,8 +137,6 @@ def execute(
return relatively quickly with an error in case of more persistent
failures.
"""
fallback_mode = False

try:
conn = self.pool.get(block=True)

Expand All @@ -167,7 +150,7 @@ def execute(
# Lazily create connection instances
if conn is None:
self.__gauge.increment()
conn = self._create_conn(fallback_mode)
conn = self._create_conn()

try:

This comment was marked as outdated.

if capture_trace:
Expand Down Expand Up @@ -227,7 +210,7 @@ def query_execute() -> Any:
return result
except (errors.NetworkError, errors.SocketTimeoutError, EOFError) as e:
metrics.increment(
("connection_error" if not fallback_mode else "fallback_connection_error"),
"connection_error",
tags={
"host": self.host,
"port": str(self.port),
Expand Down Expand Up @@ -364,12 +347,10 @@ def execute_robust(
except errors.Error as e:
raise ClickhouseError(e.message, code=e.code) from e

def _create_conn(self, use_fallback_host: bool = False) -> Client:
if use_fallback_host:
(fallback_host, fallback_port) = self.get_fallback_host()
def _create_conn(self) -> Client:
return Client(
host=(self.host if not use_fallback_host else fallback_host),
port=(self.port if not use_fallback_host else fallback_port),
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
Expand Down
12 changes: 0 additions & 12 deletions tests/clickhouse/test_native.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,6 @@ def test_concurrency_limit() -> None:
CLUSTER_PORT = 100


@pytest.mark.redis_db
def test_get_fallback_host() -> None:
FALLBACK_HOSTS_CONFIG_VAL = "host1:100,host2:100,host3:100"
FALLBACK_HOSTS = [("host1", 100), ("host2", 100), ("host3", 100)]

state.set_config(f"fallback_hosts:{CLUSTER_HOST}:{CLUSTER_PORT}", FALLBACK_HOSTS_CONFIG_VAL)

pool = ClickhousePool(CLUSTER_HOST, CLUSTER_PORT, "test", "test", TEST_DB_NAME)

assert pool.get_fallback_host() in FALLBACK_HOSTS


@pytest.mark.redis_db
def test_fallback_logic() -> None:
state.set_config("use_fallback_host_in_native_connection_pool", 1)
Expand Down
Loading