/*************************************************************************** * _ _ ____ _ * Project ___| | | | _ \| | * / __| | | | |_) | | * | (__| |_| | _ <| |___ * \___|\___/|_| \_\_____| * * Copyright (C) Daniel Stenberg, , et al. * * This software is licensed as described in the file COPYING, which * you should have received as part of this distribution. The terms * are also available at https://curl.se/docs/copyright.html. * * You may opt to use, copy, modify, merge, publish, distribute and/or sell * copies of the Software, and permit persons to whom the Software is * furnished to do so, under the terms of the COPYING file. * * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY * KIND, either express or implied. * * SPDX-License-Identifier: curl * ***************************************************************************/ #include "curl_setup.h" #ifdef USE_THREADS #include "llist.h" #include "curl_threads.h" #include "thrdpool.h" #include "thrdqueue.h" #include "curlx/timeval.h" #ifdef CURLVERBOSE #include "curl_trc.h" #include "urldata.h" #endif struct curl_thrdq { char *name; curl_mutex_t lock; curl_cond_t await; struct Curl_llist sendq; struct Curl_llist recvq; struct curl_thrdpool *tpool; Curl_thrdq_item_free_cb *fn_free; Curl_thrdq_item_process_cb *fn_process; Curl_thrdq_ev_cb *fn_event; void *fn_user_data; uint32_t send_max_len; BIT(aborted); }; struct thrdq_item { struct Curl_llist_node node; Curl_thrdq_item_free_cb *fn_free; Curl_thrdq_item_process_cb *fn_process; void *item; struct curltime start; timediff_t timeout_ms; const char *description; }; static struct thrdq_item *thrdq_item_create(struct curl_thrdq *tqueue, void *item, const char *description, timediff_t timeout_ms) { struct thrdq_item *qitem; qitem = curlx_calloc(1, sizeof(*qitem)); if(!qitem) return NULL; qitem->item = item; qitem->description = description; qitem->fn_free = tqueue->fn_free; qitem->fn_process = tqueue->fn_process; if(timeout_ms) { qitem->start = curlx_now(); qitem->timeout_ms = timeout_ms; } return qitem; } static void thrdq_item_destroy(struct thrdq_item *qitem) { if(qitem->item) qitem->fn_free(qitem->item); curlx_free(qitem); } static void thrdq_item_list_dtor(void *user_data, void *elem) { (void)user_data; thrdq_item_destroy(elem); } static void *thrdq_tpool_take(void *user_data, const char **pdescription, timediff_t *ptimeout_ms) { struct curl_thrdq *tqueue = user_data; struct thrdq_item *qitem = NULL; struct Curl_llist_node *e; Curl_thrdq_ev_cb *fn_event = NULL; void *fn_user_data = NULL; Curl_mutex_acquire(&tqueue->lock); *pdescription = NULL; *ptimeout_ms = 0; if(!tqueue->aborted) { e = Curl_llist_head(&tqueue->sendq); if(e) { struct curltime now = curlx_now(); timediff_t timeout_ms; while(e) { qitem = Curl_node_take_elem(e); timeout_ms = (!qitem->timeout_ms) ? 0 : (qitem->timeout_ms - curlx_ptimediff_ms(&now, &qitem->start)); if(timeout_ms < 0) { /* timed out while queued, place on receive queue */ Curl_llist_append(&tqueue->recvq, qitem, &qitem->node); fn_event = tqueue->fn_event; fn_user_data = tqueue->fn_user_data; qitem = NULL; e = Curl_llist_head(&tqueue->sendq); continue; } else { *pdescription = qitem->description; *ptimeout_ms = timeout_ms; break; } } } } Curl_mutex_release(&tqueue->lock); /* avoiding deadlocks */ if(fn_event) fn_event(tqueue, CURL_THRDQ_EV_ITEM_DONE, fn_user_data); return qitem; } static void thrdq_tpool_return(void *item, void *user_data) { struct curl_thrdq *tqueue = user_data; struct thrdq_item *qitem = item; Curl_thrdq_ev_cb *fn_event = NULL; void *fn_user_data = NULL; if(!tqueue) { thrdq_item_destroy(item); return; } Curl_mutex_acquire(&tqueue->lock); if(tqueue->aborted) { thrdq_item_destroy(qitem); } else { DEBUGASSERT(!Curl_node_llist(&qitem->node)); Curl_llist_append(&tqueue->recvq, qitem, &qitem->node); fn_event = tqueue->fn_event; fn_user_data = tqueue->fn_user_data; } Curl_mutex_release(&tqueue->lock); /* avoiding deadlocks */ if(fn_event) fn_event(tqueue, CURL_THRDQ_EV_ITEM_DONE, fn_user_data); } static void thrdq_tpool_process(void *item) { struct thrdq_item *qitem = item; qitem->fn_process(qitem->item); } static void thrdq_unlink(struct curl_thrdq *tqueue, bool locked, bool join) { DEBUGASSERT(tqueue->aborted); if(tqueue->tpool) { if(locked) Curl_mutex_release(&tqueue->lock); Curl_thrdpool_destroy(tqueue->tpool, join); tqueue->tpool = NULL; if(locked) Curl_mutex_acquire(&tqueue->lock); } Curl_llist_destroy(&tqueue->sendq, NULL); Curl_llist_destroy(&tqueue->recvq, NULL); curlx_free(tqueue->name); Curl_cond_destroy(&tqueue->await); if(locked) Curl_mutex_release(&tqueue->lock); Curl_mutex_destroy(&tqueue->lock); curlx_free(tqueue); } CURLcode Curl_thrdq_create(struct curl_thrdq **ptqueue, const char *name, uint32_t max_len, uint32_t min_threads, uint32_t max_threads, uint32_t idle_time_ms, Curl_thrdq_item_free_cb *fn_free, Curl_thrdq_item_process_cb *fn_process, Curl_thrdq_ev_cb *fn_event, void *user_data) { struct curl_thrdq *tqueue; CURLcode result = CURLE_OUT_OF_MEMORY; tqueue = curlx_calloc(1, sizeof(*tqueue)); if(!tqueue) goto out; Curl_mutex_init(&tqueue->lock); Curl_cond_init(&tqueue->await); Curl_llist_init(&tqueue->sendq, thrdq_item_list_dtor); Curl_llist_init(&tqueue->recvq, thrdq_item_list_dtor); tqueue->fn_free = fn_free; tqueue->fn_process = fn_process; tqueue->fn_event = fn_event; tqueue->fn_user_data = user_data; tqueue->send_max_len = max_len; tqueue->name = curlx_strdup(name); if(!tqueue->name) goto out; result = Curl_thrdpool_create(&tqueue->tpool, name, min_threads, max_threads, idle_time_ms, thrdq_tpool_take, thrdq_tpool_process, thrdq_tpool_return, tqueue); out: if(result && tqueue) { tqueue->aborted = TRUE; thrdq_unlink(tqueue, FALSE, TRUE); tqueue = NULL; } *ptqueue = tqueue; return result; } void Curl_thrdq_destroy(struct curl_thrdq *tqueue, bool join) { Curl_mutex_acquire(&tqueue->lock); DEBUGASSERT(!tqueue->aborted); tqueue->aborted = TRUE; thrdq_unlink(tqueue, TRUE, join); } CURLcode Curl_thrdq_send(struct curl_thrdq *tqueue, void *item, const char *description, timediff_t timeout_ms) { CURLcode result = CURLE_AGAIN; size_t signals = 0; Curl_mutex_acquire(&tqueue->lock); if(tqueue->aborted) { DEBUGASSERT(0); result = CURLE_SEND_ERROR; goto out; } if(timeout_ms < 0) { result = CURLE_OPERATION_TIMEDOUT; goto out; } if(!tqueue->send_max_len || (Curl_llist_count(&tqueue->sendq) < tqueue->send_max_len)) { struct thrdq_item *qitem = thrdq_item_create(tqueue, item, description, timeout_ms); if(!qitem) { result = CURLE_OUT_OF_MEMORY; goto out; } item = NULL; Curl_llist_append(&tqueue->sendq, qitem, &qitem->node); signals = Curl_llist_count(&tqueue->sendq); result = CURLE_OK; } out: Curl_mutex_release(&tqueue->lock); /* Signal thread pool unlocked to avoid deadlocks. Since we added * item to the queue already, it might have been taken for processing * already. Any error in signalling the pool cannot be reported to * the caller since it needs to give up ownership of item. */ if(!result && signals) (void)Curl_thrdpool_signal(tqueue->tpool, (uint32_t)signals); return result; } CURLcode Curl_thrdq_recv(struct curl_thrdq *tqueue, void **pitem) { CURLcode result = CURLE_AGAIN; struct Curl_llist_node *e; *pitem = NULL; Curl_mutex_acquire(&tqueue->lock); if(tqueue->aborted) { DEBUGASSERT(0); result = CURLE_RECV_ERROR; goto out; } e = Curl_llist_head(&tqueue->recvq); if(e) { struct thrdq_item *qitem = Curl_node_take_elem(e); *pitem = qitem->item; qitem->item = NULL; thrdq_item_destroy(qitem); result = CURLE_OK; } out: Curl_mutex_release(&tqueue->lock); return result; } static void thrdq_llist_clean_matches(struct Curl_llist *llist, Curl_thrdq_item_match_cb *fn_match, void *match_data) { struct Curl_llist_node *e, *n; struct thrdq_item *qitem; for(e = Curl_llist_head(llist); e; e = n) { n = Curl_node_next(e); qitem = Curl_node_elem(e); if(fn_match(qitem->item, match_data)) Curl_node_remove(e); } } void Curl_thrdq_clear(struct curl_thrdq *tqueue, Curl_thrdq_item_match_cb *fn_match, void *match_data) { Curl_mutex_acquire(&tqueue->lock); if(tqueue->aborted) { DEBUGASSERT(0); goto out; } thrdq_llist_clean_matches(&tqueue->sendq, fn_match, match_data); thrdq_llist_clean_matches(&tqueue->recvq, fn_match, match_data); out: Curl_mutex_release(&tqueue->lock); } CURLcode Curl_thrdq_await_done(struct curl_thrdq *tqueue, uint32_t timeout_ms) { return Curl_thrdpool_await_idle(tqueue->tpool, timeout_ms); } CURLcode Curl_thrdq_set_props(struct curl_thrdq *tqueue, uint32_t max_len, uint32_t min_threads, uint32_t max_threads, uint32_t idle_time_ms) { CURLcode result; size_t signals; Curl_mutex_acquire(&tqueue->lock); tqueue->send_max_len = max_len; signals = Curl_llist_count(&tqueue->sendq); Curl_mutex_release(&tqueue->lock); result = Curl_thrdpool_set_props(tqueue->tpool, min_threads, max_threads, idle_time_ms); if(!result && signals) result = Curl_thrdpool_signal(tqueue->tpool, (uint32_t)signals); return result; } #ifdef CURLVERBOSE void Curl_thrdq_trace(struct curl_thrdq *tqueue, struct Curl_easy *data) { struct curl_trc_feat *feat = &Curl_trc_feat_threads; if(Curl_trc_ft_is_verbose(data, feat)) { struct Curl_llist_node *e; struct thrdq_item *qitem; Curl_thrdpool_trace(tqueue->tpool, data); Curl_mutex_acquire(&tqueue->lock); if(!Curl_llist_count(&tqueue->sendq) && !Curl_llist_count(&tqueue->recvq)) { Curl_trc_feat_infof(data, feat, "[TQUEUE-%s] empty", tqueue->name); } for(e = Curl_llist_head(&tqueue->sendq); e; e = Curl_node_next(e)) { qitem = Curl_node_elem(e); Curl_trc_feat_infof(data, feat, "[TQUEUE-%s] in: %s", tqueue->name, qitem->description); } for(e = Curl_llist_head(&tqueue->recvq); e; e = Curl_node_next(e)) { qitem = Curl_node_elem(e); Curl_trc_feat_infof(data, feat, "[TQUEUE-%s] out: %s", tqueue->name, qitem->description); } Curl_mutex_release(&tqueue->lock); } } #endif #endif /* USE_THREADS */