Skip to content
Merged

Osmos #440

Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM mcr.microsoft.com/devcontainers/miniconda:0-3

# commenting out mamba install, as it was giving an error, see:
# https://github.com/conda/conda-libmamba-solver/issues/540
RUN conda install -n base -c conda-forge mamba
RUN conda install -n base -c conda-forge

# Copy environment.yml (if found) to a temp location so we update the environment. Also
# copy "noop.txt" so the COPY instruction does not fail if no environment.yml exists.
Expand Down
2 changes: 2 additions & 0 deletions hsds/basenode.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ async def docker_update_dn_info(app):
log.error("HEAD node seems to be down.")
app["dn_urls"] = []
app["dn_ids"] = []
except HTTPServiceUnavailable:
log.warn("Head ServiceUnavailable")
except OSError:
log.error("failed to register")
app["dn_urls"] = []
Expand Down
31 changes: 11 additions & 20 deletions hsds/datanode_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,7 @@ async def write_s3_obj(app, obj_id, bucket=None):
bucket = domain_bucket

if obj_id in pending_s3_write:
msg = f"write_s3_key - not expected for key {obj_id} to be in "
msg += "pending_s3_write map"
msg = f"write_s3_key - not expected for key {obj_id} to be in pending_s3_write map"
log.error(msg)
raise KeyError(msg)

Expand All @@ -172,12 +171,10 @@ async def write_s3_obj(app, obj_id, bucket=None):
# timestamp is first element of two-tuple
last_update_time = dirty_ids[obj_id][0]
else:
msg = f"write_s3_obj - {obj_id} not in dirty_ids, "
msg += "assuming flush write"
msg = f"write_s3_obj - {obj_id} not in dirty_ids, assuming flush write"
log.debug(msg)
if last_update_time > now:
msg = f"last_update time {last_update_time} is in the future for "
msg += f"obj_id: {obj_id}"
msg = f"last_update time {last_update_time} is in the future for obj_id: {obj_id}"
log.error(msg)
raise ValueError(msg)

Expand All @@ -198,8 +195,7 @@ async def write_s3_obj(app, obj_id, bucket=None):
dset_id = getDatasetId(obj_id)
if dset_id in filter_map:
filter_ops = filter_map[dset_id]
msg = f"write_s3_obj: got filter_op: {filter_ops} "
msg += f"for dset: {dset_id}"
msg = f"write_s3_obj: got filter_op: {filter_ops} for dset: {dset_id}"
log.debug(msg)
else:
filter_ops = None
Expand Down Expand Up @@ -237,13 +233,11 @@ async def write_s3_obj(app, obj_id, bucket=None):
# meta data update
# check for object in meta cache
if obj_id not in meta_cache:
msg = f"write_s3_obj: expected to find obj_id: {obj_id} "
msg += "in meta cache"
msg = f"write_s3_obj: expected to find obj_id: {obj_id} in meta cache"
log.error(msg)
raise KeyError(f"{obj_id} not found in meta cache")
if not meta_cache.isDirty(obj_id):
msg = f"write_s3_obj: expected meta cache obj {obj_id} "
msg == "to be dirty"
msg = f"write_s3_obj: expected meta cache obj {obj_id} to be dirty"
log.error(msg)
raise ValueError("bad dirty state for obj")
obj_json = meta_cache[obj_id]
Expand All @@ -264,8 +258,7 @@ async def write_s3_obj(app, obj_id, bucket=None):
else:
timestamp = 0
if timestamp > last_update_time:
msg = f"write_s3_obj: {obj_id} got updated while s3 "
msg += "write was in progress"
msg = f"write_s3_obj: {obj_id} got updated while s3 write was in progress"
log.info(msg)
else:
log.debug(f"write_s3obj: clear dirty for {obj_id} ")
Expand All @@ -279,11 +272,10 @@ async def write_s3_obj(app, obj_id, bucket=None):

finally:
# clear pending_s3_write item
log.debug(f"write_s3_obj finally block, success={success}")
log.debug(f"write_s3_obj {obj_id} finally block, success={success}")
if obj_id in pending_s3_write:
if pending_s3_write[obj_id] != now:
msg = "pending_s3_write timestamp got updated unexpectedly "
msg += f"for {obj_id}"
msg = f"pending_s3_write timestamp for got updated unexpectedly for {obj_id}"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in the error message: "pending_s3_write timestamp for got updated unexpectedly for {obj_id}" seems to have an extra or misplaced 'for'. Consider revising it, e.g., "pending_s3_write timestamp got updated unexpectedly for {obj_id}".

Suggested change
msg = f"pending_s3_write timestamp for got updated unexpectedly for {obj_id}"
msg = f"pending_s3_write timestamp got updated unexpectedly for {obj_id}"

log.error(msg)
del pending_s3_write[obj_id]
# clear task
Expand Down Expand Up @@ -1259,10 +1251,9 @@ def callback(future):

if obj_id in pending_s3_write:
pending_time = s3sync_start - pending_s3_write[obj_id]
msg = f"s3sync - key {obj_id} has been pending for "
msg += f"{pending_time:.3f}"
msg = f"s3sync - key {obj_id} has been pending for {pending_time:.3f}"
log.debug(msg)
if s3sync_start - pending_s3_write[obj_id] > s3_sync_task_timeout:
if pending_time > s3_sync_task_timeout:
msg = f"s3sync - obj {obj_id} has been in pending_s3_write "
msg += f"for {pending_time:.3f} seconds, restarting"
log.warn(msg)
Expand Down
100 changes: 63 additions & 37 deletions hsds/headnode.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ def is_healthy(self):
async def isClusterReady(app):
sn_count = 0
dn_count = 0
active_sn_ids = app["active_sn_ids"]
active_dn_ids = app["active_dn_ids"]
target_sn_count = await getTargetNodeCount(app, "sn")
target_dn_count = await getTargetNodeCount(app, "dn")
last_create_time = None
Expand All @@ -115,9 +117,11 @@ async def isClusterReady(app):
if last_create_time is None or node.create_time > last_create_time:
last_create_time = node.create_time
if node.type == "sn":
sn_count += 1
if node_id in active_sn_ids:
sn_count += 1
else:
dn_count += 1
if node_id in active_dn_ids:
dn_count += 1
if sn_count == 0 or dn_count == 0:
log.debug("no nodes, cluster not ready")
return False
Expand Down Expand Up @@ -171,6 +175,20 @@ async def info(request):
return resp


def getNodeUrls(nodes, node_ids):
    """ Return a list of node URLs for the given set of node ids.

    Each entry in node_ids maps to "http://<host>:<port>" for the
    corresponding node object; empty slots (None/falsy ids) map to None
    so the result keeps the same length and ordering as node_ids.
    """

    urls = []
    for nid in node_ids:
        if not nid:
            # unfilled slot - keep positional alignment with node_ids
            urls.append(None)
            continue
        entry = nodes[nid]
        urls.append(f"http://{entry.host}:{entry.port}")
    return urls


async def register(request):
"""HTTP method for nodes to register with head node"""
app = request.app
Expand Down Expand Up @@ -208,7 +226,7 @@ async def register(request):
log.debug("register - get ip/port from request.transport")
peername = request.transport.get_extra_info("peername")
if peername is None:
msg = "Can not determine caller IP"
msg = "Cannot determine caller IP"
log.error(msg)
raise HTTPBadRequest(reason=msg)
if peername[0] is None or peername[0] in ("::1", "127.0.0.1"):
Expand Down Expand Up @@ -255,10 +273,34 @@ async def register(request):
node_host=node_host,
node_port=node_port,
)
# delete any existing node with the same port
# delete any existing node with the same port and IP
removeNode(app, host=node_host, port=node_port)
nodes[node_id] = node

# add to the active list if there's an open slot
if node_type == "sn":
active_list = app["active_sn_ids"]
else:
active_list = app["active_dn_ids"]

tgt_count = len(active_list)
active_count = sum(id is not None for id in active_list)
if tgt_count == active_count:
# all the slots are filled, see if there is any unhealthy node
# and remove that
for i in range(len(active_list)):
id = active_list[i]
node = nodes[id]
if not node.is_healthy():
active_list[i] = None # clear the slot
break

for i in range(len(active_list)):
if not active_list[i]:
log.info(f"Node {node_id} added to {node_type} active list in slot: {i}")
active_list[i] = node_id
break

resp = StreamResponse()
resp.headers["Content-Type"] = "application/json"
answer = {}
Expand All @@ -267,38 +309,14 @@ async def register(request):
answer["cluster_state"] = "READY"
else:
answer["cluster_state"] = "WAITING"
sn_urls = []
dn_urls = []
sn_ids = []
dn_ids = []
for node_id in nodes:
node = nodes[node_id]
if not node.is_healthy():
continue
node_url = f"http://{node.host}:{node.port}"
if node.type == "sn":
sn_urls.append(node_url)
sn_ids.append(node_id)
else:
dn_urls.append(node_url)
dn_ids.append(node_id)

# sort dn_urls so node number can be determined
dn_id_map = {}
for i in range(len(dn_urls)):
dn_url = dn_urls[i]
dn_id = dn_ids[i]
dn_id_map[dn_url] = dn_id

dn_urls.sort()
dn_ids = [] # re-arrange to match url order
for dn_url in dn_urls:
dn_ids.append(dn_id_map[dn_url])
sn_urls = getNodeUrls(nodes, app["active_sn_ids"])
dn_urls = getNodeUrls(nodes, app["active_dn_ids"])

answer["sn_ids"] = app["active_sn_ids"]
answer["sn_urls"] = sn_urls
answer["dn_ids"] = app["active_dn_ids"]
answer["dn_urls"] = dn_urls
answer["sn_ids"] = sn_ids
answer["dn_ids"] = dn_ids
answer["req_ip"] = node_host
log.debug(f"register returning: {answer}")
app["last_health_check"] = int(time.time())
Expand Down Expand Up @@ -410,7 +428,7 @@ async def nodeinfo(request):
async def getTargetNodeCount(app, node_type):

if node_type == "dn":
key = "target_sn_count"
key = "target_dn_count"
elif node_type == "sn":
key = "target_sn_count"
else:
Expand All @@ -430,7 +448,12 @@ async def getTargetNodeCount(app, node_type):
def getActiveNodeCount(app, node_type):
count = 0
nodes = app["nodes"]
for node_id in nodes:
if node_type == "sn":
active_list = app["active_sn_ids"]
else:
active_list = app["active_dn_ids"]

for node_id in active_list:
node = nodes[node_id]
if node.type != node_type:
continue
Expand Down Expand Up @@ -462,8 +485,6 @@ async def init():

app["head_port"] = config.get("head_port")

nodes = {}

# check to see if we are running in a DCOS cluster
if "MARATHON_APP_ID" in os.environ:
msg = "Found MARATHON_APP_ID environment variable, setting "
Expand All @@ -473,7 +494,12 @@ async def init():
else:
log.info("not setting is_dcos")

app["nodes"] = nodes
target_sn_count = await getTargetNodeCount(app, "sn")
target_dn_count = await getTargetNodeCount(app, "dn")

app["nodes"] = {}
app["active_sn_ids"] = [None, ] * target_sn_count
app["active_dn_ids"] = [None, ] * target_dn_count
app["dead_node_ids"] = set()
app["start_time"] = int(time.time()) # seconds after epoch
app["last_health_check"] = 0
Expand Down
2 changes: 1 addition & 1 deletion hsds/util/azureBlobClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async def get_object(self, key, bucket=None, offset=0, length=-1):
if isinstance(e, AzureError):
if e.status_code == 404:
msg = f"storage key: {key} not found "
log.warn(msg)
log.info(msg)
raise HTTPNotFound()
elif e.status_code in (401, 403):
msg = f"azureBlobClient.access denied for get key: {key}"
Expand Down
14 changes: 7 additions & 7 deletions hsds/util/fileClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ async def get_object(self, key, bucket=None, offset=0, length=-1):
log.info(msg)
except FileNotFoundError:
msg = f"fileClient: {key} not found "
log.warn(msg)
log.info(msg)
raise HTTPNotFound()
except IOError as ioe:
msg = f"fileClient: IOError reading {bucket}/{key}: {ioe}"
Expand All @@ -166,8 +166,8 @@ async def get_object(self, key, bucket=None, offset=0, length=-1):
except CancelledError as cle:
self._file_stats_increment("error_count")
msg = f"CancelledError for get file obj {key}: {cle}"
log.error(msg)
raise HTTPInternalServerError()
log.warn(msg)
raise
except Exception as e:
self._file_stats_increment("error_count")
msg = f"Unexpected Exception {type(e)} get get_object {key}: {e}"
Expand Down Expand Up @@ -227,8 +227,8 @@ async def put_object(self, key, data, bucket=None):
except CancelledError as cle:
# file_stats_increment(app, "error_count")
msg = f"CancelledError for put file obj {key}: {cle}"
log.error(msg)
raise HTTPInternalServerError()
log.warn(msg)
raise

except Exception as e:
# file_stats_increment(app, "error_count")
Expand Down Expand Up @@ -274,8 +274,8 @@ async def delete_object(self, key, bucket=None):
except CancelledError as cle:
self._file_stats_increment("error_count")
msg = f"CancelledError deleting file obj {key}: {cle}"
log.error(msg)
raise HTTPInternalServerError()
log.warn(msg)
raise

except Exception as e:
self._file_stats_increment("error_count")
Expand Down
4 changes: 4 additions & 0 deletions hsds/util/idUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,5 +536,9 @@ def getDataNodeUrl(app, obj_id):
raise HTTPServiceUnavailable()
dn_number = getObjPartition(obj_id, dn_node_count)
url = dn_urls[dn_number]
if not url:
msg = "Service not ready (no DN url set)"
log.warn(msg)
raise HTTPServiceUnavailable()
log.debug(f"got dn_url: {url} for obj_id: {obj_id}")
return url
Loading