@@ -1068,6 +1068,75 @@ let refresh { verbose; _ } {
10681068 | Error error -> fail_lwt " refresh error:\n %s" error
10691069 | Ok result -> Lwt_io. printl result
10701070
1071+ type reroute_action =
1072+ | AllocateReplica of { index : string ; shard : int ; node : string ; }
1073+ | AllocateEmptyPrimary of { index : string ; shard : int ; node : string ; }
1074+
1075+ type reroute_args = {
1076+ host : string ;
1077+ actions : reroute_action list ;
1078+ explain : bool ;
1079+ retry_failed : bool ;
1080+ }
1081+
1082+ let confirm_data_loss index shard node =
1083+ let % lwt () = Lwt_io. eprintlf
1084+ " WARNING: You are about to allocate an empty primary shard for index '%s', shard %d to node '%s'."
1085+ index shard node in
1086+ let % lwt () = Lwt_io. eprintl " This operation will result in DATA LOSS for this shard!" in
1087+ let % lwt () = Lwt_io. eprint " Are you sure you want to continue? (yes/no): " in
1088+ let % lwt () = Lwt_io. flush Lwt_io. stderr in
1089+ let % lwt response = Lwt_io. read_line Lwt_io. stdin in
1090+ match String. lowercase_ascii (String. trim response) with
1091+ | "yes" -> Lwt. return true
1092+ | _ -> Lwt. return false
1093+
1094+ let reroute { verbose; _ } {
1095+ host;
1096+ actions;
1097+ explain;
1098+ retry_failed;
1099+ } =
1100+ let config = Common. load_config () in
1101+ let { Common. host; _ } = Common. get_cluster config host in
1102+ Lwt_main. run @@
1103+ let % lwt confirmed_actions =
1104+ Lwt_list. filter_map_s begin function
1105+ | AllocateReplica { index; shard; node; } ->
1106+ let cmd = { Elastic_t. allocate_replica = Some { index; shard; node; };
1107+ allocate_empty_primary = None ; move = None ; cancel = None ; } in
1108+ Lwt. return (Some cmd)
1109+ | AllocateEmptyPrimary { index; shard; node; } ->
1110+ let % lwt confirmed = confirm_data_loss index shard node in
1111+ if confirmed then
1112+ let cmd = { Elastic_t. allocate_replica = None ;
1113+ allocate_empty_primary = Some { index; shard; node; accept_data_loss = true ; };
1114+ move = None ; cancel = None ; } in
1115+ Lwt. return (Some cmd)
1116+ else
1117+ let % lwt () = Lwt_io. eprintl " Operation cancelled." in
1118+ Lwt. return None
1119+ end actions
1120+ in
1121+ match confirmed_actions with
1122+ | [] ->
1123+ let % lwt () = Lwt_io. eprintl " No operations to perform." in
1124+ Lwt. return_unit
1125+ | commands ->
1126+ let reroute_request = { Elastic_t. commands;
1127+ dry_run = None ;
1128+ explain = None ;
1129+ retry_failed = None ; } in
1130+ let body = (JSON (Elastic_j. string_of_reroute_request reroute_request) : content_type) in
1131+ let args = [
1132+ " metric" , Some (Some " none" );
1133+ " explain" , if explain then Some (Some " true" ) else None ;
1134+ " retry_failed" , if retry_failed then Some (Some " true" ) else None ;
1135+ ] in
1136+ match % lwt request ~verbose ~body `POST host [ Some " _cluster" ; Some " reroute" ; ] args id with
1137+ | Error error -> fail_lwt " reroute error:\n %s" error
1138+ | Ok result -> Lwt_io. printl result
1139+
10711140type aggregation_field = {
10721141 field : string ;
10731142}
@@ -1964,6 +2033,66 @@ let refresh_tool =
19642033 let man = [] in
19652034 info " refresh" ~doc ~sdocs: Manpage. s_common_options ~exits ~man
19662035
2036+ let reroute_tool =
2037+ let open Common_args in
2038+ let % map common_args = common_args
2039+ and host = host
2040+ and allocate_replica =
2041+ let doc = " allocate replica shard to node (format: INDEX:SHARD:NODE)" in
2042+ Arg. (value & opt_all string [] & info [ " r" ; " allocate-replica" ; ] ~docv: " INDEX:SHARD:NODE" ~doc )
2043+ and allocate_empty_primary =
2044+ let doc = " allocate empty primary shard to node (format: INDEX:SHARD:NODE) - WARNING: CAUSES DATA LOSS!" in
2045+ Arg. (value & opt_all string [] & info [ " p" ; " allocate-empty-primary" ; ] ~docv: " INDEX:SHARD:NODE" ~doc )
2046+ and explain = Arg. (value & flag & info [ " e" ; " explain" ; ] ~doc: " explain the reroute decisions" )
2047+ and retry_failed = Arg. (value & flag & info [ " f" ; " retry-failed" ; ] ~doc: " retry failed allocations" )
2048+ in
2049+ let parse_allocation spec =
2050+ match String. split_on_char ':' spec with
2051+ | [index; shard_str; node] ->
2052+ (match int_of_string shard_str with
2053+ | shard -> Some (index, shard, node)
2054+ | exception _ -> None )
2055+ | _ -> None
2056+ in
2057+ let replica_actions =
2058+ List. filter_map (fun spec ->
2059+ match parse_allocation spec with
2060+ | Some (index , shard , node ) -> Some (AllocateReplica { index; shard; node; })
2061+ | None -> failwith (" Invalid replica allocation format: " ^ spec)
2062+ ) allocate_replica
2063+ in
2064+ let primary_actions =
2065+ List. filter_map (fun spec ->
2066+ match parse_allocation spec with
2067+ | Some (index , shard , node ) -> Some (AllocateEmptyPrimary { index; shard; node; })
2068+ | None -> failwith (" Invalid primary allocation format: " ^ spec)
2069+ ) allocate_empty_primary
2070+ in
2071+ reroute common_args {
2072+ host;
2073+ actions = replica_actions @ primary_actions;
2074+ explain;
2075+ retry_failed;
2076+ }
2077+
2078+ let reroute_tool =
2079+ reroute_tool,
2080+ let open Term in
2081+ let doc = " reroute shards (allocate replica or empty primary shards)" in
2082+ let exits = default_exits in
2083+ let man = [
2084+ `S Manpage. s_description;
2085+ `P " The reroute command allows manual allocation of shards to specific nodes." ;
2086+ `P " Use -r to allocate replica shards (safe operation)." ;
2087+ `P " Use -p to allocate empty primary shards (WARNING: causes data loss!)." ;
2088+ `S Manpage. s_examples;
2089+ `P " Allocate replica shard to a specific node:" ;
2090+ `P " $(tname) reroute cluster -r myindex:900:mynode" ;
2091+ `P " Allocate empty primary shard (with data loss confirmation):" ;
2092+ `P " $(tname) reroute cluster -p myindex:0:mynode" ;
2093+ ] in
2094+ info " reroute" ~doc ~sdocs: Manpage. s_common_options ~exits ~man
2095+
19672096let search_tool =
19682097 let aggregation =
19692098 let module Let_syntax =
@@ -2204,6 +2333,7 @@ let tools = [
22042333 put_tool;
22052334 recovery_tool;
22062335 refresh_tool;
2336+ reroute_tool;
22072337 search_tool;
22082338 settings_tool;
22092339]
0 commit comments