Skip to content

fix: update correct server mode in parseable.json #1332

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

176 changes: 96 additions & 80 deletions src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,99 +110,115 @@ pub async fn resolve_parseable_metadata(
.as_ref()
.map(|meta| serde_json::from_slice(meta).expect("parseable config is valid json"));

// Env Change needs to be updated
let check = determine_environment(staging_metadata, remote_metadata);
// flags for if metadata needs to be synced
let mut overwrite_staging = false;
let mut overwrite_remote = false;

let res = match check {
EnvChange::None(metadata) => {
// overwrite staging anyways so that it matches remote in case of any divergence
overwrite_staging = true;
if PARSEABLE.options.mode == Mode::All {
metadata.server_mode.standalone_after_distributed()?;
}
Ok(metadata)
},
EnvChange::NewRemote => {
Err("Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server")
}
EnvChange::NewStaging(mut metadata) => {
let env_change = determine_environment(staging_metadata, remote_metadata);

// if server is started in ingest mode,we need to make sure that query mode has been started
// i.e the metadata is updated to reflect the server mode = Query
if metadata.server_mode== Mode::All && PARSEABLE.options.mode == Mode::Ingest {
Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet")
} else {
create_dir_all(PARSEABLE.options.staging_dir())?;
metadata.staging = PARSEABLE.options.staging_dir().canonicalize()?;
// this flag is set to true so that metadata is copied to staging
overwrite_staging = true;
// overwrite remote in all and query mode
// because staging dir has changed.
match PARSEABLE.options.mode {
Mode::All => {
metadata.server_mode.standalone_after_distributed()
.map_err(|err| {
ObjectStorageError::Custom(err.to_string())
})?;
overwrite_remote = true;
},
Mode::Query | Mode::Prism => {
overwrite_remote = true;
metadata.server_mode = PARSEABLE.options.mode;
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
},
Mode::Ingest => {
// if ingest server is started fetch the metadata from remote
// update the server mode for local metadata
metadata.server_mode = PARSEABLE.options.mode;
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
},
Mode::Index => {
// if index server is started fetch the metadata from remote
// update the server mode for local metadata
metadata.server_mode = PARSEABLE.options.mode;
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
}
}
Ok(metadata)
}
}
EnvChange::CreateBoth => {
create_dir_all(PARSEABLE.options.staging_dir())?;
let metadata = StorageMetadata::default();
// new metadata needs to be set
// if mode is query or all then both staging and remote
match PARSEABLE.options.mode {
Mode::All | Mode::Query | Mode::Prism => overwrite_remote = true,
_ => (),
}
// else only staging
overwrite_staging = true;
Ok(metadata)
}
};

let mut metadata = res.map_err(|err| {
let err = format!("{}. {}", err, JOIN_COMMUNITY);
let err: Box<dyn std::error::Error + Send + Sync + 'static> = err.into();
ObjectStorageError::UnhandledError(err)
})?;
let (mut metadata, overwrite_staging, overwrite_remote) = process_env_change(env_change)?;

metadata.server_mode = PARSEABLE.options.mode;

if overwrite_remote {
put_remote_metadata(&metadata).await?;
}

if overwrite_staging {
put_staging_metadata(&metadata)?;
}

Ok(metadata)
}

fn process_env_change(
env_change: EnvChange,
) -> Result<(StorageMetadata, bool, bool), ObjectStorageError> {
match env_change {
EnvChange::None(mut metadata) => handle_none_env(&mut metadata),
EnvChange::NewRemote => handle_new_remote_env(),
EnvChange::NewStaging(mut metadata) => handle_new_staging_env(&mut metadata),
EnvChange::CreateBoth => handle_create_both_env(),
}
}

fn handle_none_env(
metadata: &mut StorageMetadata,
) -> Result<(StorageMetadata, bool, bool), ObjectStorageError> {
let overwrite_staging = true;
let mut overwrite_remote = false;

match PARSEABLE.options.mode {
Mode::All => {
metadata.server_mode.standalone_after_distributed()?;
overwrite_remote = true;
update_metadata_mode_and_staging(metadata);
}
Mode::Query => {
overwrite_remote = true;
update_metadata_mode_and_staging(metadata);
}
_ => {}
}
if PARSEABLE.options.mode == Mode::All {
metadata.server_mode.standalone_after_distributed()?;
}
Ok((metadata.clone(), overwrite_staging, overwrite_remote))
}

fn handle_new_remote_env() -> Result<(StorageMetadata, bool, bool), ObjectStorageError> {
Err(ObjectStorageError::UnhandledError(format!(
"Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server. {}",
JOIN_COMMUNITY
).into()))
}

fn handle_new_staging_env(
metadata: &mut StorageMetadata,
) -> Result<(StorageMetadata, bool, bool), ObjectStorageError> {
if metadata.server_mode == Mode::All && PARSEABLE.options.mode == Mode::Ingest {
return Err(ObjectStorageError::UnhandledError(
format!(
"Starting Ingest Mode is not allowed, Since Query Server has not been started yet. {}",
JOIN_COMMUNITY
)
.into(),
));
}
create_dir_all(PARSEABLE.options.staging_dir())?;
metadata.staging = PARSEABLE.options.staging_dir().canonicalize()?;
let overwrite_staging = true;
let mut overwrite_remote = false;

match PARSEABLE.options.mode {
Mode::All => {
metadata
.server_mode
.standalone_after_distributed()
.map_err(|err| ObjectStorageError::Custom(err.to_string()))?;
overwrite_remote = true;
}
Mode::Query | Mode::Prism | Mode::Ingest | Mode::Index => {
update_metadata_mode_and_staging(metadata);
if matches!(PARSEABLE.options.mode, Mode::Query | Mode::Prism) {
overwrite_remote = true;
}
}
}
Ok((metadata.clone(), overwrite_staging, overwrite_remote))
}

fn handle_create_both_env() -> Result<(StorageMetadata, bool, bool), ObjectStorageError> {
create_dir_all(PARSEABLE.options.staging_dir())?;
let metadata = StorageMetadata::default();
let overwrite_remote = matches!(
PARSEABLE.options.mode,
Mode::All | Mode::Query | Mode::Prism
);
let overwrite_staging = true;
Ok((metadata, overwrite_staging, overwrite_remote))
}

fn update_metadata_mode_and_staging(metadata: &mut StorageMetadata) {
metadata.server_mode = PARSEABLE.options.mode;
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
}

pub fn determine_environment(
staging_metadata: Option<StorageMetadata>,
remote_metadata: Option<StorageMetadata>,
Expand Down
Loading