Skip to content

Commit f7b8dcb

Browse files
authored
Merge branch 'master' into fix/batch_append_update_bust_version_chain
2 parents f5a1f20 + 810df8c commit f7b8dcb

File tree

5 files changed

+141
-10
lines changed

5 files changed

+141
-10
lines changed

python/arcticdb/storage_fixtures/azure.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import stat
1414
import tempfile
1515
import uuid
16+
import requests
1617
from typing import TYPE_CHECKING, Optional, Union
1718
from tempfile import mkdtemp
1819

@@ -214,8 +215,25 @@ def _safe_enter(self):
214215
process_start_cmd=args,
215216
cwd=self.working_dir,
216217
)
218+
if not self.ssl_test_support:
219+
# Do not verify when SSL test support is enabled, because proper certificates would be required
220+
assert self.is_azurite(self.endpoint_root), f"Azurite not started at: {self.endpoint_root}"
217221
return self
218222

223+
def is_azurite(self, url: str, timeout: int = 60):
224+
try:
225+
response = requests.get(url, timeout=timeout)
226+
headers = response.headers
227+
228+
# Check for Azurite-specific headers
229+
server_header = headers.get("Server", "").lower()
230+
has_azurite_headers = "x-ms-request-id" in str(headers) and "azurite" in server_header
231+
232+
return has_azurite_headers
233+
234+
except requests.RequestException:
235+
return False
236+
219237
def __exit__(self, exc_type, exc_value, traceback):
220238
with handle_cleanup_exception(self, "process", consequence="Subsequent file deletion may also fail. "):
221239
GracefulProcessUtils.terminate(self._p)

python/arcticdb/storage_fixtures/mongo.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ class ManagedMongoDBServer(StorageFixtureFactory):
117117

118118
def __init__(self, data_dir: Optional[str] = None, port=0, executable="mongod"):
119119
self._data_dir = data_dir or tempfile.mkdtemp("ManagedMongoDBServer")
120-
self._port = port or get_ephemeral_port(5)
120+
self._port = port or get_ephemeral_port(7)
121121
self._executable = executable
122122
self._client = None
123123

python/arcticdb/storage_fixtures/s3.py

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import re
1414
import sys
1515
import platform
16+
import pprint
1617
from tempfile import mkdtemp
1718
from urllib.parse import urlparse
1819
import boto3
@@ -688,13 +689,20 @@ def __call__(self, environ, start_response):
688689
else:
689690
self._reqs_till_rate_limit -= 1
690691

692+
# Let's add the ability to identify the server type as S3
693+
if "/whoami" in path_info:
694+
start_response("200 OK", [("Content-Type", "text/plain")])
695+
return [b"Moto AWS S3"]
696+
691697
return super().__call__(environ, start_response)
692698

693699

694700
class GcpHostDispatcherApplication(HostDispatcherApplication):
695701
"""GCP's S3 implementation does not have batch delete."""
696702

697703
def __call__(self, environ, start_response):
704+
path_info: bytes = environ.get("PATH_INFO", "")
705+
698706
if environ["REQUEST_METHOD"] == "POST" and environ["QUERY_STRING"] == "delete":
699707
response_body = (
700708
b'<?xml version="1.0" encoding="UTF-8"?>'
@@ -708,6 +716,12 @@ def __call__(self, environ, start_response):
708716
"501 Not Implemented", [("Content-Type", "text/xml"), ("Content-Length", str(len(response_body)))]
709717
)
710718
return [response_body]
719+
720+
# Let's add the ability to identify the server type as GCP
721+
if "/whoami" in path_info:
722+
start_response("200 OK", [("Content-Type", "text/plain")])
723+
return [b"Moto GCP"]
724+
711725
return super().__call__(environ, start_response)
712726

713727

@@ -731,6 +745,20 @@ def run_gcp_server(port, key_file, cert_file):
731745
)
732746

733747

748+
def is_server_type(url: str, server_type: str):
749+
"""Check if a server is of a certain type.
750+
751+
The /whoami URL is added to Moto* objects to identify GCP or S3."""
752+
try:
753+
response = requests.get(url, verify=False)
754+
if response.status_code == 200 and server_type in response.text:
755+
return True
756+
except Exception as e:
757+
logger.error(f"Error during server type check: {e}")
758+
logger.error(f"Was not of expected type: status code {response.status_code}, text: {response.text}")
759+
return False
760+
761+
734762
def create_bucket(s3_client, bucket_name, max_retries=15):
735763
for i in range(max_retries):
736764
try:
@@ -741,6 +769,9 @@ def create_bucket(s3_client, bucket_name, max_retries=15):
741769
raise
742770
logger.warning(f"S3 create bucket failed. Retry {1}/{max_retries}")
743771
time.sleep(1)
772+
except Exception as e:
773+
logger.error(f"create_bucket - Error: {e.response['Error']['Message']}")
774+
pprint.pprint(e.response)
744775

745776

746777
class MotoS3StorageFixtureFactory(BaseS3StorageFixtureFactory):
@@ -791,10 +822,19 @@ def bucket_name(self, bucket_type="s3"):
791822
# We need the unique_id because we have tests that are creating the factory directly
792823
# and not using the fixtures
793824
# so this guarantees a unique bucket name
794-
return f"test_{bucket_type}_bucket_{self.unique_id}_{self._bucket_id}"
825+
return f"test-{bucket_type}-bucket-{self.unique_id}-{self._bucket_id}"
795826

796-
def _start_server(self):
797-
port = self.port = get_ephemeral_port(2)
827+
def is_server_type(url: str, server_type: str):
828+
try:
829+
response = requests.get(url, verify=False)
830+
if response.status_code == 200 and server_type in response.text:
831+
return True
832+
except Exception:
833+
pass
834+
return False
835+
836+
def _start_server(self, seed=2):
837+
port = self.port = get_ephemeral_port(seed)
798838
self.endpoint = f"{self.http_protocol}://{self.host}:{port}"
799839
self.working_dir = mkdtemp(suffix="MotoS3StorageFixtureFactory")
800840
self._iam_endpoint = f"{self.http_protocol}://localhost:{port}"
@@ -826,17 +866,22 @@ def _start_server(self):
826866
# There is a problem with the performance of the socket module in the MacOS 15 GH runners - https://github.com/actions/runner-images/issues/12162
827867
# Due to this, we need to wait for the server to come up for a longer time
828868
wait_for_server_to_come_up(self.endpoint, "moto", self._p, timeout=240)
869+
assert is_server_type(self.endpoint + "/whoami", "S3"), "The server has not identified as S3"
829870

830871
def _safe_enter(self):
831-
for _ in range(3): # For unknown reason, Moto, when running in pytest-xdist, will randomly fail to start
872+
for i in range(5): # For unknown reason, Moto, when running in pytest-xdist, will randomly fail to start
832873
try:
833-
self._start_server()
874+
logger.info(f"Attempt to start server - {i}")
875+
self._start_server(2 + i)
876+
self._s3_admin = self._boto(service="s3", key=self.default_key)
877+
logger.info(f"Moto S3 STARTED!!! on port {self.port}")
834878
break
835879
except AssertionError as e: # Thrown by wait_for_server_to_come_up
836880
sys.stderr.write(repr(e))
837881
GracefulProcessUtils.terminate(self._p)
882+
except Exception as e:
883+
logger.error(f"Error during startup of Moto S3. Trying again. Error: {e}")
838884

839-
self._s3_admin = self._boto(service="s3", key=self.default_key)
840885
return self
841886

842887
def __exit__(self, exc_type, exc_value, traceback):
@@ -928,8 +973,8 @@ def create_fixture(self) -> NfsS3Bucket:
928973

929974

930975
class MotoGcpS3StorageFixtureFactory(MotoS3StorageFixtureFactory):
931-
def _start_server(self):
932-
port = self.port = get_ephemeral_port(3)
976+
def _start_server(self, seed=20):
977+
port = self.port = get_ephemeral_port(seed)
933978
self.endpoint = f"{self.http_protocol}://{self.host}:{port}"
934979
self.working_dir = mkdtemp(suffix="MotoGcpS3StorageFixtureFactory")
935980
self._iam_endpoint = f"{self.http_protocol}://localhost:{port}"
@@ -961,6 +1006,7 @@ def _start_server(self):
9611006
# There is a problem with the performance of the socket module in the MacOS 15 GH runners - https://github.com/actions/runner-images/issues/12162
9621007
# Due to this, we need to wait for the server to come up for a longer time
9631008
wait_for_server_to_come_up(self.endpoint, "moto", self._p, timeout=240)
1009+
assert is_server_type(self.endpoint + "/whoami", "GCP"), "The server has not identified as GCP"
9641010

9651011
def create_fixture(self) -> GcpS3Bucket:
9661012
bucket = self.bucket_name("gcp")

python/tests/integration/arcticdb/test_s3.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,6 @@ def test_wrapped_s3_storage(lib_name, wrapped_s3_storage_bucket):
196196
lib.write("s", data=create_df())
197197

198198

199-
@SKIP_CONDA_MARK # issue with fixture init will be fixed in https://github.com/man-group/ArcticDB/issues/2640
200199
def test_library_get_key_path(lib_name, s3_and_nfs_storage_bucket, test_prefix):
201200
lib = s3_and_nfs_storage_bucket.create_version_store_factory(lib_name)()
202201
lib.write("s", data=create_df())

python/tests/integration/arcticdb/version_store/test_basic_version_store.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
)
3232
from arcticdb import QueryBuilder
3333
from arcticdb.flattener import Flattener
34+
from arcticdb.util.utils import generate_random_numpy_array, generate_random_series
3435
from arcticdb.version_store import NativeVersionStore
3536
from arcticdb.version_store._store import VersionedItem
3637
from arcticdb_ext.exceptions import _ArcticLegacyCompatibilityException, StorageException
@@ -1160,6 +1161,37 @@ def test_update_times(basic_store):
11601161
assert update_times_versioned[0] < update_times_versioned[1] < update_times_versioned[2]
11611162

11621163

1164+
@pytest.mark.storage
1165+
def test_update_time(basic_store):
1166+
lib: NativeVersionStore = basic_store
1167+
1168+
nparr = generate_random_numpy_array(50, np.float32, seed=None)
1169+
series = generate_random_series(np.uint64, 50, "numbers")
1170+
df = pd.DataFrame(data={"col1": np.arange(10)}, index=pd.date_range(pd.Timestamp(0), periods=10))
1171+
lib.write("sym1", nparr)
1172+
lib.write("sym1", series)
1173+
lib.snapshot("snap")
1174+
lib.write("sym1", df)
1175+
1176+
# Negative number for as_of works as expected (dataframes)
1177+
assert lib.update_time("sym1") == lib.update_time("sym1", -1) == lib.update_time("sym1", 2)
1178+
# Snapshots are accepted (series)
1179+
assert lib.update_time("sym1", 1) == lib.update_time("sym1", -2) == lib.update_time("sym1", "snap")
1180+
# and now check for np array
1181+
assert lib.update_time("sym1", 0) == lib.update_time("sym1", -3)
1182+
1183+
# Times are ordered
1184+
assert lib.update_time("sym1") > lib.update_time("sym1", 1) > lib.update_time("sym1", 0)
1185+
1186+
# Correct exception thrown if symbol does not exist
1187+
with pytest.raises(NoDataFoundException):
1188+
lib.update_time("sym12")
1189+
1190+
# Correct exception thrown if version does not exist
1191+
with pytest.raises(NoDataFoundException):
1192+
lib.update_time("sym1", 11)
1193+
1194+
11631195
@pytest.mark.storage
11641196
@pytest.mark.parametrize(
11651197
"index_names",
@@ -2556,6 +2588,7 @@ def test_batch_restore_version_mixed_as_ofs(lmdb_version_store):
25562588
second_ts = pd.Timestamp(2000)
25572589
ManualClockVersionStore.time = second_ts.value
25582590
lib.batch_write(syms, second_data, second_metadata)
2591+
lib.snapshot("snap")
25592592

25602593
third_ts = pd.Timestamp(3000)
25612594
ManualClockVersionStore.time = third_ts.value
@@ -2587,6 +2620,32 @@ def test_batch_restore_version_mixed_as_ofs(lmdb_version_store):
25872620
assert latest["s2"].metadata == "s2-2"
25882621
assert latest["s3"].metadata == "s3-1"
25892622

2623+
# Check restore from a snapshot and from a negative version number
2624+
res = lib.batch_restore_version(syms, [-3, "snap", -1])
2625+
2626+
# check returned data
2627+
assert res[0].symbol == "s1"
2628+
assert res[1].symbol == "s2"
2629+
assert res[2].symbol == "s3"
2630+
assert res[0].version == 3
2631+
assert res[1].version == 4
2632+
assert res[2].version == 3 # We restored last version (-1 is last)
2633+
assert res[0].metadata == "s1-1"
2634+
assert res[1].metadata == "s2-2"
2635+
assert res[2].metadata == "s3-1"
2636+
2637+
# check latest version of symbols from the read
2638+
latest = lib.batch_read(syms)
2639+
assert latest["s1"].version == 3
2640+
assert latest["s2"].version == 4
2641+
assert latest["s3"].version == 3
2642+
assert_equal(latest["s1"].data, first_data[0])
2643+
assert_equal(latest["s2"].data, second_data[1])
2644+
assert_equal(latest["s3"].data, first_data[2])
2645+
assert latest["s1"].metadata == "s1-1"
2646+
assert latest["s2"].metadata == "s2-2"
2647+
assert latest["s3"].metadata == "s3-1"
2648+
25902649

25912650
@pytest.mark.parametrize("bad_thing", ("symbol", "as_of", "duplicate"))
25922651
def test_batch_restore_version_bad_input_noop(lmdb_version_store, bad_thing):
@@ -2725,6 +2784,15 @@ def test_batch_restore_version(basic_store_tombstone):
27252784
assert_equal(read_df, dfs[d])
27262785

27272786

2787+
@pytest.mark.storage
2788+
def test_name_method(basic_store_factory):
2789+
2790+
for lib_name in ["my name", "1243"]:
2791+
lib: NativeVersionStore = basic_store_factory(name=lib_name)
2792+
assert lib_name == lib.name()
2793+
lib.version_store.clear()
2794+
2795+
27282796
@pytest.mark.storage
27292797
def test_batch_append(basic_store_tombstone, three_col_df):
27302798
lmdb_version_store = basic_store_tombstone

0 commit comments

Comments
 (0)