|
3 | 3 | #include "exec_cmd.h"
|
4 | 4 | #include "sigchain.h"
|
5 | 5 | #include "argv-array.h"
|
| 6 | +#include "thread-utils.h" |
| 7 | +#include "strbuf.h" |
6 | 8 |
|
7 | 9 | void child_process_init(struct child_process *child)
|
8 | 10 | {
|
@@ -865,3 +867,336 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
|
865 | 867 | close(cmd->out);
|
866 | 868 | return finish_command(cmd);
|
867 | 869 | }
|
| 870 | + |
/*
 * Lifecycle of one slot in parallel_processes.children.
 */
enum child_state {
	/*
	 * Slot is available for a new task.  This is value 0 so that a
	 * zero-filled children array (pp_init() allocates it with xcalloc(),
	 * presumably zeroing it) starts out with every slot free.
	 */
	GIT_CP_FREE = 0,
	/* A command was started in this slot and its stderr is being polled. */
	GIT_CP_WORKING = 1,
	/* The stderr pipe reached EOF; finish_command() has not run yet. */
	GIT_CP_WAIT_CLEANUP = 2,
};
| 876 | + |
| 877 | +struct parallel_processes { |
| 878 | + void *data; |
| 879 | + |
| 880 | + int max_processes; |
| 881 | + int nr_processes; |
| 882 | + |
| 883 | + get_next_task_fn get_next_task; |
| 884 | + start_failure_fn start_failure; |
| 885 | + task_finished_fn task_finished; |
| 886 | + |
| 887 | + struct { |
| 888 | + enum child_state state; |
| 889 | + struct child_process process; |
| 890 | + struct strbuf err; |
| 891 | + void *data; |
| 892 | + } *children; |
| 893 | + /* |
| 894 | + * The struct pollfd is logically part of *children, |
| 895 | + * but the system call expects it as its own array. |
| 896 | + */ |
| 897 | + struct pollfd *pfd; |
| 898 | + |
| 899 | + unsigned shutdown : 1; |
| 900 | + |
| 901 | + int output_owner; |
| 902 | + struct strbuf buffered_output; /* of finished children */ |
| 903 | +}; |
| 904 | + |
| 905 | +static int default_start_failure(struct child_process *cp, |
| 906 | + struct strbuf *err, |
| 907 | + void *pp_cb, |
| 908 | + void *pp_task_cb) |
| 909 | +{ |
| 910 | + int i; |
| 911 | + |
| 912 | + strbuf_addstr(err, "Starting a child failed:"); |
| 913 | + for (i = 0; cp->argv[i]; i++) |
| 914 | + strbuf_addf(err, " %s", cp->argv[i]); |
| 915 | + |
| 916 | + return 0; |
| 917 | +} |
| 918 | + |
| 919 | +static int default_task_finished(int result, |
| 920 | + struct child_process *cp, |
| 921 | + struct strbuf *err, |
| 922 | + void *pp_cb, |
| 923 | + void *pp_task_cb) |
| 924 | +{ |
| 925 | + int i; |
| 926 | + |
| 927 | + if (!result) |
| 928 | + return 0; |
| 929 | + |
| 930 | + strbuf_addf(err, "A child failed with return code %d:", result); |
| 931 | + for (i = 0; cp->argv[i]; i++) |
| 932 | + strbuf_addf(err, " %s", cp->argv[i]); |
| 933 | + |
| 934 | + return 0; |
| 935 | +} |
| 936 | + |
| 937 | +static void kill_children(struct parallel_processes *pp, int signo) |
| 938 | +{ |
| 939 | + int i, n = pp->max_processes; |
| 940 | + |
| 941 | + for (i = 0; i < n; i++) |
| 942 | + if (pp->children[i].state == GIT_CP_WORKING) |
| 943 | + kill(pp->children[i].process.pid, signo); |
| 944 | +} |
| 945 | + |
/* The run state that handle_children_on_signal() operates on. */
static struct parallel_processes *pp_for_signal = NULL;
| 947 | + |
| 948 | +static void handle_children_on_signal(int signo) |
| 949 | +{ |
| 950 | + kill_children(pp_for_signal, signo); |
| 951 | + sigchain_pop(signo); |
| 952 | + raise(signo); |
| 953 | +} |
| 954 | + |
| 955 | +static void pp_init(struct parallel_processes *pp, |
| 956 | + int n, |
| 957 | + get_next_task_fn get_next_task, |
| 958 | + start_failure_fn start_failure, |
| 959 | + task_finished_fn task_finished, |
| 960 | + void *data) |
| 961 | +{ |
| 962 | + int i; |
| 963 | + |
| 964 | + if (n < 1) |
| 965 | + n = online_cpus(); |
| 966 | + |
| 967 | + pp->max_processes = n; |
| 968 | + |
| 969 | + trace_printf("run_processes_parallel: preparing to run up to %d tasks", n); |
| 970 | + |
| 971 | + pp->data = data; |
| 972 | + if (!get_next_task) |
| 973 | + die("BUG: you need to specify a get_next_task function"); |
| 974 | + pp->get_next_task = get_next_task; |
| 975 | + |
| 976 | + pp->start_failure = start_failure ? start_failure : default_start_failure; |
| 977 | + pp->task_finished = task_finished ? task_finished : default_task_finished; |
| 978 | + |
| 979 | + pp->nr_processes = 0; |
| 980 | + pp->output_owner = 0; |
| 981 | + pp->shutdown = 0; |
| 982 | + pp->children = xcalloc(n, sizeof(*pp->children)); |
| 983 | + pp->pfd = xcalloc(n, sizeof(*pp->pfd)); |
| 984 | + strbuf_init(&pp->buffered_output, 0); |
| 985 | + |
| 986 | + for (i = 0; i < n; i++) { |
| 987 | + strbuf_init(&pp->children[i].err, 0); |
| 988 | + child_process_init(&pp->children[i].process); |
| 989 | + pp->pfd[i].events = POLLIN | POLLHUP; |
| 990 | + pp->pfd[i].fd = -1; |
| 991 | + } |
| 992 | + |
| 993 | + pp_for_signal = pp; |
| 994 | + sigchain_push_common(handle_children_on_signal); |
| 995 | +} |
| 996 | + |
| 997 | +static void pp_cleanup(struct parallel_processes *pp) |
| 998 | +{ |
| 999 | + int i; |
| 1000 | + |
| 1001 | + trace_printf("run_processes_parallel: done"); |
| 1002 | + for (i = 0; i < pp->max_processes; i++) { |
| 1003 | + strbuf_release(&pp->children[i].err); |
| 1004 | + child_process_clear(&pp->children[i].process); |
| 1005 | + } |
| 1006 | + |
| 1007 | + free(pp->children); |
| 1008 | + free(pp->pfd); |
| 1009 | + |
| 1010 | + /* |
| 1011 | + * When get_next_task added messages to the buffer in its last |
| 1012 | + * iteration, the buffered output is non empty. |
| 1013 | + */ |
| 1014 | + fputs(pp->buffered_output.buf, stderr); |
| 1015 | + strbuf_release(&pp->buffered_output); |
| 1016 | + |
| 1017 | + sigchain_pop_common(); |
| 1018 | +} |
| 1019 | + |
| 1020 | +/* returns |
| 1021 | + * 0 if a new task was started. |
| 1022 | + * 1 if no new jobs was started (get_next_task ran out of work, non critical |
| 1023 | + * problem with starting a new command) |
| 1024 | + * <0 no new job was started, user wishes to shutdown early. Use negative code |
| 1025 | + * to signal the children. |
| 1026 | + */ |
| 1027 | +static int pp_start_one(struct parallel_processes *pp) |
| 1028 | +{ |
| 1029 | + int i, code; |
| 1030 | + |
| 1031 | + for (i = 0; i < pp->max_processes; i++) |
| 1032 | + if (pp->children[i].state == GIT_CP_FREE) |
| 1033 | + break; |
| 1034 | + if (i == pp->max_processes) |
| 1035 | + die("BUG: bookkeeping is hard"); |
| 1036 | + |
| 1037 | + code = pp->get_next_task(&pp->children[i].process, |
| 1038 | + &pp->children[i].err, |
| 1039 | + pp->data, |
| 1040 | + &pp->children[i].data); |
| 1041 | + if (!code) { |
| 1042 | + strbuf_addbuf(&pp->buffered_output, &pp->children[i].err); |
| 1043 | + strbuf_reset(&pp->children[i].err); |
| 1044 | + return 1; |
| 1045 | + } |
| 1046 | + pp->children[i].process.err = -1; |
| 1047 | + pp->children[i].process.stdout_to_stderr = 1; |
| 1048 | + pp->children[i].process.no_stdin = 1; |
| 1049 | + |
| 1050 | + if (start_command(&pp->children[i].process)) { |
| 1051 | + code = pp->start_failure(&pp->children[i].process, |
| 1052 | + &pp->children[i].err, |
| 1053 | + pp->data, |
| 1054 | + &pp->children[i].data); |
| 1055 | + strbuf_addbuf(&pp->buffered_output, &pp->children[i].err); |
| 1056 | + strbuf_reset(&pp->children[i].err); |
| 1057 | + if (code) |
| 1058 | + pp->shutdown = 1; |
| 1059 | + return code; |
| 1060 | + } |
| 1061 | + |
| 1062 | + pp->nr_processes++; |
| 1063 | + pp->children[i].state = GIT_CP_WORKING; |
| 1064 | + pp->pfd[i].fd = pp->children[i].process.err; |
| 1065 | + return 0; |
| 1066 | +} |
| 1067 | + |
| 1068 | +static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout) |
| 1069 | +{ |
| 1070 | + int i; |
| 1071 | + |
| 1072 | + while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) { |
| 1073 | + if (errno == EINTR) |
| 1074 | + continue; |
| 1075 | + pp_cleanup(pp); |
| 1076 | + die_errno("poll"); |
| 1077 | + } |
| 1078 | + |
| 1079 | + /* Buffer output from all pipes. */ |
| 1080 | + for (i = 0; i < pp->max_processes; i++) { |
| 1081 | + if (pp->children[i].state == GIT_CP_WORKING && |
| 1082 | + pp->pfd[i].revents & (POLLIN | POLLHUP)) { |
| 1083 | + int n = strbuf_read_once(&pp->children[i].err, |
| 1084 | + pp->children[i].process.err, 0); |
| 1085 | + if (n == 0) { |
| 1086 | + close(pp->children[i].process.err); |
| 1087 | + pp->children[i].state = GIT_CP_WAIT_CLEANUP; |
| 1088 | + } else if (n < 0) |
| 1089 | + if (errno != EAGAIN) |
| 1090 | + die_errno("read"); |
| 1091 | + } |
| 1092 | + } |
| 1093 | +} |
| 1094 | + |
| 1095 | +static void pp_output(struct parallel_processes *pp) |
| 1096 | +{ |
| 1097 | + int i = pp->output_owner; |
| 1098 | + if (pp->children[i].state == GIT_CP_WORKING && |
| 1099 | + pp->children[i].err.len) { |
| 1100 | + fputs(pp->children[i].err.buf, stderr); |
| 1101 | + strbuf_reset(&pp->children[i].err); |
| 1102 | + } |
| 1103 | +} |
| 1104 | + |
| 1105 | +static int pp_collect_finished(struct parallel_processes *pp) |
| 1106 | +{ |
| 1107 | + int i, code; |
| 1108 | + int n = pp->max_processes; |
| 1109 | + int result = 0; |
| 1110 | + |
| 1111 | + while (pp->nr_processes > 0) { |
| 1112 | + for (i = 0; i < pp->max_processes; i++) |
| 1113 | + if (pp->children[i].state == GIT_CP_WAIT_CLEANUP) |
| 1114 | + break; |
| 1115 | + if (i == pp->max_processes) |
| 1116 | + break; |
| 1117 | + |
| 1118 | + code = finish_command(&pp->children[i].process); |
| 1119 | + |
| 1120 | + code = pp->task_finished(code, &pp->children[i].process, |
| 1121 | + &pp->children[i].err, pp->data, |
| 1122 | + &pp->children[i].data); |
| 1123 | + |
| 1124 | + if (code) |
| 1125 | + result = code; |
| 1126 | + if (code < 0) |
| 1127 | + break; |
| 1128 | + |
| 1129 | + pp->nr_processes--; |
| 1130 | + pp->children[i].state = GIT_CP_FREE; |
| 1131 | + pp->pfd[i].fd = -1; |
| 1132 | + child_process_init(&pp->children[i].process); |
| 1133 | + |
| 1134 | + if (i != pp->output_owner) { |
| 1135 | + strbuf_addbuf(&pp->buffered_output, &pp->children[i].err); |
| 1136 | + strbuf_reset(&pp->children[i].err); |
| 1137 | + } else { |
| 1138 | + fputs(pp->children[i].err.buf, stderr); |
| 1139 | + strbuf_reset(&pp->children[i].err); |
| 1140 | + |
| 1141 | + /* Output all other finished child processes */ |
| 1142 | + fputs(pp->buffered_output.buf, stderr); |
| 1143 | + strbuf_reset(&pp->buffered_output); |
| 1144 | + |
| 1145 | + /* |
| 1146 | + * Pick next process to output live. |
| 1147 | + * NEEDSWORK: |
| 1148 | + * For now we pick it randomly by doing a round |
| 1149 | + * robin. Later we may want to pick the one with |
| 1150 | + * the most output or the longest or shortest |
| 1151 | + * running process time. |
| 1152 | + */ |
| 1153 | + for (i = 0; i < n; i++) |
| 1154 | + if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING) |
| 1155 | + break; |
| 1156 | + pp->output_owner = (pp->output_owner + i) % n; |
| 1157 | + } |
| 1158 | + } |
| 1159 | + return result; |
| 1160 | +} |
| 1161 | + |
| 1162 | +int run_processes_parallel(int n, |
| 1163 | + get_next_task_fn get_next_task, |
| 1164 | + start_failure_fn start_failure, |
| 1165 | + task_finished_fn task_finished, |
| 1166 | + void *pp_cb) |
| 1167 | +{ |
| 1168 | + int i, code; |
| 1169 | + int output_timeout = 100; |
| 1170 | + int spawn_cap = 4; |
| 1171 | + struct parallel_processes pp; |
| 1172 | + |
| 1173 | + pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb); |
| 1174 | + while (1) { |
| 1175 | + for (i = 0; |
| 1176 | + i < spawn_cap && !pp.shutdown && |
| 1177 | + pp.nr_processes < pp.max_processes; |
| 1178 | + i++) { |
| 1179 | + code = pp_start_one(&pp); |
| 1180 | + if (!code) |
| 1181 | + continue; |
| 1182 | + if (code < 0) { |
| 1183 | + pp.shutdown = 1; |
| 1184 | + kill_children(&pp, -code); |
| 1185 | + } |
| 1186 | + break; |
| 1187 | + } |
| 1188 | + if (!pp.nr_processes) |
| 1189 | + break; |
| 1190 | + pp_buffer_stderr(&pp, output_timeout); |
| 1191 | + pp_output(&pp); |
| 1192 | + code = pp_collect_finished(&pp); |
| 1193 | + if (code) { |
| 1194 | + pp.shutdown = 1; |
| 1195 | + if (code < 0) |
| 1196 | + kill_children(&pp, -code); |
| 1197 | + } |
| 1198 | + } |
| 1199 | + |
| 1200 | + pp_cleanup(&pp); |
| 1201 | + return 0; |
| 1202 | +} |
0 commit comments