From 39036c90216e059bbee64b86b198eae3135e0cda Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Tue, 24 Mar 2026 12:50:53 +0100 Subject: [PATCH] async-thrdd: use thread queue for resolving Use a thread queue and pool for asnyc threaded DNS resolves. Add pytest test_21_* for verification. Add `CURLMOPT_RESOLVE_THREADS_MAX` to allow applications to resize the thread pool used. Add `CURLMOPT_QUICK_EXIT` to allow applications to skip thread joins when cleaning up a multi handle. Multi handles in `curl_easy_perform()` inherit this from `CURLOPT_QUICK_EXIT`. Add several debug environment variables for testing. Closes #20936 --- docs/libcurl/curl_multi_setopt.md | 8 + docs/libcurl/libcurl-env-dbg.md | 17 + docs/libcurl/opts/CURLMOPT_QUICK_EXIT.md | 60 ++ .../opts/CURLMOPT_RESOLVE_THREADS_MAX.md | 75 ++ docs/libcurl/opts/Makefile.inc | 2 + docs/libcurl/symbols-in-versions | 2 + include/curl/multi.h | 6 + lib/asyn-thrdd.c | 940 ++++++++---------- lib/asyn.h | 53 +- lib/cf-socket.c | 8 +- lib/cfilters.c | 12 + lib/cfilters.h | 2 + lib/easy.c | 3 +- lib/ftp.c | 6 +- lib/hostip.c | 54 +- lib/hostip.h | 7 +- lib/hostip4.c | 4 +- lib/hostip6.c | 6 +- lib/multi.c | 48 + lib/multihandle.h | 4 + lib/socks.c | 8 +- lib/thrdpool.c | 98 +- lib/thrdpool.h | 6 + lib/thrdqueue.c | 31 +- lib/thrdqueue.h | 6 + lib/url.c | 3 +- src/tool_operate.c | 3 + tests/data/test2500 | 2 +- tests/http/Makefile.am | 1 + tests/http/test_21_resolve.py | 124 +++ tests/http/testenv/curl.py | 13 +- 31 files changed, 998 insertions(+), 614 deletions(-) create mode 100644 docs/libcurl/opts/CURLMOPT_QUICK_EXIT.md create mode 100644 docs/libcurl/opts/CURLMOPT_RESOLVE_THREADS_MAX.md create mode 100644 tests/http/test_21_resolve.py diff --git a/docs/libcurl/curl_multi_setopt.md b/docs/libcurl/curl_multi_setopt.md index adb38e0de5..0bf7438cc8 100644 --- a/docs/libcurl/curl_multi_setopt.md +++ b/docs/libcurl/curl_multi_setopt.md @@ -100,6 +100,14 @@ Pointer to pass to push callback. See CURLMOPT_PUSHDATA(3) Callback that approves or denies server pushes. See CURLMOPT_PUSHFUNCTION(3) +## CURLMOPT_QUICK_EXIT + +Enable a quicker cleanup of the multi handle. See CURLMOPT_QUICK_EXIT(3) + +## CURLMOPT_RESOLVE_THREADS_MAX + +Max threads used for threaded DNS resolver. See CURLMOPT_RESOLVE_THREADS_MAX(3) + ## CURLMOPT_SOCKETDATA Custom pointer passed to the socket callback. See CURLMOPT_SOCKETDATA(3) diff --git a/docs/libcurl/libcurl-env-dbg.md b/docs/libcurl/libcurl-env-dbg.md index a7aad018e9..24ac67ac84 100644 --- a/docs/libcurl/libcurl-env-dbg.md +++ b/docs/libcurl/libcurl-env-dbg.md @@ -173,3 +173,20 @@ Make a blocking, graceful shutdown of all remaining connections when a multi handle is destroyed. This implicitly triggers for easy handles that are run via easy_perform. The value of the environment variable gives the shutdown timeout in milliseconds. + +## `CURL_DBG_RESOLV_MAX_THREADS` + +Overrides the maximum number of threads for resolver. + +## `CURL_DBG_RESOLV_DELAY` + +Makes ever threaded resolve experience an initial delay in milliseconds. + +## `CURL_DBG_RESOLV_FAIL_DELAY` + +With a threaded resolver, delay each lookup by the given milliseconds +and give a negative answer. + +## `CURL_DBG_RESOLV_FAIL_IPV6` + +Make libcurl fail a resolve for IPv6 only. diff --git a/docs/libcurl/opts/CURLMOPT_QUICK_EXIT.md b/docs/libcurl/opts/CURLMOPT_QUICK_EXIT.md new file mode 100644 index 0000000000..f932208363 --- /dev/null +++ b/docs/libcurl/opts/CURLMOPT_QUICK_EXIT.md @@ -0,0 +1,60 @@ +--- +c: Copyright (C) Daniel Stenberg, , et al. +SPDX-License-Identifier: curl +Title: CURLMOPT_QUICK_EXIT +Section: 3 +Source: libcurl +See-also: + - CURLOPT_QUICK_EXIT (3) +Protocol: + - All +Added-in: 8.20.0 +--- + +# NAME + +CURLOPT_QUICK_EXIT - allow libcurl to exit quickly + +# SYNOPSIS + +~~~c +#include + +CURLMcode curl_multi_setopt(CURLM *handle, CURLMOPT_QUICK_EXIT, + long value); +~~~ + +# DESCRIPTION + +Pass a long as a parameter, 1L meaning that when recovering from a timeout, +libcurl should skip lengthy cleanups that are intended to avoid all kinds of +leaks (threads etc.), as the caller program is about to call exit() anyway. +This allows for a swift termination after a DNS timeout for example, by +canceling and/or forgetting about a resolver thread, at the expense of a +possible (though short-lived) leak of associated resources. + +# DEFAULT + +20. + +# %PROTOCOLS% + +# EXAMPLE + +~~~c +int main(void) +{ + CURLM *m = curl_multi_init(); + /* do not join threads when cleaning up this multi handle */ + curl_multi_setopt(m, CURLMOPT_QUICK_EXIT, 1L); +} +~~~ + +# %AVAILABILITY% + +# RETURN VALUE + +curl_multi_setopt(3) returns a CURLMcode indicating success or error. + +CURLM_OK (0) means everything was OK, non-zero means an error occurred, see +libcurl-errors(3). diff --git a/docs/libcurl/opts/CURLMOPT_RESOLVE_THREADS_MAX.md b/docs/libcurl/opts/CURLMOPT_RESOLVE_THREADS_MAX.md new file mode 100644 index 0000000000..593ff7fa02 --- /dev/null +++ b/docs/libcurl/opts/CURLMOPT_RESOLVE_THREADS_MAX.md @@ -0,0 +1,75 @@ +--- +c: Copyright (C) Daniel Stenberg, , et al. +SPDX-License-Identifier: curl +Title: CURLMOPT_RESOLVE_THREADS_MAX +Section: 3 +Source: libcurl +See-also: + - CURLOPT_IPRESOLVE (3) + - CURLOPT_RESOLVE (3) +Protocol: + - All +Added-in: 8.20.0 +--- + +# NAME + +CURLMOPT_RESOLVE_THREADS_MAX - max threads for threaded DNS resolver + +# SYNOPSIS + +~~~c +#include + +CURLMcode curl_multi_setopt(CURLM *handle, CURLMOPT_RESOLVE_THREADS_MAX, + long amount); +~~~ + +# DESCRIPTION + +Pass a long for the **amount**. The set number is used as the maximum number +of threads to be used for the threaded DNS resolver. It has to be a +positive number in the range of 32 bits. + +When libcurl is built with a threaded resolver, which is the default on +many systems, it uses a thread pool to lookup addresses and other +properties of hostnames so other transfers are not blocked by this. + +Threads are started on demand to perform the resolving and shut down +again after a period of inactivity. When the maximum number of threads +is reached, outstanding resolves are held in a queue and served when +a thread becomes available. + +The default maximum is expected to work fine for many situations. Application +may override it using this option for the multi handle. + +Changing this value while there are resolves in progress is possible. +Increasing the value takes effect right away. Lowering the value does +not close down any resolves, but ends threads above the new maximum +once the resolving is done. + +# DEFAULT + +20. + +# %PROTOCOLS% + +# EXAMPLE + +~~~c +int main(void) +{ + CURLM *m = curl_multi_init(); + /* never use more than 5 threads for resolving */ + curl_multi_setopt(m, CURLMOPT_RESOLVE_THREADS_MAX, 5L); +} +~~~ + +# %AVAILABILITY% + +# RETURN VALUE + +curl_multi_setopt(3) returns a CURLMcode indicating success or error. + +CURLM_OK (0) means everything was OK, non-zero means an error occurred, see +libcurl-errors(3). diff --git a/docs/libcurl/opts/Makefile.inc b/docs/libcurl/opts/Makefile.inc index e875ff3ff4..1eb628b800 100644 --- a/docs/libcurl/opts/Makefile.inc +++ b/docs/libcurl/opts/Makefile.inc @@ -122,6 +122,8 @@ man_MANS = \ CURLMOPT_PIPELINING_SITE_BL.3 \ CURLMOPT_PUSHDATA.3 \ CURLMOPT_PUSHFUNCTION.3 \ + CURLMOPT_QUICK_EXIT.3 \ + CURLMOPT_RESOLVE_THREADS_MAX.3 \ CURLMOPT_SOCKETDATA.3 \ CURLMOPT_SOCKETFUNCTION.3 \ CURLMOPT_TIMERDATA.3 \ diff --git a/docs/libcurl/symbols-in-versions b/docs/libcurl/symbols-in-versions index 987d4304f2..0d775fa655 100644 --- a/docs/libcurl/symbols-in-versions +++ b/docs/libcurl/symbols-in-versions @@ -579,6 +579,8 @@ CURLMOPT_PIPELINING_SERVER_BL 7.30.0 CURLMOPT_PIPELINING_SITE_BL 7.30.0 CURLMOPT_PUSHDATA 7.44.0 CURLMOPT_PUSHFUNCTION 7.44.0 +CURLMOPT_QUICK_EXIT 8.20.0 +CURLMOPT_RESOLVE_THREADS_MAX 8.20.0 CURLMOPT_SOCKETDATA 7.15.4 CURLMOPT_SOCKETFUNCTION 7.15.4 CURLMOPT_TIMERDATA 7.16.0 diff --git a/include/curl/multi.h b/include/curl/multi.h index 7baea3778b..412fdb94af 100644 --- a/include/curl/multi.h +++ b/include/curl/multi.h @@ -403,6 +403,12 @@ typedef enum { /* This is the argument passed to the notify callback */ CURLOPT(CURLMOPT_NOTIFYDATA, CURLOPTTYPE_OBJECTPOINT, 19), + /* maximum number of threads used with threaded DNS resolver */ + CURLOPT(CURLMOPT_RESOLVE_THREADS_MAX, CURLOPTTYPE_LONG, 20), + + /* set to 1L for not joining threads when multi is cleaned up */ + CURLOPT(CURLMOPT_QUICK_EXIT, CURLOPTTYPE_LONG, 21), + CURLMOPT_LASTENTRY /* the last unused */ } CURLMoption; diff --git a/lib/asyn-thrdd.c b/lib/asyn-thrdd.c index 52278b24fa..1fa83ff4d6 100644 --- a/lib/asyn-thrdd.c +++ b/lib/asyn-thrdd.c @@ -28,8 +28,6 @@ **********************************************************************/ #ifdef USE_RESOLV_THREADED -#include "socketpair.h" - #ifdef HAVE_NETINET_IN_H #include #endif @@ -44,10 +42,6 @@ #include #endif -#if defined(HAVE_THREADS_POSIX) && defined(HAVE_PTHREAD_H) -#include -#endif - #ifdef HAVE_GETADDRINFO #define RESOLVER_ENOMEM EAI_MEMORY /* = WSA_NOT_ENOUGH_MEMORY on Windows */ #else @@ -64,7 +58,11 @@ #include "multiif.h" #include "curl_threads.h" #include "progress.h" +#include "rand.h" #include "select.h" +#include "thrdqueue.h" +#include "curlx/strparse.h" +#include "curlx/wait.h" #ifdef USE_ARES #include @@ -111,221 +109,115 @@ CURLcode Curl_async_get_impl(struct Curl_easy *data, return CURLE_OK; } +#ifdef CURLVERBOSE +#define CURL_ASYN_ITEM_DESC_LEN 64 +#define async_item_description(x) (x)->description +#else +#define async_item_description(x) NULL +#endif + +struct async_thrdd_item { + struct Curl_addrinfo *res; +#ifdef HAVE_GETADDRINFO + struct addrinfo hints; +#endif +#ifdef CURLVERBOSE + char description[CURL_ASYN_ITEM_DESC_LEN]; +#endif + int sock_error; + uint32_t mid; + uint32_t async_id; + uint16_t port; + uint8_t ip_version; + uint8_t transport; +#ifdef DEBUGBUILD + uint32_t delay_ms; + uint32_t delay_fail_ms; +#endif + char hostname[1]; +}; + /* Give up reference to add_ctx */ -static void addr_ctx_unlink(struct async_thrdd_addr_ctx **paddr_ctx, - struct Curl_easy *data) +static void async_thrdd_item_destroy(struct async_thrdd_item *item) { - struct async_thrdd_addr_ctx *addr_ctx = *paddr_ctx; - bool destroy; - - if(!addr_ctx) - return; - - Curl_mutex_acquire(&addr_ctx->mutx); - if(!data) /* called by resolving thread */ - addr_ctx->thrd_done = TRUE; - - DEBUGASSERT(addr_ctx->ref_count); - --addr_ctx->ref_count; - destroy = !addr_ctx->ref_count; - Curl_mutex_release(&addr_ctx->mutx); - - if(destroy) { - Curl_mutex_destroy(&addr_ctx->mutx); - curlx_free(addr_ctx->hostname); - if(addr_ctx->res) - Curl_freeaddrinfo(addr_ctx->res); - Curl_wakeup_destroy(addr_ctx->sock_pair); - curlx_free(addr_ctx); + if(item) { + if(item->res) + Curl_freeaddrinfo(item->res); + curlx_free(item); } - *paddr_ctx = NULL; } /* Initialize context for threaded resolver */ -static struct async_thrdd_addr_ctx * -addr_ctx_create(struct Curl_easy *data, - struct Curl_resolv_async *async, - const struct addrinfo *hints) +static struct async_thrdd_item * +async_thrdd_item_create(struct Curl_easy *data, + const char *hostname, uint16_t port, + uint8_t ip_version, uint8_t transport, + uint32_t async_id) { - struct async_thrdd_addr_ctx *addr_ctx = curlx_calloc(1, sizeof(*addr_ctx)); - if(!addr_ctx) + size_t hostlen = strlen(hostname); + struct async_thrdd_item *item; + VERBOSE(const char *qtype = "A"); + + item = curlx_calloc(1, sizeof(*item) + hostlen); + if(!item) return NULL; - addr_ctx->thread_hnd = curl_thread_t_null; - addr_ctx->port = async->port; - addr_ctx->ref_count = 1; + if(hostlen) /* NUL byte of name already in struct size */ + memcpy(item->hostname, hostname, hostlen); + item->port = port; + item->ip_version = ip_version; + item->transport = transport; + item->mid = data->mid; + item->async_id = async_id; #ifdef HAVE_GETADDRINFO - DEBUGASSERT(hints); - addr_ctx->hints = *hints; -#else - (void)hints; -#endif - - Curl_mutex_init(&addr_ctx->mutx); - -#ifndef CURL_DISABLE_SOCKETPAIR - /* create socket pair or pipe */ - if(Curl_wakeup_init(addr_ctx->sock_pair, FALSE) < 0) { - addr_ctx->sock_pair[0] = CURL_SOCKET_BAD; - addr_ctx->sock_pair[1] = CURL_SOCKET_BAD; - goto err_exit; - } -#endif - addr_ctx->sock_error = 0; - - /* Copying hostname string because original can be destroyed by parent - * thread during gethostbyname execution. - */ - addr_ctx->hostname = curlx_strdup(async->hostname); - if(!addr_ctx->hostname) - goto err_exit; - - return addr_ctx; - -err_exit: - addr_ctx_unlink(&addr_ctx, data); - return NULL; -} - -#ifdef HAVE_GETADDRINFO - -/* - * getaddrinfo_thread() resolves a name and then exits. - * - * For builds without ARES, but with USE_IPV6, create a resolver thread - * and wait on it. - */ -static CURL_THREAD_RETURN_T CURL_STDCALL getaddrinfo_thread(void *arg) -{ - struct async_thrdd_addr_ctx *addr_ctx = arg; - curl_bit do_abort; - - Curl_mutex_acquire(&addr_ctx->mutx); - do_abort = addr_ctx->do_abort; - Curl_mutex_release(&addr_ctx->mutx); - - if(!do_abort) { - char service[12]; - int rc; - - curl_msnprintf(service, sizeof(service), "%d", addr_ctx->port); - - rc = Curl_getaddrinfo_ex(addr_ctx->hostname, service, - &addr_ctx->hints, &addr_ctx->res); - - if(rc) { - addr_ctx->sock_error = SOCKERRNO ? SOCKERRNO : rc; - if(addr_ctx->sock_error == 0) - addr_ctx->sock_error = RESOLVER_ENOMEM; - } - else { - Curl_addrinfo_set_port(addr_ctx->res, addr_ctx->port); - } - -#ifndef CURL_DISABLE_SOCKETPAIR - Curl_mutex_acquire(&addr_ctx->mutx); - do_abort = addr_ctx->do_abort; - Curl_mutex_release(&addr_ctx->mutx); - - if(!do_abort) { - /* Thread is done, notify transfer */ - int err = Curl_wakeup_signal(addr_ctx->sock_pair); - if(err) { - /* update sock_error to errno */ - addr_ctx->sock_error = err; - } + { + int pf = PF_INET; +#ifdef CURLRES_IPV6 + if((ip_version != CURL_IPRESOLVE_V4) && Curl_ipv6works(data)) { + /* The stack seems to be IPv6-enabled */ + if(ip_version == CURL_IPRESOLVE_V6) + pf = PF_INET6; + else + pf = PF_UNSPEC; } +#endif /* CURLRES_IPV6 */ + item->hints.ai_family = pf; + item->hints.ai_socktype = Curl_socktype_for_transport(transport); +#ifdef CURLVERBOSE + qtype = (pf == PF_INET6) ? "AAAA" : "A+AAAA"; #endif } - - addr_ctx_unlink(&addr_ctx, NULL); - return 0; -} - -#else /* HAVE_GETADDRINFO */ - -/* - * gethostbyname_thread() resolves a name and then exits. - */ -static CURL_THREAD_RETURN_T CURL_STDCALL gethostbyname_thread(void *arg) -{ - struct async_thrdd_addr_ctx *addr_ctx = arg; - bool do_abort; - - Curl_mutex_acquire(&addr_ctx->mutx); - do_abort = addr_ctx->do_abort; - Curl_mutex_release(&addr_ctx->mutx); - - if(!do_abort) { - addr_ctx->res = Curl_ipv4_resolve_r(addr_ctx->hostname, addr_ctx->port); - if(!addr_ctx->res) { - addr_ctx->sock_error = SOCKERRNO; - if(addr_ctx->sock_error == 0) - addr_ctx->sock_error = RESOLVER_ENOMEM; - } - - Curl_mutex_acquire(&addr_ctx->mutx); - do_abort = addr_ctx->do_abort; - Curl_mutex_release(&addr_ctx->mutx); -#ifndef CURL_DISABLE_SOCKETPAIR - if(!do_abort) { - int err = Curl_wakeup_signal(addr_ctx->sock_pair); - if(err) { - /* update sock_error to errno */ - addr_ctx->sock_error = err; - } - } -#endif - } - - addr_ctx_unlink(&addr_ctx, NULL); - return 0; -} - #endif /* HAVE_GETADDRINFO */ -/* - * async_thrdd_destroy() cleans up async resolver data and thread handle. - */ -static void async_thrdd_destroy(struct Curl_easy *data, - struct Curl_resolv_async *async) -{ - struct async_thrdd_ctx *thrdd = &async->thrdd; - struct async_thrdd_addr_ctx *addr = thrdd->addr; - -#ifdef USE_HTTPSRR_ARES - if(thrdd->rr.channel) { - ares_destroy(thrdd->rr.channel); - thrdd->rr.channel = NULL; - } - Curl_httpsrr_cleanup(&thrdd->rr.hinfo); +#ifdef CURLVERBOSE + curl_msnprintf(item->description, sizeof(item->description), + "[%" FMT_OFF_T "/%d] %s %s:%u", + data->id, item->async_id, qtype, item->hostname, item->port); #endif - if(thrdd->addr && (thrdd->addr->thread_hnd != curl_thread_t_null)) { - curl_bit done; - - Curl_mutex_acquire(&addr->mutx); -#ifndef CURL_DISABLE_SOCKETPAIR - if(!addr->do_abort) - Curl_multi_will_close(data, addr->sock_pair[0]); -#endif - addr->do_abort = TRUE; - done = addr->thrd_done; - Curl_mutex_release(&addr->mutx); - - if(done) { - Curl_thread_join(&addr->thread_hnd); - CURL_TRC_DNS(data, "async_thrdd_destroy, thread joined"); +#ifdef DEBUGBUILD + { + const char *p = getenv("CURL_DBG_RESOLV_DELAY"); + if(p) { + curl_off_t l; + if(!curlx_str_number(&p, &l, UINT32_MAX)) { + item->delay_ms = (uint32_t)l; + } } - else { - /* thread is still running. Detach it. */ - Curl_thread_destroy(&addr->thread_hnd); - CURL_TRC_DNS(data, "async_thrdd_destroy, thread detached"); + p = getenv("CURL_DBG_RESOLV_FAIL_DELAY"); + if(p) { + curl_off_t l; + if(!curlx_str_number(&p, &l, UINT32_MAX)) { + unsigned char c = 0; + Curl_rand_bytes(data, FALSE, &c, 1); + item->delay_fail_ms = (uint32_t)l + c; + } } } - /* release our reference to the shared context */ - addr_ctx_unlink(&thrdd->addr, data); +#endif + + return item; } #ifdef USE_HTTPSRR_ARES @@ -344,15 +236,17 @@ static void async_thrdd_rr_done(void *user_data, ares_status_t status, thrdd->rr.result = Curl_httpsrr_from_ares(data, dnsrec, &thrdd->rr.hinfo); } -static CURLcode async_rr_start(struct Curl_easy *data, int port) +static CURLcode async_rr_start(struct Curl_easy *data, + struct Curl_resolv_async *async) { - struct async_thrdd_ctx *thrdd = &data->state.async->thrdd; + struct async_thrdd_ctx *thrdd = &async->thrdd; int status; char *rrname = NULL; DEBUGASSERT(!thrdd->rr.channel); - if(port != 443) { - rrname = curl_maprintf("_%d_.https.%s", port, data->conn->host.name); + if(async->port != 443) { + rrname = curl_maprintf("_%d_.https.%s", + async->port, data->conn->host.name); if(!rrname) return CURLE_OUT_OF_MEMORY; } @@ -385,164 +279,315 @@ static CURLcode async_rr_start(struct Curl_easy *data, int port) } #endif -/* - * async_thrdd_init() starts a new thread that performs the actual - * resolve. This function returns before the resolve is done. - * - * Returns FALSE in case of failure, otherwise TRUE. - */ -static bool async_thrdd_init(struct Curl_easy *data, - struct Curl_resolv_async *async, - const struct addrinfo *hints) -{ - struct async_thrdd_ctx *thrdd = &async->thrdd; - struct async_thrdd_addr_ctx *addr_ctx; - - /* !checksrc! disable ERRNOVAR 1 */ - int err = ENOMEM; - - DEBUGASSERT(!thrdd->addr); -#ifdef USE_HTTPSRR_ARES - DEBUGASSERT(!thrdd->rr.channel); -#endif - - addr_ctx = addr_ctx_create(data, async, hints); - if(!addr_ctx) - goto err_exit; - thrdd->addr = addr_ctx; - - /* passing addr_ctx to the thread adds a reference */ - addr_ctx->ref_count = 2; - addr_ctx->start = *Curl_pgrs_now(data); - -#ifdef HAVE_GETADDRINFO - addr_ctx->thread_hnd = Curl_thread_create(getaddrinfo_thread, addr_ctx); -#else - addr_ctx->thread_hnd = Curl_thread_create(gethostbyname_thread, addr_ctx); -#endif - - if(addr_ctx->thread_hnd == curl_thread_t_null) { - /* The thread never started */ - addr_ctx->ref_count = 1; - addr_ctx->thrd_done = TRUE; - err = errno; - goto err_exit; - } - -#ifdef USE_HTTPSRR_ARES - if(async_rr_start(data, async->port)) - infof(data, "Failed HTTPS RR operation"); -#endif - CURL_TRC_DNS(data, "resolve thread started for of %s:%d", - async->hostname, async->port); - return TRUE; - -err_exit: - CURL_TRC_DNS(data, "resolve thread failed init: %d", err); - async_thrdd_destroy(data, async); - errno = err; - return FALSE; -} - -static void async_thrdd_shutdown(struct Curl_easy *data, - struct Curl_resolv_async *async) -{ - struct async_thrdd_ctx *thrdd = &async->thrdd; - struct async_thrdd_addr_ctx *addr_ctx = thrdd->addr; - curl_bit done; - - if(!addr_ctx) - return; - if(addr_ctx->thread_hnd == curl_thread_t_null) - return; - - Curl_mutex_acquire(&addr_ctx->mutx); -#ifndef CURL_DISABLE_SOCKETPAIR - if(!addr_ctx->do_abort) - Curl_multi_will_close(data, addr_ctx->sock_pair[0]); -#endif - addr_ctx->do_abort = TRUE; - done = addr_ctx->thrd_done; - Curl_mutex_release(&addr_ctx->mutx); - - /* Wait for the thread to terminate if it is already marked done. If it is - not done yet we cannot do anything here. We had tried pthread_cancel but - it caused hanging and resource leaks (#18532). */ - if(done && (addr_ctx->thread_hnd != curl_thread_t_null)) { - Curl_thread_join(&addr_ctx->thread_hnd); - CURL_TRC_DNS(data, "async_thrdd_shutdown, thread joined"); - } -} - -/* - * 'entry' may be NULL and then no data is returned - */ -static CURLcode asyn_thrdd_await(struct Curl_easy *data, - struct Curl_resolv_async *async, - struct Curl_dns_entry **entry) -{ - struct async_thrdd_addr_ctx *addr_ctx = async->thrdd.addr; - CURLcode result = CURLE_OK; - - if(addr_ctx && (addr_ctx->thread_hnd != curl_thread_t_null)) { - /* not interested in result? cancel, if still running... */ - if(!entry) - async_thrdd_shutdown(data, async); - - if(addr_ctx->thread_hnd != curl_thread_t_null) { - CURL_TRC_DNS(data, "resolve, wait for thread to finish"); - if(!Curl_thread_join(&addr_ctx->thread_hnd)) { - DEBUGASSERT(0); - } - } - - if(entry) { - result = Curl_async_take_result(data, async, entry); - if(result == CURLE_AGAIN) - result = CURLE_OK; - } - } - - return result; -} - -/* - * Until we gain a way to signal the resolver threads to stop early, we must - * wait for them and ignore their results. - */ void Curl_async_thrdd_shutdown(struct Curl_easy *data, struct Curl_resolv_async *async) { - async_thrdd_shutdown(data, async); + Curl_async_thrdd_destroy(data, async); +} + +static bool async_thrdd_match_item(void *qitem, void *match_data) +{ + struct Curl_easy *data = match_data; + struct async_thrdd_item *item = qitem; + return item->mid == data->mid; } void Curl_async_thrdd_destroy(struct Curl_easy *data, struct Curl_resolv_async *async) { - if(!data->set.quick_exit) { - (void)asyn_thrdd_await(data, async, NULL); + (void)data; + if(async->thrdd.queued && !async->thrdd.done && + data->multi && data->multi->resolv_thrdq) { + /* Remove any resolve items still queued */ + Curl_thrdq_clear(data->multi->resolv_thrdq, + async_thrdd_match_item, data); } - async_thrdd_destroy(data, async); +#ifdef USE_HTTPSRR_ARES + if(async->thrdd.rr.channel) { + ares_destroy(async->thrdd.rr.channel); + async->thrdd.rr.channel = NULL; + } + Curl_httpsrr_cleanup(&async->thrdd.rr.hinfo); +#endif + async_thrdd_item_destroy(async->thrdd.resolved); + async->thrdd.resolved = NULL; } /* - * Curl_async_await() - * * Waits for a resolve to finish. This function should be avoided since using * this risk getting the multi interface to "hang". - * - * If 'entry' is non-NULL, make it point to the resolved dns entry - * - * Returns CURLE_COULDNT_RESOLVE_HOST if the host was not resolved, - * CURLE_OPERATION_TIMEDOUT if a time-out occurred, or other errors. - * - * This is the version for resolves-in-a-thread. */ CURLcode Curl_async_await(struct Curl_easy *data, struct Curl_resolv_async *async, - struct Curl_dns_entry **entry) + struct Curl_dns_entry **pdns) { - return asyn_thrdd_await(data, async, entry); + struct async_thrdd_ctx *thrdd = &async->thrdd; + timediff_t milli, ms; + + CURL_TRC_DNS(data, "[async] await %s", async->hostname); + while(thrdd->queued && !thrdd->done) { + Curl_async_thrdd_multi_process(data->multi); + if(thrdd->done) + break; + + ms = curlx_ptimediff_ms(Curl_pgrs_now(data), &async->start); + if(ms < 3) + milli = 0; + else if(ms <= 50) + milli = ms / 3; + else if(ms <= 250) + milli = 50; + else + milli = 200; + CURL_TRC_DNS(data, "[async] await, waiting %" FMT_TIMEDIFF_T "ms", + milli); + curlx_wait_ms(milli); + } + return Curl_async_take_result(data, async, pdns); +} + +#ifdef HAVE_GETADDRINFO + +/* Process the item, using Curl_getaddrinfo_ex() */ +static void async_thrdd_item_process(void *arg) +{ + struct async_thrdd_item *item = arg; + char service[12]; + int rc; + +#ifdef DEBUGBUILD + if(item->delay_ms) { + curlx_wait_ms(item->delay_ms); + } + if(item->delay_fail_ms) { + curlx_wait_ms(item->delay_fail_ms); + return; + } +#endif + curl_msnprintf(service, sizeof(service), "%d", item->port); + + rc = Curl_getaddrinfo_ex(item->hostname, service, + &item->hints, &item->res); + if(rc) { + item->sock_error = SOCKERRNO ? SOCKERRNO : rc; + if(item->sock_error == 0) + item->sock_error = RESOLVER_ENOMEM; + } + else { + Curl_addrinfo_set_port(item->res, item->port); + } +} + +#else /* HAVE_GETADDRINFO */ + +/* Process the item, using Curl_ipv4_resolve_r() */ +static void async_thrdd_item_process(void *item) +{ + struct async_thrdd_item *item = arg; + +#ifdef DEBUGBUILD + if(item->delay_ms) { + curlx_wait_ms(item->delay_ms); + } + if(item->delay_fail_ms) { + curlx_wait_ms(item->delay_fail_ms); + return; + } +#endif + item->res = Curl_ipv4_resolve_r(item->hostname, item->port); + if(!item->res) { + item->sock_error = SOCKERRNO; + if(item->sock_error == 0) + item->sock_error = RESOLVER_ENOMEM; + } +} + +#endif /* HAVE_GETADDRINFO */ + +#ifdef ENABLE_WAKEUP +static void async_thrdd_event(const struct curl_thrdq *tqueue, + Curl_thrdq_event ev, + void *user_data) +{ + struct Curl_multi *multi = user_data; + (void)tqueue; + switch(ev) { + case CURL_THRDQ_EV_ITEM_DONE: + (void)curl_multi_wakeup(multi); + break; + default: + break; + } +} +#else +#define async_thrdd_event NULL +#endif + +static void async_thrdd_item_free(void *item) +{ + async_thrdd_item_destroy(item); +} + +/* Create a thread queue for processing resolv items */ +CURLcode Curl_async_thrdd_multi_init(struct Curl_multi *multi, + uint32_t min_threads, + uint32_t max_threads, + uint32_t idle_time_ms) +{ + CURLcode result; + DEBUGASSERT(!multi->resolv_thrdq); + result = Curl_thrdq_create(&multi->resolv_thrdq, "async", 0, + min_threads, max_threads, idle_time_ms, + async_thrdd_item_free, + async_thrdd_item_process, + async_thrdd_event, + multi); +#ifdef DEBUGBUILD + if(!result) { + const char *p = getenv("CURL_DBG_RESOLV_MAX_THREADS"); + if(p) { + curl_off_t l; + if(!curlx_str_number(&p, &l, UINT32_MAX)) { + result = Curl_async_thrdd_multi_set_props( + multi, min_threads, (uint32_t)l, idle_time_ms); + } + } + } +#endif + return result; +} + +/* Tear down the thread queue, joining active threads or detaching them */ +void Curl_async_thrdd_multi_destroy(struct Curl_multi *multi, bool join) +{ + if(multi->resolv_thrdq) { + Curl_thrdq_destroy(multi->resolv_thrdq, join); + multi->resolv_thrdq = NULL; + } +} + +/* Process the receiving end of the thread queue, dispatching + * processed items to their transfer when it can still be found + * and has an `async` state present. Otherwise, destroy the item. */ +void Curl_async_thrdd_multi_process(struct Curl_multi *multi) +{ + struct Curl_easy *data; + void *qitem; + + while(!Curl_thrdq_recv(multi->resolv_thrdq, &qitem)) { + /* dispatch resolve result */ + struct async_thrdd_item *item = qitem; + + CURL_TRC_DNS(multi->admin, "[async] got %s'%s'", + item->res ? "" : "negative for ", item->description); + + data = Curl_multi_get_easy(multi, item->mid); + /* there is a chance that the original resolve was discarded and + * either no new, or a new resolve with a different id is ongoing. */ + if(data && data->conn && data->state.async && + (data->state.async->id == item->async_id)) { + struct Curl_resolv_async *async = data->state.async; + + async->thrdd.resolved = item; + async->thrdd.done = TRUE; + item = NULL; + Curl_multi_mark_dirty(data); + } + async_thrdd_item_free(item); + } +#ifdef CURLVERBOSE + Curl_thrdq_trace(multi->resolv_thrdq, multi->admin, &Curl_trc_feat_dns); +#endif +} + +CURLcode Curl_async_thrdd_multi_set_props(struct Curl_multi *multi, + uint32_t min_threads, + uint32_t max_threads, + uint32_t idle_time_ms) +{ + return Curl_thrdq_set_props(multi->resolv_thrdq, 0, + min_threads, max_threads, idle_time_ms); +} + +CURLcode Curl_async_getaddrinfo(struct Curl_easy *data, + struct Curl_resolv_async *async) +{ + struct async_thrdd_item *item; + CURLcode result; + + if(async->thrdd.queued || async->thrdd.done || async->thrdd.resolved) + return CURLE_FAILED_INIT; + + item = async_thrdd_item_create(data, async->hostname, async->port, + async->ip_version, async->transport, + async->id); + if(!item) { + result = CURLE_OUT_OF_MEMORY; + goto out; + } + + CURL_TRC_DNS(data, "[async] queueing %s", item->description); + /* queue takes ownership of `item` on success */ + result = Curl_thrdq_send(data->multi->resolv_thrdq, item, + async_item_description(item), async->timeout_ms); + if(!result) + async->thrdd.queued = TRUE; + else + async_thrdd_item_destroy(item); +#ifdef CURLVERBOSE + Curl_thrdq_trace(data->multi->resolv_thrdq, data, &Curl_trc_feat_dns); +#endif + +#ifdef USE_HTTPSRR_ARES + DEBUGASSERT(!async->thrdd.rr.channel); + if(async_rr_start(data, async)) + infof(data, "Failed HTTPS RR operation"); +#endif + +out: + if(result) + CURL_TRC_DNS(data, "[async] error queueing %s:%d -> %d", + async->hostname, async->port, result); + return result; +} + +CURLcode Curl_async_pollset(struct Curl_easy *data, struct easy_pollset *ps) +{ + struct Curl_resolv_async *async = data->state.async; + struct async_thrdd_ctx *thrdd = async ? &async->thrdd : NULL; + + if(!thrdd) + return CURLE_OK; +#ifdef USE_HTTPSRR_ARES + if(thrdd->rr.channel) { + CURLcode result = Curl_ares_pollset(data, thrdd->rr.channel, ps); + if(result) + return result; + } +#else + (void)ps; +#endif + + if(!thrdd->done) { +#ifdef ENABLE_WAKEUP + /* The multi "wakeup" socket pair triggers result processing, + * no need for an extra timer. */ + (void)data; +#else + timediff_t milli; + timediff_t ms = curlx_ptimediff_ms(Curl_pgrs_now(data), &async->start); + if(ms < 3) + milli = 1; + else if(ms <= 50) + milli = ms / 3; + else if(ms <= 250) + milli = 50; + else + milli = 200; + Curl_expire(data, milli, EXPIRE_ASYNC_NAME); +#endif + } + return CURLE_OK; } /* @@ -555,10 +600,14 @@ CURLcode Curl_async_take_result(struct Curl_easy *data, struct Curl_dns_entry **pdns) { struct async_thrdd_ctx *thrdd = &async->thrdd; - curl_bit done = FALSE; + CURLcode result = CURLE_OK; DEBUGASSERT(pdns); *pdns = NULL; + if(!thrdd->queued) { + DEBUGASSERT(0); + return CURLE_FAILED_INIT; + } #ifdef USE_HTTPSRR_ARES /* best effort, ignore errors */ @@ -566,178 +615,55 @@ CURLcode Curl_async_take_result(struct Curl_easy *data, (void)Curl_ares_perform(thrdd->rr.channel, 0); #endif - DEBUGASSERT(thrdd->addr); - if(!thrdd->addr) - return CURLE_FAILED_INIT; - - Curl_mutex_acquire(&thrdd->addr->mutx); - done = thrdd->addr->thrd_done; - Curl_mutex_release(&thrdd->addr->mutx); - - if(done) { - CURLcode result = CURLE_OK; - - Curl_expire_done(data, EXPIRE_ASYNC_NAME); - - if(thrdd->addr->res) { - struct Curl_dns_entry *dns = - Curl_dnscache_mk_entry(data, &thrdd->addr->res, - async->hostname, async->port, - async->ip_version); - if(!dns) - result = CURLE_OUT_OF_MEMORY; - -#ifdef USE_HTTPSRR_ARES - if(thrdd->rr.channel) { - result = thrdd->rr.result; - if(!result) { - struct Curl_https_rrinfo *lhrr; - lhrr = Curl_httpsrr_dup_move(&thrdd->rr.hinfo); - if(!lhrr) - result = CURLE_OUT_OF_MEMORY; - else - dns->hinfo = lhrr; - } - } -#endif - if(!result && dns) { - result = Curl_dnscache_add(data, dns); - *pdns = dns; - } - } - - if(!result && !*pdns) - result = Curl_resolver_error(data, NULL); - CURL_TRC_DNS(data, "is_resolved() result=%d, dns=%sfound", - result, *pdns ? "" : "not "); - async_thrdd_shutdown(data, async); - return result; - } - else { - /* poll for name lookup done with exponential backoff up to 250ms */ - /* should be fine even if this converts to 32-bit */ - timediff_t elapsed = curlx_ptimediff_ms(Curl_pgrs_now(data), - &data->progress.t_startsingle); - if(elapsed < 0) - elapsed = 0; - - if(thrdd->addr->poll_interval == 0) - /* Start at 1ms poll interval */ - thrdd->addr->poll_interval = 1; - else if(elapsed >= thrdd->addr->interval_end) - /* Back-off exponentially if last interval expired */ - thrdd->addr->poll_interval *= 2; - - if(thrdd->addr->poll_interval > 250) - thrdd->addr->poll_interval = 250; - - thrdd->addr->interval_end = elapsed + thrdd->addr->poll_interval; - Curl_expire(data, thrdd->addr->poll_interval, EXPIRE_ASYNC_NAME); + if(!thrdd->done) { + CURL_TRC_DNS(data, "[async] take %s:%d -> EAGAIN", + async->hostname, async->port); return CURLE_AGAIN; } -} -CURLcode Curl_async_pollset(struct Curl_easy *data, struct easy_pollset *ps) -{ - struct async_thrdd_ctx *thrdd = &data->state.async->thrdd; - CURLcode result = CURLE_OK; - curl_bit thrd_done; - -#if !defined(USE_HTTPSRR_ARES) && defined(CURL_DISABLE_SOCKETPAIR) - (void)ps; -#endif + Curl_expire_done(data, EXPIRE_ASYNC_NAME); + if(thrdd->resolved && thrdd->resolved->res) { + struct Curl_dns_entry *dns = + Curl_dnscache_mk_entry(data, &thrdd->resolved->res, + async->hostname, async->port, async->ip_version); + if(!dns) + result = CURLE_OUT_OF_MEMORY; #ifdef USE_HTTPSRR_ARES - if(thrdd->rr.channel) { - result = Curl_ares_pollset(data, thrdd->rr.channel, ps); - if(result) - return result; + if(!result && thrdd->rr.channel) { + result = thrdd->rr.result; + if(!result) { + struct Curl_https_rrinfo *lhrr; + lhrr = Curl_httpsrr_dup_move(&thrdd->rr.hinfo); + if(!lhrr) + result = CURLE_OUT_OF_MEMORY; + else + dns->hinfo = lhrr; + } + } +#endif + if(!result && dns) { + CURL_TRC_DNS(data, "[async] resolved: %s", + thrdd->resolved->description); + *pdns = dns; + dns = NULL; + } + Curl_dns_entry_unlink(data, &dns); } -#endif - if(!thrdd->addr) - return result; - Curl_mutex_acquire(&thrdd->addr->mutx); - thrd_done = thrdd->addr->thrd_done; - Curl_mutex_release(&thrdd->addr->mutx); - - if(!thrd_done) { -#ifndef CURL_DISABLE_SOCKETPAIR - /* return read fd to client for polling the DNS resolution status */ - result = Curl_pollset_add_in(data, ps, thrdd->addr->sock_pair[0]); -#else - timediff_t milli; - timediff_t ms = - curlx_ptimediff_ms(Curl_pgrs_now(data), &thrdd->addr->start); - if(ms < 3) - milli = 0; - else if(ms <= 50) - milli = ms / 3; - else if(ms <= 250) - milli = 50; - else - milli = 200; - Curl_expire(data, milli, EXPIRE_ASYNC_NAME); +#ifdef CURLVERBOSE + Curl_thrdq_trace(data->multi->resolv_thrdq, data, &Curl_trc_feat_dns); #endif + if(!result && !*pdns) + result = Curl_resolver_error(data, NULL); + Curl_async_thrdd_shutdown(data, async); + if(result && + (result != CURLE_COULDNT_RESOLVE_HOST) && + (result != CURLE_COULDNT_RESOLVE_PROXY)) { + CURL_TRC_DNS(data, "[async] %s:%d: error %d", + async->hostname, async->port, result); } return result; } -#ifndef HAVE_GETADDRINFO -/* - * Curl_async_getaddrinfo() - for platforms without getaddrinfo - */ -CURLcode Curl_async_getaddrinfo(struct Curl_easy *data, - struct Curl_resolv_async *async) -{ - (void)ip_version; - - /* fire up a new resolver thread! */ - if(async_thrdd_init(data, async, NULL)) { - return CURLE_OK; - } - - failf(data, "getaddrinfo() thread failed"); - return CURLE_FAILED_INIT; -} - -#else /* !HAVE_GETADDRINFO */ - -/* - * Curl_async_getaddrinfo() - for getaddrinfo - */ -CURLcode Curl_async_getaddrinfo(struct Curl_easy *data, - struct Curl_resolv_async *async) -{ - struct addrinfo hints; - int pf = PF_INET; - - CURL_TRC_DNS(data, "init threaded resolve of %s:%d", - async->hostname, async->port); -#ifdef CURLRES_IPV6 - if((async->ip_version != CURL_IPRESOLVE_V4) && Curl_ipv6works(data)) { - /* The stack seems to be IPv6-enabled */ - if(async->ip_version == CURL_IPRESOLVE_V6) - pf = PF_INET6; - else - pf = PF_UNSPEC; - } -#endif /* CURLRES_IPV6 */ - - memset(&hints, 0, sizeof(hints)); - hints.ai_family = pf; - hints.ai_socktype = - (Curl_conn_get_transport(data, data->conn) == TRNSPRT_TCP) ? - SOCK_STREAM : SOCK_DGRAM; - - /* fire up a new resolver thread! */ - if(async_thrdd_init(data, async, &hints)) - return CURLE_OK; - - failf(data, "getaddrinfo() thread failed to start"); - return CURLE_FAILED_INIT; -} - -#endif /* !HAVE_GETADDRINFO */ - #endif /* USE_RESOLV_THREADED */ diff --git a/lib/asyn.h b/lib/asyn.h index 18ac5caa60..f4057d2419 100644 --- a/lib/asyn.h +++ b/lib/asyn.h @@ -32,6 +32,7 @@ struct Curl_easy; struct Curl_dns_entry; struct Curl_resolv_async; +struct Curl_multi; #ifdef CURLRES_ASYNCH @@ -169,39 +170,12 @@ CURLcode Curl_async_ares_set_dns_local_ip6(struct Curl_easy *data); #endif /* USE_RESOLV_ARES */ #ifdef USE_RESOLV_THREADED -/* async resolving implementation using POSIX threads */ -#include "curl_threads.h" -/* Context for threaded address resolver */ -struct async_thrdd_addr_ctx { - curl_thread_t thread_hnd; - char *hostname; /* hostname to resolve, Curl_async.hostname - duplicate */ - curl_mutex_t mutx; -#ifndef CURL_DISABLE_SOCKETPAIR - curl_socket_t sock_pair[2]; /* eventfd/pipes/socket pair */ -#endif - struct Curl_addrinfo *res; -#ifdef HAVE_GETADDRINFO - struct addrinfo hints; -#endif - struct curltime start; - timediff_t interval_end; - unsigned int poll_interval; - int port; - int sock_error; - int ref_count; - BIT(thrd_done); - BIT(do_abort); -}; +struct async_thrdd_item; /* Context for threaded resolver */ struct async_thrdd_ctx { - /* `addr` is a pointer since this memory is shared with a started - * thread. Since threads cannot be killed, we use reference counting - * so that we can "release" our pointer to this memory while the - * thread is still running. */ - struct async_thrdd_addr_ctx *addr; + struct async_thrdd_item *resolved; #if defined(USE_HTTPSRR) && defined(USE_ARES) struct { ares_channel channel; @@ -210,6 +184,8 @@ struct async_thrdd_ctx { BIT(done); } rr; #endif + BIT(queued); + BIT(done); }; void Curl_async_thrdd_shutdown(struct Curl_easy *data, @@ -217,6 +193,18 @@ void Curl_async_thrdd_shutdown(struct Curl_easy *data, void Curl_async_thrdd_destroy(struct Curl_easy *data, struct Curl_resolv_async *async); +CURLcode Curl_async_thrdd_multi_init(struct Curl_multi *multi, + uint32_t min_threads, + uint32_t max_threads, + uint32_t idle_time_ms); +void Curl_async_thrdd_multi_destroy(struct Curl_multi *multi, bool join); +void Curl_async_thrdd_multi_process(struct Curl_multi *multi); + +CURLcode Curl_async_thrdd_multi_set_props(struct Curl_multi *multi, + uint32_t min_threads, + uint32_t max_threads, + uint32_t idle_time_ms); + #endif /* USE_RESOLV_THREADED */ #ifndef CURL_DISABLE_DOH @@ -248,10 +236,15 @@ struct Curl_resolv_async { #ifndef CURL_DISABLE_DOH struct doh_probes *doh; /* DoH specific data for this request */ #endif + struct curltime start; + timediff_t interval_end; + timediff_t timeout_ms; + uint32_t poll_interval; uint32_t id; /* unique id per easy handle of the resolve operation */ - /* what is being resolved */ + /* what is being resolved */ uint16_t port; uint8_t ip_version; + uint8_t transport; char hostname[1]; }; diff --git a/lib/cf-socket.c b/lib/cf-socket.c index 8025f9a910..091591e5db 100644 --- a/lib/cf-socket.c +++ b/lib/cf-socket.c @@ -530,7 +530,8 @@ CURLcode Curl_parse_interface(const char *input, #ifndef CURL_DISABLE_BINDLOCAL static CURLcode bindlocal(struct Curl_easy *data, struct connectdata *conn, - curl_socket_t sockfd, int af, unsigned int scope) + curl_socket_t sockfd, int af, unsigned int scope, + uint8_t transport) { struct Curl_sockaddr_storage sa; struct sockaddr *sock = (struct sockaddr *)&sa; /* bind to this address */ @@ -648,7 +649,7 @@ static CURLcode bindlocal(struct Curl_easy *data, struct connectdata *conn, ip_version = CURL_IPRESOLVE_V6; #endif - (void)Curl_resolv_blocking(data, host, 80, ip_version, &h); + (void)Curl_resolv_blocking(data, host, 80, ip_version, transport, &h); if(h) { int h_af = h->addr->ai_family; /* convert the resolved address, sizeof myhost >= INET_ADDRSTRLEN */ @@ -1143,7 +1144,8 @@ static CURLcode cf_socket_open(struct Curl_cfilter *cf, #endif ) { result = bindlocal(data, cf->conn, ctx->sock, ctx->addr.family, - Curl_ipv6_scope(&ctx->addr.curl_sa_addr)); + Curl_ipv6_scope(&ctx->addr.curl_sa_addr), + ctx->transport); if(result) { if(result == CURLE_UNSUPPORTED_PROTOCOL) { /* The address family is not supported on this interface. diff --git a/lib/cfilters.c b/lib/cfilters.c index bbc80525ce..9e0e63b53c 100644 --- a/lib/cfilters.c +++ b/lib/cfilters.c @@ -700,6 +700,18 @@ unsigned char Curl_conn_get_transport(struct Curl_easy *data, return Curl_conn_cf_get_transport(cf, data); } +int Curl_socktype_for_transport(uint8_t transport) +{ + switch(transport) { + case TRNSPRT_TCP: + return SOCK_STREAM; + case TRNSPRT_UNIX: + return SOCK_STREAM; + default: /* UDP and QUIC */ + return SOCK_DGRAM; + } +} + const char *Curl_conn_get_alpn_negotiated(struct Curl_easy *data, struct connectdata *conn) { diff --git a/lib/cfilters.h b/lib/cfilters.h index 622c694866..9d83331de6 100644 --- a/lib/cfilters.h +++ b/lib/cfilters.h @@ -345,6 +345,8 @@ bool Curl_conn_cf_needs_flush(struct Curl_cfilter *cf, unsigned char Curl_conn_cf_get_transport(struct Curl_cfilter *cf, struct Curl_easy *data); +int Curl_socktype_for_transport(uint8_t transport); + const char *Curl_conn_cf_get_alpn_negotiated(struct Curl_cfilter *cf, struct Curl_easy *data); diff --git a/lib/easy.c b/lib/easy.c index a6ab3d7ae4..c0147da6b4 100644 --- a/lib/easy.c +++ b/lib/easy.c @@ -779,8 +779,9 @@ static CURLcode easy_perform(struct Curl_easy *data, bool events) if(multi->in_callback) return CURLE_RECURSIVE_API_CALL; - /* Copy the MAXCONNECTS option to the multi handle */ + /* Copy relevant easy options to the multi handle */ curl_multi_setopt(multi, CURLMOPT_MAXCONNECTS, (long)data->set.maxconnects); + curl_multi_setopt(multi, CURLMOPT_QUICK_EXIT, (long)data->set.quick_exit); data->multi_easy = NULL; /* pretend it does not exist */ mresult = curl_multi_add_handle(multi, data); diff --git a/lib/ftp.c b/lib/ftp.c index 5677aa2313..653d5ca86a 100644 --- a/lib/ftp.c +++ b/lib/ftp.c @@ -1063,6 +1063,7 @@ static CURLcode ftp_port_resolve_host(struct Curl_easy *data, *resp = NULL; result = Curl_resolv_blocking(data, host, 0, conn->ip_version, + Curl_conn_get_transport(data, conn), dns_entryp); if(result) failf(data, "failed to resolve the address provided to PORT: %s", host); @@ -2163,6 +2164,7 @@ static CURLcode ftp_state_pasv_resp(struct Curl_easy *data, (void)Curl_resolv_blocking(data, host_name, ipquad.remote_port, is_ipv6 ? CURL_IPRESOLVE_V6 : CURL_IPRESOLVE_V4, + Curl_conn_get_transport(data, conn), &dns); /* we connect to the proxy's port */ connectport = (unsigned short)ipquad.remote_port; @@ -2187,7 +2189,9 @@ static CURLcode ftp_state_pasv_resp(struct Curl_easy *data, goto error; } - (void)Curl_resolv_blocking(data, newhost, newport, conn->ip_version, &dns); + (void)Curl_resolv_blocking(data, newhost, newport, conn->ip_version, + Curl_conn_get_transport(data, conn), + &dns); connectport = newport; /* we connect to the remote port */ if(!dns) { diff --git a/lib/hostip.c b/lib/hostip.c index ae691cce76..898e03afab 100644 --- a/lib/hostip.c +++ b/lib/hostip.c @@ -373,7 +373,9 @@ static bool can_resolve_ip_version(struct Curl_easy *data, int ip_version) static CURLcode hostip_async_new(struct Curl_easy *data, const char *hostname, uint16_t port, - uint8_t ip_version) + uint8_t ip_version, + uint8_t transport, + timediff_t timeout_ms) { struct Curl_resolv_async *async; size_t hostlen = strlen(hostname); @@ -389,6 +391,9 @@ static CURLcode hostip_async_new(struct Curl_easy *data, async->id = data->state.next_async_id++; async->port = port; async->ip_version = ip_version; + async->transport = transport; + async->start = *Curl_pgrs_now(data); + async->timeout_ms = timeout_ms; if(hostlen) memcpy(async->hostname, hostname, hostlen); @@ -401,6 +406,8 @@ static CURLcode hostip_resolv(struct Curl_easy *data, const char *hostname, uint16_t port, uint8_t ip_version, + uint8_t transport, + timediff_t timeout_ms, bool allowDOH, struct Curl_dns_entry **entry) { @@ -415,6 +422,8 @@ static CURLcode hostip_resolv(struct Curl_easy *data, #ifdef USE_CURL_ASYNC if(data->state.async) Curl_async_destroy(data); +#else + (void)timeout_ms; #endif #ifndef CURL_DISABLE_DOH @@ -463,7 +472,8 @@ static CURLcode hostip_resolv(struct Curl_easy *data, int st; #ifdef CURLRES_ASYNCH if(!data->state.async) { - result = hostip_async_new(data, hostname, port, ip_version); + result = hostip_async_new(data, hostname, port, ip_version, + transport, timeout_ms); if(result) goto error; } @@ -501,7 +511,8 @@ static CURLcode hostip_resolv(struct Curl_easy *data, #ifndef CURL_DISABLE_DOH else if(!Curl_is_ipaddr(hostname) && allowDOH && data->set.doh) { if(!data->state.async) { - result = hostip_async_new(data, hostname, port, ip_version); + result = hostip_async_new(data, hostname, port, ip_version, + transport, timeout_ms); if(result) goto error; } @@ -518,7 +529,8 @@ static CURLcode hostip_resolv(struct Curl_easy *data, #ifdef CURLRES_ASYNCH if(!data->state.async) { - result = hostip_async_new(data, hostname, port, ip_version); + result = hostip_async_new(data, hostname, port, ip_version, + transport, timeout_ms); if(result) goto error; } @@ -526,7 +538,7 @@ static CURLcode hostip_resolv(struct Curl_easy *data, respwait = TRUE; #else respwait = FALSE; /* no async waiting here */ - addr = Curl_sync_getaddrinfo(data, hostname, port, ip_version); + addr = Curl_sync_getaddrinfo(data, hostname, port, ip_version, transport); if(addr) result = CURLE_OK; #endif @@ -577,13 +589,15 @@ CURLcode Curl_resolv_blocking(struct Curl_easy *data, const char *hostname, uint16_t port, uint8_t ip_version, + uint8_t transport, struct Curl_dns_entry **pdns) { CURLcode result; DEBUGASSERT(hostname && *hostname); *pdns = NULL; /* We cannot do a blocking resolve using DoH currently */ - result = hostip_resolv(data, hostname, port, ip_version, FALSE, pdns); + result = hostip_resolv(data, hostname, port, ip_version, + transport, 0, FALSE, pdns); switch(result) { case CURLE_OK: DEBUGASSERT(*pdns); @@ -619,7 +633,8 @@ static CURLcode resolv_alarm_timeout(struct Curl_easy *data, const char *hostname, uint16_t port, uint8_t ip_version, - timediff_t timeoutms, + uint8_t transport, + timediff_t timeout_ms, struct Curl_dns_entry **entry) { #ifdef HAVE_SIGACTION @@ -636,14 +651,14 @@ static CURLcode resolv_alarm_timeout(struct Curl_easy *data, CURLcode result; DEBUGASSERT(hostname && *hostname); - DEBUGASSERT(timeoutms > 0); + DEBUGASSERT(timeout_ms > 0); DEBUGASSERT(!data->set.no_signal); #ifndef CURL_DISABLE_DOH DEBUGASSERT(!data->set.doh); #endif *entry = NULL; - timeout = (timeoutms > LONG_MAX) ? LONG_MAX : (long)timeoutms; + timeout = (timeout_ms > LONG_MAX) ? LONG_MAX : (long)timeout_ms; if(timeout < 1000) { /* The alarm() function only provides integer second resolution, so if we want to wait less than one second we must bail out already now. */ @@ -696,7 +711,8 @@ static CURLcode resolv_alarm_timeout(struct Curl_easy *data, /* Perform the actual name resolution. This might be interrupted by an * alarm if it takes too long. */ - result = hostip_resolv(data, hostname, port, ip_version, TRUE, entry); + result = hostip_resolv(data, hostname, port, ip_version, transport, + timeout_ms, TRUE, entry); clean_up: if(!prev_alarm) @@ -772,33 +788,35 @@ CURLcode Curl_resolv(struct Curl_easy *data, const char *hostname, uint16_t port, uint8_t ip_version, - timediff_t timeoutms, + uint8_t transport, + timediff_t timeout_ms, struct Curl_dns_entry **entry) { DEBUGASSERT(hostname && *hostname); *entry = NULL; - if(timeoutms < 0) + if(timeout_ms < 0) /* got an already expired timeout */ return CURLE_OPERATION_TIMEDOUT; #ifdef USE_ALARM_TIMEOUT - if(timeoutms && data->set.no_signal) { + if(timeout_ms && data->set.no_signal) { /* Cannot use ALARM when signals are disabled */ - timeoutms = 0; + timeout_ms = 0; } - if(timeoutms && !Curl_doh_wanted(data)) { + if(timeout_ms && !Curl_doh_wanted(data)) { return resolv_alarm_timeout(data, hostname, port, ip_version, - timeoutms, entry); + transport, timeout_ms, entry); } #endif /* !USE_ALARM_TIMEOUT */ #ifndef CURLRES_ASYNCH - if(timeoutms) + if(timeout_ms) infof(data, "timeout on name lookup is not supported"); #endif - return hostip_resolv(data, hostname, port, ip_version, TRUE, entry); + return hostip_resolv(data, hostname, port, ip_version, transport, + timeout_ms, TRUE, entry); } diff --git a/lib/hostip.h b/lib/hostip.h index 9fef885318..2a936b5647 100644 --- a/lib/hostip.h +++ b/lib/hostip.h @@ -91,13 +91,15 @@ CURLcode Curl_resolv(struct Curl_easy *data, const char *hostname, uint16_t port, uint8_t ip_version, - timediff_t timeoutms, + uint8_t transport, + timediff_t timeout_ms, struct Curl_dns_entry **pdns); CURLcode Curl_resolv_blocking(struct Curl_easy *data, const char *hostname, uint16_t port, uint8_t ip_version, + uint8_t transport, struct Curl_dns_entry **pdns); CURLcode Curl_resolv_timeout(struct Curl_easy *data, @@ -127,7 +129,8 @@ CURLcode Curl_resolver_error(struct Curl_easy *data, const char *detail); struct Curl_addrinfo *Curl_sync_getaddrinfo(struct Curl_easy *data, const char *hostname, uint16_t port, - uint8_t ip_version); + uint8_t ip_version, + uint8_t transport); #endif diff --git a/lib/hostip4.c b/lib/hostip4.c index fdf3825397..bff42f23a2 100644 --- a/lib/hostip4.c +++ b/lib/hostip4.c @@ -70,11 +70,13 @@ struct Curl_addrinfo *Curl_sync_getaddrinfo(struct Curl_easy *data, const char *hostname, uint16_t port, - uint8_t ip_version) + uint8_t ip_version, + uint8_t transport) { struct Curl_addrinfo *ai = NULL; (void)ip_version; + (void)transport; ai = Curl_ipv4_resolve_r(hostname, port); if(!ai) diff --git a/lib/hostip6.c b/lib/hostip6.c index 44f4f821f1..146086645a 100644 --- a/lib/hostip6.c +++ b/lib/hostip6.c @@ -65,7 +65,8 @@ struct Curl_addrinfo *Curl_sync_getaddrinfo(struct Curl_easy *data, const char *hostname, uint16_t port, - uint8_t ip_version) + uint8_t ip_version, + uint8_t transport) { struct addrinfo hints; struct Curl_addrinfo *res; @@ -83,8 +84,7 @@ struct Curl_addrinfo *Curl_sync_getaddrinfo(struct Curl_easy *data, memset(&hints, 0, sizeof(hints)); hints.ai_family = pf; - hints.ai_socktype = - (Curl_conn_get_transport(data, data->conn) == TRNSPRT_TCP) ? + hints.ai_socktype = (transport == TRNSPRT_TCP) ? SOCK_STREAM : SOCK_DGRAM; #ifndef USE_RESOLVE_ON_IPS diff --git a/lib/multi.c b/lib/multi.c index 482663edd5..139cad3ada 100644 --- a/lib/multi.c +++ b/lib/multi.c @@ -307,10 +307,24 @@ struct Curl_multi *Curl_multi_handle(uint32_t xfer_table_size, goto error; #endif +#ifdef USE_RESOLV_THREADED + if(xfer_table_size < CURL_XFER_TABLE_SIZE) { /* easy multi */ + if(Curl_async_thrdd_multi_init(multi, 0, 2, 10)) + goto error; + } + else { /* real multi handle */ + if(Curl_async_thrdd_multi_init(multi, 0, 20, 2000)) + goto error; + } +#endif + return multi; error: +#ifdef USE_RESOLV_THREADED + Curl_async_thrdd_multi_destroy(multi, TRUE); +#endif Curl_multi_ev_cleanup(multi); Curl_hash_destroy(&multi->proto_hash); Curl_dnscache_destroy(&multi->dnscache); @@ -2522,6 +2536,9 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, Curl_uint32_bset_remove(&multi->dirty, data->mid); if(data == multi->admin) { +#ifdef USE_RESOLV_THREADED + Curl_async_thrdd_multi_process(multi); +#endif Curl_cshutdn_perform(&multi->cshutdn, multi->admin, sigpipe_ctx); return CURLM_OK; } @@ -2951,6 +2968,9 @@ CURLMcode curl_multi_cleanup(CURLM *m) } while(Curl_uint32_tbl_next(&multi->xfers, mid, &mid, &entry)); } +#ifdef USE_RESOLV_THREADED + Curl_async_thrdd_multi_destroy(multi, !multi->quick_exit); +#endif Curl_cpool_destroy(&multi->cpool); Curl_cshutdn_destroy(&multi->cshutdn, multi->admin); if(multi->admin) { @@ -3347,6 +3367,34 @@ CURLMcode curl_multi_setopt(CURLM *m, CURLMoption option, ...) case CURLMOPT_NOTIFYDATA: multi->ntfy.ntfy_cb_data = va_arg(param, void *); break; + case CURLMOPT_RESOLVE_THREADS_MAX: +#ifdef USE_RESOLV_THREADED + uarg = va_arg(param, long); + if((uarg <= 0) || (uarg > UINT32_MAX)) + mresult = CURLM_BAD_FUNCTION_ARGUMENT; + else { + CURLcode result = Curl_async_thrdd_multi_set_props( + multi, 0, (uint32_t)uarg, 2000); + switch(result) { + case CURLE_OK: + mresult = CURLM_OK; + break; + case CURLE_BAD_FUNCTION_ARGUMENT: + mresult = CURLM_BAD_FUNCTION_ARGUMENT; + break; + case CURLE_OUT_OF_MEMORY: + mresult = CURLM_OUT_OF_MEMORY; + break; + default: + mresult = CURLM_INTERNAL_ERROR; + break; + } + } +#endif + break; + case CURLMOPT_QUICK_EXIT: + multi->quick_exit = va_arg(param, long) ? 1 : 0; + break; default: mresult = CURLM_UNKNOWN_OPTION; break; diff --git a/lib/multihandle.h b/lib/multihandle.h index 4cb3f3eec4..f158f1d771 100644 --- a/lib/multihandle.h +++ b/lib/multihandle.h @@ -109,6 +109,9 @@ struct Curl_multi { struct Curl_dnscache dnscache; /* DNS cache */ struct Curl_ssl_scache *ssl_scache; /* TLS session pool */ +#ifdef USE_RESOLV_THREADED + struct curl_thrdq *resolv_thrdq; +#endif #ifdef USE_LIBPSL /* PSL cache. */ @@ -186,6 +189,7 @@ struct Curl_multi { BIT(xfer_buf_borrowed); /* xfer_buf is currently being borrowed */ BIT(xfer_ulbuf_borrowed); /* xfer_ulbuf is currently being borrowed */ BIT(xfer_sockbuf_borrowed); /* xfer_sockbuf is currently being borrowed */ + BIT(quick_exit); /* do not join threads on cleanup */ #ifdef DEBUGBUILD BIT(warned); /* true after user warned of DEBUGBUILD */ #endif diff --git a/lib/socks.c b/lib/socks.c index 6f60c43016..7d1ac6e95c 100644 --- a/lib/socks.c +++ b/lib/socks.c @@ -324,7 +324,9 @@ static CURLproxycode socks4_resolving(struct socks_state *sx, DEBUGASSERT(sx->hostname && *sx->hostname); result = Curl_resolv(data, sx->hostname, sx->remote_port, - cf->conn->ip_version, 0, &dns); + cf->conn->ip_version, + Curl_conn_cf_get_transport(cf, data), + 0, &dns); if(result == CURLE_AGAIN) { CURL_TRC_CF(data, cf, "SOCKS4 non-blocking resolve of %s", sx->hostname); return CURLPX_OK; @@ -853,7 +855,9 @@ static CURLproxycode socks5_resolving(struct socks_state *sx, DEBUGASSERT(sx->hostname && *sx->hostname); result = Curl_resolv(data, sx->hostname, sx->remote_port, - cf->conn->ip_version, 0, &dns); + cf->conn->ip_version, + Curl_conn_cf_get_transport(cf, data), + 0, &dns); if(result == CURLE_AGAIN) { CURL_TRC_CF(data, cf, "SOCKS5 non-blocking resolve of %s", sx->hostname); return CURLPX_OK; diff --git a/lib/thrdpool.c b/lib/thrdpool.c index be6363fd1a..2cd87094bb 100644 --- a/lib/thrdpool.c +++ b/lib/thrdpool.c @@ -121,7 +121,8 @@ static CURL_THREAD_RETURN_T CURL_STDCALL thrdslot_run(void *arg) tpool->fn_return(item, tpool->aborted ? NULL : tpool->fn_user_data); } - if(tpool->aborted) + if(tpool->aborted || + (Curl_llist_count(&tpool->slots) > tpool->max_threads)) goto out; tslot->idle = TRUE; @@ -235,6 +236,63 @@ static bool thrdpool_unlink(struct curl_thrdpool *tpool, bool locked) return TRUE; } +static CURLcode thrdpool_signal(struct curl_thrdpool *tpool, + uint32_t nthreads) +{ + struct Curl_llist_node *e, *n; + CURLcode result = CURLE_OK; + + DEBUGASSERT(!tpool->aborted); + thrdpool_join_zombies(tpool); + + for(e = Curl_llist_head(&tpool->slots); e && nthreads; e = n) { + struct thrdslot *tslot = Curl_node_elem(e); + n = Curl_node_next(e); + if(tslot->idle) { + Curl_cond_signal(&tslot->await); + --nthreads; + } + else if(!tslot->starttime.tv_sec && !tslot->starttime.tv_usec) { + /* starting thread, queries for work soon. */ + --nthreads; + } + } + + while(nthreads && !result && + Curl_llist_count(&tpool->slots) < tpool->max_threads) { + result = thrdslot_start(tpool); + if(result) + break; + --nthreads; + } + + return result; +} + +CURLcode Curl_thrdpool_set_props(struct curl_thrdpool *tpool, + uint32_t min_threads, + uint32_t max_threads, + uint32_t idle_time_ms) +{ + CURLcode result = CURLE_OK; + size_t running; + + if(!max_threads || (min_threads > max_threads)) + return CURLE_BAD_FUNCTION_ARGUMENT; + + Curl_mutex_acquire(&tpool->lock); + tpool->min_threads = min_threads; + tpool->max_threads = max_threads; + tpool->idle_time_ms = idle_time_ms; + running = Curl_llist_count(&tpool->slots); + if(tpool->min_threads > running) { + result = thrdpool_signal(tpool, tpool->min_threads - (uint32_t)running); + } + Curl_mutex_release(&tpool->lock); + + return result; +} + CURLcode Curl_thrdpool_create(struct curl_thrdpool **ptpool, const char *name, uint32_t min_threads, @@ -257,9 +315,6 @@ CURLcode Curl_thrdpool_create(struct curl_thrdpool **ptpool, Curl_cond_init(&tpool->await); Curl_llist_init(&tpool->slots, NULL); Curl_llist_init(&tpool->zombies, NULL); - tpool->min_threads = min_threads; - tpool->max_threads = max_threads; - tpool->idle_time_ms = idle_time_ms; tpool->fn_take = fn_take; tpool->fn_process = fn_process; tpool->fn_return = fn_return; @@ -269,10 +324,8 @@ CURLcode Curl_thrdpool_create(struct curl_thrdpool **ptpool, if(!tpool->name) goto out; - if(tpool->min_threads) - result = Curl_thrdpool_signal(tpool, tpool->min_threads); - else - result = CURLE_OK; + result = Curl_thrdpool_set_props(tpool, min_threads, max_threads, + idle_time_ms); out: if(result && tpool) { @@ -316,35 +369,10 @@ void Curl_thrdpool_destroy(struct curl_thrdpool *tpool, bool join) CURLcode Curl_thrdpool_signal(struct curl_thrdpool *tpool, uint32_t nthreads) { - struct Curl_llist_node *e, *n; - CURLcode result = CURLE_OK; + CURLcode result; Curl_mutex_acquire(&tpool->lock); - DEBUGASSERT(!tpool->aborted); - - thrdpool_join_zombies(tpool); - - for(e = Curl_llist_head(&tpool->slots); e && nthreads; e = n) { - struct thrdslot *tslot = Curl_node_elem(e); - n = Curl_node_next(e); - if(tslot->idle) { - Curl_cond_signal(&tslot->await); - --nthreads; - } - else if(!tslot->starttime.tv_sec && !tslot->starttime.tv_usec) { - /* starting thread, queries for work soon. */ - --nthreads; - } - } - - while(nthreads && !result && - Curl_llist_count(&tpool->slots) < tpool->max_threads) { - result = thrdslot_start(tpool); - if(result) - break; - --nthreads; - } - + result = thrdpool_signal(tpool, nthreads); Curl_mutex_release(&tpool->lock); return result; } diff --git a/lib/thrdpool.h b/lib/thrdpool.h index 8fa40538a3..1aba5c2605 100644 --- a/lib/thrdpool.h +++ b/lib/thrdpool.h @@ -91,6 +91,12 @@ CURLcode Curl_thrdpool_signal(struct curl_thrdpool *tpool, uint32_t nthreads); CURLcode Curl_thrdpool_await_idle(struct curl_thrdpool *tpool, uint32_t timeout_ms); +/* Change the properties of a threadpool. */ +CURLcode Curl_thrdpool_set_props(struct curl_thrdpool *tpool, + uint32_t min_threads, + uint32_t max_threads, + uint32_t idle_time_ms); + #ifdef CURLVERBOSE void Curl_thrdpool_trace(struct curl_thrdpool *tpool, struct Curl_easy *data, diff --git a/lib/thrdqueue.c b/lib/thrdqueue.c index e36279ef12..c2bd628a4b 100644 --- a/lib/thrdqueue.c +++ b/lib/thrdqueue.c @@ -282,16 +282,20 @@ CURLcode Curl_thrdq_send(struct curl_thrdq *tqueue, void *item, result = CURLE_OUT_OF_MEMORY; goto out; } + item = NULL; Curl_llist_append(&tqueue->sendq, qitem, &qitem->node); - result = CURLE_OK; signals = Curl_llist_count(&tqueue->sendq); + result = CURLE_OK; } out: Curl_mutex_release(&tqueue->lock); - /* Signal thread pool unlocked to avoid deadlocks */ + /* Signal thread pool unlocked to avoid deadlocks. Since we added + * item to the queue already, it might have been taken for processing + * already. Any error in signalling the pool cannot be reported to + * the caller since it needs to give up ownership of item. */ if(!result && signals) - result = Curl_thrdpool_signal(tqueue->tpool, (uint32_t)signals); + (void)Curl_thrdpool_signal(tqueue->tpool, (uint32_t)signals); return result; } @@ -357,6 +361,27 @@ CURLcode Curl_thrdq_await_done(struct curl_thrdq *tqueue, return Curl_thrdpool_await_idle(tqueue->tpool, timeout_ms); } +CURLcode Curl_thrdq_set_props(struct curl_thrdq *tqueue, + uint32_t max_len, + uint32_t min_threads, + uint32_t max_threads, + uint32_t idle_time_ms) +{ + CURLcode result; + size_t signals; + + Curl_mutex_acquire(&tqueue->lock); + tqueue->send_max_len = max_len; + signals = Curl_llist_count(&tqueue->sendq); + Curl_mutex_release(&tqueue->lock); + + result = Curl_thrdpool_set_props(tqueue->tpool, min_threads, + max_threads, idle_time_ms); + if(!result && signals) + result = Curl_thrdpool_signal(tqueue->tpool, (uint32_t)signals); + return result; +} + #ifdef CURLVERBOSE void Curl_thrdq_trace(struct curl_thrdq *tqueue, struct Curl_easy *data, diff --git a/lib/thrdqueue.h b/lib/thrdqueue.h index 516fcc043f..8f5a69dd6c 100644 --- a/lib/thrdqueue.h +++ b/lib/thrdqueue.h @@ -103,6 +103,12 @@ void Curl_thrdq_clear(struct curl_thrdq *tqueue, CURLcode Curl_thrdq_await_done(struct curl_thrdq *tqueue, uint32_t timeout_ms); +CURLcode Curl_thrdq_set_props(struct curl_thrdq *tqueue, + uint32_t max_len, /* 0 for unlimited */ + uint32_t min_threads, + uint32_t max_threads, + uint32_t idle_time_ms); + #ifdef CURLVERBOSE void Curl_thrdq_trace(struct curl_thrdq *tqueue, struct Curl_easy *data, diff --git a/lib/url.c b/lib/url.c index cffd10215e..953064c6f1 100644 --- a/lib/url.c +++ b/lib/url.c @@ -3039,7 +3039,8 @@ static CURLcode resolve_server(struct Curl_easy *data, } result = Curl_resolv(data, ehost->name, eport, - conn->ip_version, timeout_ms, pdns); + conn->ip_version, conn->transport_wanted, + timeout_ms, pdns); DEBUGASSERT(!result || !*pdns); if(!result) { /* resolved right away, either sync or from dnscache */ DEBUGASSERT(*pdns); diff --git a/src/tool_operate.c b/src/tool_operate.c index c006ddfa83..afb89b47fc 100644 --- a/src/tool_operate.c +++ b/src/tool_operate.c @@ -1867,6 +1867,9 @@ static CURLcode parallel_transfers(CURLSH *share) if(!s->multi) return CURLE_OUT_OF_MEMORY; +#ifndef DEBUGBUILD + (void)curl_multi_setopt(s->multi, CURLMOPT_QUICK_EXIT, 1L); +#endif (void)curl_multi_setopt(s->multi, CURLMOPT_NOTIFYFUNCTION, mnotify); (void)curl_multi_setopt(s->multi, CURLMOPT_NOTIFYDATA, s); (void)curl_multi_notify_enable(s->multi, CURLMNOTIFY_INFO_READ); diff --git a/tests/data/test2500 b/tests/data/test2500 index 3c5b15cea5..d870d33887 100644 --- a/tests/data/test2500 +++ b/tests/data/test2500 @@ -72,7 +72,7 @@ via: 1.1 nghttpx s/^server: nghttpx.*\r?\n// -Allocations: 155 +Allocations: 160 Maximum allocated: 1800000 diff --git a/tests/http/Makefile.am b/tests/http/Makefile.am index f5257a377b..52cdd510cb 100644 --- a/tests/http/Makefile.am +++ b/tests/http/Makefile.am @@ -63,6 +63,7 @@ EXTRA_DIST = \ test_18_methods.py \ test_19_shutdown.py \ test_20_websockets.py \ + test_21_resolve.py \ test_30_vsftpd.py \ test_31_vsftpds.py \ test_32_ftps_vsftpd.py \ diff --git a/tests/http/test_21_resolve.py b/tests/http/test_21_resolve.py new file mode 100644 index 0000000000..b211c772f5 --- /dev/null +++ b/tests/http/test_21_resolve.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +#*************************************************************************** +# _ _ ____ _ +# Project ___| | | | _ \| | +# / __| | | | |_) | | +# | (__| |_| | _ <| |___ +# \___|\___/|_| \_\_____| +# +# Copyright (C) Daniel Stenberg, , et al. +# +# This software is licensed as described in the file COPYING, which +# you should have received as part of this distribution. The terms +# are also available at https://curl.se/docs/copyright.html. +# +# You may opt to use, copy, modify, merge, publish, distribute and/or sell +# copies of the Software, and permit persons to whom the Software is +# furnished to do so, under the terms of the COPYING file. +# +# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY +# KIND, either express or implied. +# +# SPDX-License-Identifier: curl +# +########################################################################### +# +import logging +import os +from datetime import timedelta +import pytest + +from testenv import Env, CurlClient, LocalClient + + +log = logging.getLogger(__name__) + + +@pytest.mark.skipif(condition=not Env.curl_is_debug(), reason="needs curl debug") +@pytest.mark.skipif(condition=Env.curl_uses_lib('c-ares'), reason="c-ares resolver skipped") +@pytest.mark.skipif(condition=not Env.curl_has_feature('AsynchDNS'), reason="needs AsynchDNS") +class TestResolve: + + @pytest.fixture(autouse=True, scope='class') + def _class_scope(self, env, httpd): + indir = httpd.docs_dir + env.make_data_file(indir=indir, fname="data-0k", fsize=0) + + # use .invalid host name that should never resolv + def test_21_01_resolv_invalid_one(self, env: Env, httpd, nghttpx): + count = 1 + run_env = os.environ.copy() + run_env['CURL_DBG_RESOLV_FAIL_DELAY'] = '5' + curl = CurlClient(env=env, run_env=run_env, force_resolv=False) + url = f'https://test-{count}.http.curl.invalid/' + r = curl.http_download(urls=[url], with_stats=True) + r.check_exit_code(6) + r.check_stats(count=count, http_status=0, exitcode=6) + + # use .invalid host name, one after the other + @pytest.mark.parametrize("delay_ms", [1, 50]) + def test_21_02_resolv_invalid_serial(self, env: Env, delay_ms, httpd, nghttpx): + count = 10 + run_env = os.environ.copy() + run_env['CURL_DBG_RESOLV_FAIL_DELAY'] = f'{delay_ms}' + curl = CurlClient(env=env, run_env=run_env, force_resolv=False) + urls = [ f'https://test-{i}.http.curl.invalid/' for i in range(count)] + r = curl.http_download(urls=urls, with_stats=True) + r.check_exit_code(6) + r.check_stats(count=count, http_status=0, exitcode=6) + + # use .invalid host name, parallel + @pytest.mark.parametrize("delay_ms", [1, 50]) + def test_21_03_resolv_invalid_parallel(self, env: Env, delay_ms, httpd, nghttpx): + count = 20 + run_env = os.environ.copy() + run_env['CURL_DBG_RESOLV_FAIL_DELAY'] = f'{delay_ms}' + curl = CurlClient(env=env, run_env=run_env, force_resolv=False) + urls = [ f'https://test-{i}.http.curl.invalid/' for i in range(count)] + r = curl.http_download(urls=urls, with_stats=True, extra_args=[ + '--parallel' + ]) + r.check_exit_code(6) + r.check_stats(count=count, http_status=0, exitcode=6) + + # resolve first url with ipv6 only and fail that, resolve second + # with ipv*, should succeed. + def test_21_04_resolv_inv_v6(self, env: Env, httpd): + count = 2 + run_env = os.environ.copy() + run_env['CURL_DBG_RESOLV_FAIL_IPV6'] = '1' + url = f'https://localhost:{env.https_port}/' + client = LocalClient(name='cli_hx_download', env=env, run_env=run_env) + if not client.exists(): + pytest.skip(f'example client not built: {client.name}') + dfiles = [client.download_file(i) for i in range(count)] + self._clean_files(dfiles) + # let the first URL resolve via ipv6 only, which we force to fail + r = client.run(args=[ + '-n', f'{count}', '-6', '-C', env.ca.cert_file, url + ]) + r.check_exit_code(6) + assert not os.path.exists(dfiles[0]) + assert os.path.exists(dfiles[1]) + + # use .invalid host name, parallel, single resolve thread + def test_21_05_resolv_single_thread(self, env: Env, httpd, nghttpx): + count = 10 + delay_ms = 50 + run_env = os.environ.copy() + run_env['CURL_DBG_RESOLV_FAIL_DELAY'] = f'{delay_ms}' + run_env['CURL_DBG_RESOLV_MAX_THREADS'] = '1' + curl = CurlClient(env=env, run_env=run_env, force_resolv=False) + urls = [ f'https://test-{i}.http.curl.invalid/' for i in range(count)] + r = curl.http_download(urls=urls, with_stats=True, extra_args=[ + '--parallel', '-6' + ]) + r.check_exit_code(6) + r.check_stats(count=count, http_status=0, exitcode=6) + assert r.duration > timedelta(milliseconds=count * delay_ms), f'{r}' + + def _clean_files(self, files): + for file in files: + if os.path.exists(file): + os.remove(file) diff --git a/tests/http/testenv/curl.py b/tests/http/testenv/curl.py index 1f812a1c2e..4528048488 100644 --- a/tests/http/testenv/curl.py +++ b/tests/http/testenv/curl.py @@ -630,6 +630,7 @@ class CurlClient: with_dtrace: bool = False, with_perf: bool = False, with_flame: bool = False, + force_resolv: bool = True, socks_args: Optional[List[str]] = None): self.env = env self._timeout = timeout if timeout else env.test_timeout @@ -659,6 +660,7 @@ class CurlClient: self._silent = silent self._run_env = run_env self._server_addr = server_addr if server_addr else '127.0.0.1' + self._force_resolv = force_resolv self._rmrf(self._run_dir) self._mkpath(self._run_dir) @@ -1086,7 +1088,6 @@ class CurlClient: def _raw(self, urls, intext='', timeout=None, options=None, insecure=False, alpn_proto: Optional[str] = None, url_options=None, - force_resolve=True, with_stats=False, with_headers=True, def_tracing=True, @@ -1094,9 +1095,8 @@ class CurlClient: with_tcpdump=False): args = self._complete_args( urls=urls, timeout=timeout, options=options, insecure=insecure, - alpn_proto=alpn_proto, force_resolve=force_resolve, - with_headers=with_headers, def_tracing=def_tracing, - url_options=url_options) + alpn_proto=alpn_proto, with_headers=with_headers, + def_tracing=def_tracing, url_options=url_options) r = self._run(args, intext=intext, with_stats=with_stats, with_profile=with_profile, with_tcpdump=with_tcpdump) if r.exit_code == 0 and with_headers: @@ -1104,8 +1104,7 @@ class CurlClient: return r def _complete_args(self, urls, timeout=None, options=None, - insecure=False, force_resolve=True, - alpn_proto: Optional[str] = None, + insecure=False, alpn_proto: Optional[str] = None, url_options=None, with_headers: bool = True, def_tracing: bool = True): @@ -1115,6 +1114,8 @@ class CurlClient: if options is not None and '--resolve' in options: force_resolve = False + else: + force_resolve = self._force_resolv args = [self._curl, "-s", "--path-as-is"] if 'CURL_TEST_EVENT' in os.environ: