tool_operate: make --skip-existing work for --parallel

Reported-by: Tobias Wendorff
Fixes #15261
Closes #15283
This commit is contained in:
Daniel Stenberg 2024-10-13 17:33:38 +02:00
parent 9bee39bfed
commit 69bf530dfd
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2

View File

@ -128,10 +128,12 @@ static CURLcode single_transfer(struct GlobalConfig *global,
struct OperationConfig *config, struct OperationConfig *config,
CURLSH *share, CURLSH *share,
bool capath_from_env, bool capath_from_env,
bool *added); bool *added,
bool *skipped);
static CURLcode create_transfer(struct GlobalConfig *global, static CURLcode create_transfer(struct GlobalConfig *global,
CURLSH *share, CURLSH *share,
bool *added); bool *added,
bool *skipped);
static bool is_fatal_error(CURLcode code) static bool is_fatal_error(CURLcode code)
{ {
@ -828,7 +830,8 @@ static CURLcode single_transfer(struct GlobalConfig *global,
struct OperationConfig *config, struct OperationConfig *config,
CURLSH *share, CURLSH *share,
bool capath_from_env, bool capath_from_env,
bool *added) bool *added,
bool *skipped)
{ {
CURLcode result = CURLE_OK; CURLcode result = CURLE_OK;
struct getout *urlnode; struct getout *urlnode;
@ -837,7 +840,7 @@ static CURLcode single_transfer(struct GlobalConfig *global,
struct State *state = &config->state; struct State *state = &config->state;
char *httpgetfields = state->httpgetfields; char *httpgetfields = state->httpgetfields;
*added = FALSE; /* not yet */ *skipped = *added = FALSE; /* not yet */
if(config->postfields) { if(config->postfields) {
if(config->use_httpget) { if(config->use_httpget) {
@ -1220,6 +1223,7 @@ static CURLcode single_transfer(struct GlobalConfig *global,
notef(global, "skips transfer, \"%s\" exists locally", notef(global, "skips transfer, \"%s\" exists locally",
per->outfile); per->outfile);
per->skip = TRUE; per->skip = TRUE;
*skipped = TRUE;
} }
} }
if((urlnode->flags & GETOUT_USEREMOTE) if((urlnode->flags & GETOUT_USEREMOTE)
@ -2468,15 +2472,17 @@ static CURLcode add_parallel_transfers(struct GlobalConfig *global,
*addedp = FALSE; *addedp = FALSE;
*morep = FALSE; *morep = FALSE;
if(all_pers < (global->parallel_max*2)) { if(all_pers < (global->parallel_max*2)) {
result = create_transfer(global, share, addedp); bool skipped = FALSE;
if(result) do {
return result; result = create_transfer(global, share, addedp, &skipped);
if(result)
return result;
} while(skipped);
} }
for(per = transfers; per && (all_added < global->parallel_max); for(per = transfers; per && (all_added < global->parallel_max);
per = per->next) { per = per->next) {
bool getadded = FALSE; if(per->added || per->skip)
if(per->added) /* already added or to be skipped */
/* already added */
continue; continue;
if(per->startat && (time(NULL) < per->startat)) { if(per->startat && (time(NULL) < per->startat)) {
/* this is still delaying */ /* this is still delaying */
@ -2512,8 +2518,15 @@ static CURLcode add_parallel_transfers(struct GlobalConfig *global,
result = CURLE_OUT_OF_MEMORY; result = CURLE_OUT_OF_MEMORY;
} }
if(!result) if(!result) {
result = create_transfer(global, share, &getadded); bool getadded = FALSE;
bool skipped = FALSE;
do {
result = create_transfer(global, share, &getadded, &skipped);
if(result)
break;
} while(skipped);
}
if(result) { if(result) {
free(errorbuf); free(errorbuf);
return result; return result;
@ -2874,32 +2887,35 @@ static CURLcode parallel_transfers(struct GlobalConfig *global,
#endif #endif
else else
#endif #endif
while(!s->mcode && (s->still_running || s->more_transfers)) {
/* If stopping prematurely (eg due to a --fail-early condition) then signal if(all_added) {
that any transfers in the multi should abort (via progress callback). */ while(!s->mcode && (s->still_running || s->more_transfers)) {
if(s->wrapitup) { /* If stopping prematurely (eg due to a --fail-early condition) then
if(!s->still_running) signal that any transfers in the multi should abort (via progress
break; callback). */
if(!s->wrapitup_processed) { if(s->wrapitup) {
struct per_transfer *per; if(!s->still_running)
for(per = transfers; per; per = per->next) { break;
if(per->added) if(!s->wrapitup_processed) {
per->abort = TRUE; struct per_transfer *per;
for(per = transfers; per; per = per->next) {
if(per->added)
per->abort = TRUE;
}
s->wrapitup_processed = TRUE;
} }
s->wrapitup_processed = TRUE;
} }
s->mcode = curl_multi_poll(s->multi, NULL, 0, 1000, NULL);
if(!s->mcode)
s->mcode = curl_multi_perform(s->multi, &s->still_running);
if(!s->mcode)
result = check_finished(s);
} }
s->mcode = curl_multi_poll(s->multi, NULL, 0, 1000, NULL); (void)progress_meter(global, &s->start, TRUE);
if(!s->mcode)
s->mcode = curl_multi_perform(s->multi, &s->still_running);
if(!s->mcode)
result = check_finished(s);
} }
(void)progress_meter(global, &s->start, TRUE);
/* Make sure to return some kind of error if there was a multi problem */ /* Make sure to return some kind of error if there was a multi problem */
if(s->mcode) { if(s->mcode) {
result = (s->mcode == CURLM_OUT_OF_MEMORY) ? CURLE_OUT_OF_MEMORY : result = (s->mcode == CURLM_OUT_OF_MEMORY) ? CURLE_OUT_OF_MEMORY :
@ -2920,8 +2936,9 @@ static CURLcode serial_transfers(struct GlobalConfig *global,
CURLcode result = CURLE_OK; CURLcode result = CURLE_OK;
struct per_transfer *per; struct per_transfer *per;
bool added = FALSE; bool added = FALSE;
bool skipped = FALSE;
result = create_transfer(global, share, &added); result = create_transfer(global, share, &added, &skipped);
if(result) if(result)
return result; return result;
if(!added) { if(!added) {
@ -2967,12 +2984,14 @@ static CURLcode serial_transfers(struct GlobalConfig *global,
if(is_fatal_error(returncode) || (returncode && global->fail_early)) if(is_fatal_error(returncode) || (returncode && global->fail_early))
bailout = TRUE; bailout = TRUE;
else { else {
/* setup the next one just before we delete this */ do {
result = create_transfer(global, share, &added); /* setup the next one just before we delete this */
if(result) { result = create_transfer(global, share, &added, &skipped);
returncode = result; if(result) {
bailout = TRUE; returncode = result;
} bailout = TRUE;
}
} while(skipped);
} }
per = del_per_transfer(per); per = del_per_transfer(per);
@ -3006,7 +3025,8 @@ static CURLcode serial_transfers(struct GlobalConfig *global,
static CURLcode transfer_per_config(struct GlobalConfig *global, static CURLcode transfer_per_config(struct GlobalConfig *global,
struct OperationConfig *config, struct OperationConfig *config,
CURLSH *share, CURLSH *share,
bool *added) bool *added,
bool *skipped)
{ {
CURLcode result = CURLE_OK; CURLcode result = CURLE_OK;
bool capath_from_env; bool capath_from_env;
@ -3111,7 +3131,8 @@ static CURLcode transfer_per_config(struct GlobalConfig *global,
} }
if(!result) if(!result)
result = single_transfer(global, config, share, capath_from_env, added); result = single_transfer(global, config, share, capath_from_env, added,
skipped);
return result; return result;
} }
@ -3122,12 +3143,14 @@ static CURLcode transfer_per_config(struct GlobalConfig *global,
*/ */
static CURLcode create_transfer(struct GlobalConfig *global, static CURLcode create_transfer(struct GlobalConfig *global,
CURLSH *share, CURLSH *share,
bool *added) bool *added,
bool *skipped)
{ {
CURLcode result = CURLE_OK; CURLcode result = CURLE_OK;
*added = FALSE; *added = FALSE;
while(global->current) { while(global->current) {
result = transfer_per_config(global, global->current, share, added); result = transfer_per_config(global, global->current, share, added,
skipped);
if(!result && !*added) { if(!result && !*added) {
/* when one set is drained, continue to next */ /* when one set is drained, continue to next */
global->current = global->current->next; global->current = global->current->next;