mirror of
https://github.com/curl/curl.git
synced 2026-04-11 12:01:42 +08:00
- allow to specify when they are wanted on starting a resolve - match dns cache entries accordingly. An entry which never tried to get HTTPS-RRs is no answer for a resolve that wants it. - fix late arrivals of resolve answers to match the "async" records that started them - if it still exists. - provide for multiple "async" resolves in a transfer at the same time. We may need to resolve an IP interface while the main connection resolve has not finished yet. - allow lookup of HTTPS-RR information as soon as it is available, even if A/AAAA queries are still ongoing. For this, the "async" infrastructure is changed: - Defined bits for DNS queries `CURL_DNSQ_A`, `CURL_DNSQ_AAAA` and `CURL_DNSQ_HTTPS`. These replace `ip_version` which says nothing about HTTPS. Use them in dns cache entries for matching. - enhance the `async->id` to be a unique `uint32_t` for resolves inside one multi. This is weak, as the id may wrap around. However it is combined with the `mid` of the easy handle, making collisions highly unlikely. `data->state.async` is only accessed in few places where the mid/async-id match is performed. - vtls: for ECH supporting TLS backends (openssl, rustls, wolfssl), retrieve the HTTPS-RR information from the dns connection filter. Delay the connect if the HTTPS-RR is needed, but has not been resolved yet. The implementation of all this is complete for the threaded resolver. c-ares resolver and DoH do not take advantage of all new async features yet. To be done in separate PRs. Details: c-ares: cleanup settings and initialisation. Any ares channel is only being created on starting a resolve and propagating operations in setopt.c to the channel are not helpful. Changed threaded+ares pollset handling so that they do not overwrite each others `ASYNC_NAME` timeouts. Add trace name 'threads' for tracing thread queue and pool used by threaded resolver. Closes #21175
412 lines
12 KiB
C
412 lines
12 KiB
C
/***************************************************************************
|
|
* _ _ ____ _
|
|
* Project ___| | | | _ \| |
|
|
* / __| | | | |_) | |
|
|
* | (__| |_| | _ <| |___
|
|
* \___|\___/|_| \_\_____|
|
|
*
|
|
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
|
|
*
|
|
* This software is licensed as described in the file COPYING, which
|
|
* you should have received as part of this distribution. The terms
|
|
* are also available at https://curl.se/docs/copyright.html.
|
|
*
|
|
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
|
|
* copies of the Software, and permit persons to whom the Software is
|
|
* furnished to do so, under the terms of the COPYING file.
|
|
*
|
|
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
|
|
* KIND, either express or implied.
|
|
*
|
|
* SPDX-License-Identifier: curl
|
|
*
|
|
***************************************************************************/
|
|
#include "curl_setup.h"
|
|
|
|
#ifdef USE_THREADS
|
|
|
|
#include "llist.h"
|
|
#include "curl_threads.h"
|
|
#include "thrdpool.h"
|
|
#include "thrdqueue.h"
|
|
#include "curlx/timeval.h"
|
|
#ifdef CURLVERBOSE
|
|
#include "curl_trc.h"
|
|
#include "urldata.h"
|
|
#endif
|
|
|
|
|
|
struct curl_thrdq {
|
|
char *name;
|
|
curl_mutex_t lock;
|
|
curl_cond_t await;
|
|
struct Curl_llist sendq;
|
|
struct Curl_llist recvq;
|
|
struct curl_thrdpool *tpool;
|
|
Curl_thrdq_item_free_cb *fn_free;
|
|
Curl_thrdq_item_process_cb *fn_process;
|
|
Curl_thrdq_ev_cb *fn_event;
|
|
void *fn_user_data;
|
|
uint32_t send_max_len;
|
|
BIT(aborted);
|
|
};
|
|
|
|
struct thrdq_item {
|
|
struct Curl_llist_node node;
|
|
Curl_thrdq_item_free_cb *fn_free;
|
|
Curl_thrdq_item_process_cb *fn_process;
|
|
void *item;
|
|
struct curltime start;
|
|
timediff_t timeout_ms;
|
|
const char *description;
|
|
};
|
|
|
|
static struct thrdq_item *thrdq_item_create(struct curl_thrdq *tqueue,
|
|
void *item,
|
|
const char *description,
|
|
timediff_t timeout_ms)
|
|
{
|
|
struct thrdq_item *qitem;
|
|
|
|
qitem = curlx_calloc(1, sizeof(*qitem));
|
|
if(!qitem)
|
|
return NULL;
|
|
qitem->item = item;
|
|
qitem->description = description;
|
|
qitem->fn_free = tqueue->fn_free;
|
|
qitem->fn_process = tqueue->fn_process;
|
|
if(timeout_ms) {
|
|
qitem->start = curlx_now();
|
|
qitem->timeout_ms = timeout_ms;
|
|
}
|
|
return qitem;
|
|
}
|
|
|
|
static void thrdq_item_destroy(struct thrdq_item *qitem)
|
|
{
|
|
if(qitem->item)
|
|
qitem->fn_free(qitem->item);
|
|
curlx_free(qitem);
|
|
}
|
|
|
|
static void thrdq_item_list_dtor(void *user_data, void *elem)
|
|
{
|
|
(void)user_data;
|
|
thrdq_item_destroy(elem);
|
|
}
|
|
|
|
static void *thrdq_tpool_take(void *user_data, const char **pdescription,
|
|
timediff_t *ptimeout_ms)
|
|
{
|
|
struct curl_thrdq *tqueue = user_data;
|
|
struct thrdq_item *qitem = NULL;
|
|
struct Curl_llist_node *e;
|
|
Curl_thrdq_ev_cb *fn_event = NULL;
|
|
void *fn_user_data = NULL;
|
|
|
|
Curl_mutex_acquire(&tqueue->lock);
|
|
*pdescription = NULL;
|
|
*ptimeout_ms = 0;
|
|
if(!tqueue->aborted) {
|
|
e = Curl_llist_head(&tqueue->sendq);
|
|
if(e) {
|
|
struct curltime now = curlx_now();
|
|
timediff_t timeout_ms;
|
|
while(e) {
|
|
qitem = Curl_node_take_elem(e);
|
|
timeout_ms = (!qitem->timeout_ms) ? 0 :
|
|
(qitem->timeout_ms - curlx_ptimediff_ms(&now, &qitem->start));
|
|
if(timeout_ms < 0) {
|
|
/* timed out while queued, place on receive queue */
|
|
Curl_llist_append(&tqueue->recvq, qitem, &qitem->node);
|
|
fn_event = tqueue->fn_event;
|
|
fn_user_data = tqueue->fn_user_data;
|
|
qitem = NULL;
|
|
e = Curl_llist_head(&tqueue->sendq);
|
|
continue;
|
|
}
|
|
else {
|
|
*pdescription = qitem->description;
|
|
*ptimeout_ms = timeout_ms;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Curl_mutex_release(&tqueue->lock);
|
|
/* avoiding deadlocks */
|
|
if(fn_event)
|
|
fn_event(tqueue, CURL_THRDQ_EV_ITEM_DONE, fn_user_data);
|
|
return qitem;
|
|
}
|
|
|
|
static void thrdq_tpool_return(void *item, void *user_data)
|
|
{
|
|
struct curl_thrdq *tqueue = user_data;
|
|
struct thrdq_item *qitem = item;
|
|
Curl_thrdq_ev_cb *fn_event = NULL;
|
|
void *fn_user_data = NULL;
|
|
|
|
if(!tqueue) {
|
|
thrdq_item_destroy(item);
|
|
return;
|
|
}
|
|
|
|
Curl_mutex_acquire(&tqueue->lock);
|
|
if(tqueue->aborted) {
|
|
thrdq_item_destroy(qitem);
|
|
}
|
|
else {
|
|
DEBUGASSERT(!Curl_node_llist(&qitem->node));
|
|
Curl_llist_append(&tqueue->recvq, qitem, &qitem->node);
|
|
fn_event = tqueue->fn_event;
|
|
fn_user_data = tqueue->fn_user_data;
|
|
}
|
|
Curl_mutex_release(&tqueue->lock);
|
|
/* avoiding deadlocks */
|
|
if(fn_event)
|
|
fn_event(tqueue, CURL_THRDQ_EV_ITEM_DONE, fn_user_data);
|
|
}
|
|
|
|
static void thrdq_tpool_process(void *item)
|
|
{
|
|
struct thrdq_item *qitem = item;
|
|
qitem->fn_process(qitem->item);
|
|
}
|
|
|
|
static void thrdq_unlink(struct curl_thrdq *tqueue, bool locked, bool join)
|
|
{
|
|
DEBUGASSERT(tqueue->aborted);
|
|
if(tqueue->tpool) {
|
|
if(locked)
|
|
Curl_mutex_release(&tqueue->lock);
|
|
Curl_thrdpool_destroy(tqueue->tpool, join);
|
|
tqueue->tpool = NULL;
|
|
if(locked)
|
|
Curl_mutex_acquire(&tqueue->lock);
|
|
}
|
|
|
|
Curl_llist_destroy(&tqueue->sendq, NULL);
|
|
Curl_llist_destroy(&tqueue->recvq, NULL);
|
|
curlx_free(tqueue->name);
|
|
Curl_cond_destroy(&tqueue->await);
|
|
if(locked)
|
|
Curl_mutex_release(&tqueue->lock);
|
|
Curl_mutex_destroy(&tqueue->lock);
|
|
curlx_free(tqueue);
|
|
}
|
|
|
|
CURLcode Curl_thrdq_create(struct curl_thrdq **ptqueue,
|
|
const char *name,
|
|
uint32_t max_len,
|
|
uint32_t min_threads,
|
|
uint32_t max_threads,
|
|
uint32_t idle_time_ms,
|
|
Curl_thrdq_item_free_cb *fn_free,
|
|
Curl_thrdq_item_process_cb *fn_process,
|
|
Curl_thrdq_ev_cb *fn_event,
|
|
void *user_data)
|
|
{
|
|
struct curl_thrdq *tqueue;
|
|
CURLcode result = CURLE_OUT_OF_MEMORY;
|
|
|
|
tqueue = curlx_calloc(1, sizeof(*tqueue));
|
|
if(!tqueue)
|
|
goto out;
|
|
|
|
Curl_mutex_init(&tqueue->lock);
|
|
Curl_cond_init(&tqueue->await);
|
|
Curl_llist_init(&tqueue->sendq, thrdq_item_list_dtor);
|
|
Curl_llist_init(&tqueue->recvq, thrdq_item_list_dtor);
|
|
tqueue->fn_free = fn_free;
|
|
tqueue->fn_process = fn_process;
|
|
tqueue->fn_event = fn_event;
|
|
tqueue->fn_user_data = user_data;
|
|
tqueue->send_max_len = max_len;
|
|
|
|
tqueue->name = curlx_strdup(name);
|
|
if(!tqueue->name)
|
|
goto out;
|
|
|
|
result = Curl_thrdpool_create(&tqueue->tpool, name,
|
|
min_threads, max_threads, idle_time_ms,
|
|
thrdq_tpool_take,
|
|
thrdq_tpool_process,
|
|
thrdq_tpool_return,
|
|
tqueue);
|
|
|
|
out:
|
|
if(result && tqueue) {
|
|
tqueue->aborted = TRUE;
|
|
thrdq_unlink(tqueue, FALSE, TRUE);
|
|
tqueue = NULL;
|
|
}
|
|
*ptqueue = tqueue;
|
|
return result;
|
|
}
|
|
|
|
void Curl_thrdq_destroy(struct curl_thrdq *tqueue, bool join)
|
|
{
|
|
Curl_mutex_acquire(&tqueue->lock);
|
|
DEBUGASSERT(!tqueue->aborted);
|
|
tqueue->aborted = TRUE;
|
|
thrdq_unlink(tqueue, TRUE, join);
|
|
}
|
|
|
|
CURLcode Curl_thrdq_send(struct curl_thrdq *tqueue, void *item,
|
|
const char *description, timediff_t timeout_ms)
|
|
{
|
|
CURLcode result = CURLE_AGAIN;
|
|
size_t signals = 0;
|
|
|
|
Curl_mutex_acquire(&tqueue->lock);
|
|
if(tqueue->aborted) {
|
|
DEBUGASSERT(0);
|
|
result = CURLE_SEND_ERROR;
|
|
goto out;
|
|
}
|
|
if(timeout_ms < 0) {
|
|
result = CURLE_OPERATION_TIMEDOUT;
|
|
goto out;
|
|
}
|
|
|
|
if(!tqueue->send_max_len ||
|
|
(Curl_llist_count(&tqueue->sendq) < tqueue->send_max_len)) {
|
|
struct thrdq_item *qitem = thrdq_item_create(tqueue, item, description,
|
|
timeout_ms);
|
|
if(!qitem) {
|
|
result = CURLE_OUT_OF_MEMORY;
|
|
goto out;
|
|
}
|
|
item = NULL;
|
|
Curl_llist_append(&tqueue->sendq, qitem, &qitem->node);
|
|
signals = Curl_llist_count(&tqueue->sendq);
|
|
result = CURLE_OK;
|
|
}
|
|
|
|
out:
|
|
Curl_mutex_release(&tqueue->lock);
|
|
/* Signal thread pool unlocked to avoid deadlocks. Since we added
|
|
* item to the queue already, it might have been taken for processing
|
|
* already. Any error in signalling the pool cannot be reported to
|
|
* the caller since it needs to give up ownership of item. */
|
|
if(!result && signals)
|
|
(void)Curl_thrdpool_signal(tqueue->tpool, (uint32_t)signals);
|
|
return result;
|
|
}
|
|
|
|
CURLcode Curl_thrdq_recv(struct curl_thrdq *tqueue, void **pitem)
|
|
{
|
|
CURLcode result = CURLE_AGAIN;
|
|
struct Curl_llist_node *e;
|
|
|
|
*pitem = NULL;
|
|
Curl_mutex_acquire(&tqueue->lock);
|
|
if(tqueue->aborted) {
|
|
DEBUGASSERT(0);
|
|
result = CURLE_RECV_ERROR;
|
|
goto out;
|
|
}
|
|
|
|
e = Curl_llist_head(&tqueue->recvq);
|
|
if(e) {
|
|
struct thrdq_item *qitem = Curl_node_take_elem(e);
|
|
*pitem = qitem->item;
|
|
qitem->item = NULL;
|
|
thrdq_item_destroy(qitem);
|
|
result = CURLE_OK;
|
|
}
|
|
out:
|
|
Curl_mutex_release(&tqueue->lock);
|
|
return result;
|
|
}
|
|
|
|
static void thrdq_llist_clean_matches(struct Curl_llist *llist,
|
|
Curl_thrdq_item_match_cb *fn_match,
|
|
void *match_data)
|
|
{
|
|
struct Curl_llist_node *e, *n;
|
|
struct thrdq_item *qitem;
|
|
|
|
for(e = Curl_llist_head(llist); e; e = n) {
|
|
n = Curl_node_next(e);
|
|
qitem = Curl_node_elem(e);
|
|
if(fn_match(qitem->item, match_data))
|
|
Curl_node_remove(e);
|
|
}
|
|
}
|
|
|
|
void Curl_thrdq_clear(struct curl_thrdq *tqueue,
|
|
Curl_thrdq_item_match_cb *fn_match,
|
|
void *match_data)
|
|
{
|
|
Curl_mutex_acquire(&tqueue->lock);
|
|
if(tqueue->aborted) {
|
|
DEBUGASSERT(0);
|
|
goto out;
|
|
}
|
|
thrdq_llist_clean_matches(&tqueue->sendq, fn_match, match_data);
|
|
thrdq_llist_clean_matches(&tqueue->recvq, fn_match, match_data);
|
|
out:
|
|
Curl_mutex_release(&tqueue->lock);
|
|
}
|
|
|
|
CURLcode Curl_thrdq_await_done(struct curl_thrdq *tqueue,
|
|
uint32_t timeout_ms)
|
|
{
|
|
return Curl_thrdpool_await_idle(tqueue->tpool, timeout_ms);
|
|
}
|
|
|
|
CURLcode Curl_thrdq_set_props(struct curl_thrdq *tqueue,
|
|
uint32_t max_len,
|
|
uint32_t min_threads,
|
|
uint32_t max_threads,
|
|
uint32_t idle_time_ms)
|
|
{
|
|
CURLcode result;
|
|
size_t signals;
|
|
|
|
Curl_mutex_acquire(&tqueue->lock);
|
|
tqueue->send_max_len = max_len;
|
|
signals = Curl_llist_count(&tqueue->sendq);
|
|
Curl_mutex_release(&tqueue->lock);
|
|
|
|
result = Curl_thrdpool_set_props(tqueue->tpool, min_threads,
|
|
max_threads, idle_time_ms);
|
|
if(!result && signals)
|
|
result = Curl_thrdpool_signal(tqueue->tpool, (uint32_t)signals);
|
|
return result;
|
|
}
|
|
|
|
#ifdef CURLVERBOSE
|
|
void Curl_thrdq_trace(struct curl_thrdq *tqueue,
|
|
struct Curl_easy *data)
|
|
{
|
|
struct curl_trc_feat *feat = &Curl_trc_feat_threads;
|
|
if(Curl_trc_ft_is_verbose(data, feat)) {
|
|
struct Curl_llist_node *e;
|
|
struct thrdq_item *qitem;
|
|
|
|
Curl_thrdpool_trace(tqueue->tpool, data);
|
|
Curl_mutex_acquire(&tqueue->lock);
|
|
if(!Curl_llist_count(&tqueue->sendq) &&
|
|
!Curl_llist_count(&tqueue->recvq)) {
|
|
Curl_trc_feat_infof(data, feat, "[TQUEUE-%s] empty", tqueue->name);
|
|
}
|
|
for(e = Curl_llist_head(&tqueue->sendq); e; e = Curl_node_next(e)) {
|
|
qitem = Curl_node_elem(e);
|
|
Curl_trc_feat_infof(data, feat, "[TQUEUE-%s] in: %s",
|
|
tqueue->name, qitem->description);
|
|
}
|
|
for(e = Curl_llist_head(&tqueue->recvq); e; e = Curl_node_next(e)) {
|
|
qitem = Curl_node_elem(e);
|
|
Curl_trc_feat_infof(data, feat, "[TQUEUE-%s] out: %s",
|
|
tqueue->name, qitem->description);
|
|
}
|
|
Curl_mutex_release(&tqueue->lock);
|
|
}
|
|
}
|
|
#endif
|
|
|
|
#endif /* USE_THREADS */
|