lib: add thread pool and queue

- new source files for thread pool and queue
- test cases 3217 and 3218 for them
- internal documentation

Closes #20916
This commit is contained in:
Stefan Eissing 2026-03-13 10:22:07 +01:00 committed by Daniel Stenberg
parent 664db28d29
commit 6f9f4b3cb7
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
18 changed files with 1641 additions and 1 deletions

View File

@ -68,6 +68,7 @@ INTERNALDOCS = \
internals/SCORECARD.md \ internals/SCORECARD.md \
internals/SPLAY.md \ internals/SPLAY.md \
internals/STRPARSE.md \ internals/STRPARSE.md \
internals/THRDPOOL+QUEUE.md \
internals/TIME-KEEPING.md \ internals/TIME-KEEPING.md \
internals/TLS-SESSIONS.md \ internals/TLS-SESSIONS.md \
internals/UINT_SETS.md \ internals/UINT_SETS.md \

View File

@ -0,0 +1,112 @@
<!--
Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
SPDX-License-Identifier: curl
-->
# Thread Pool and Queue
The thread pool and queue manage asynchronous processing of "work items"
to a user. The "work items" are opaque to the pool and queue, represented
by a `void *`, handled via callback functions.
Thread pool and queue are available with `pthreads` or native Win32
builds.
## `Curl_thrdpool`
This data structure manages a pool of threads for asynchronous operations.
### Properties
A pool's properties are:
- minimum number of threads running, default 0
- maximum number of threads running
- timeout for idle threads before they shut down
The minimum number of threads is started at creation of the pool and
kept always running. On demand, when more work is available but all
existing threads are busy, it starts new threads, up to maximum.
When work ceases, the threads "above" the minimum number exit again
after the given idle time.
### Operation
The pool is created with providing three callback functions:
- `take`: the pool calls this to take a new "work item" for processing. From
the pool's point of view, a work item is a `void *`. "take" is called from
the pool's threads. When getting anything besides `NULL`, the thread is
"busy". On getting `NULL`, the thread becomes idle.
- `process`: called by a pool thread to process a work item. This can not
return any error. Any error handling must be done via properties in
the work item itself, opaque to the pool.
- `return`: after processing, the work item is returned and the pool has
no longer have any memory of it.
The pool only tries to "take" new work items when told to. Calling
`Curl_thrdpool_signal(pool, n)` awakens up to `n`threads which then
take new work items. This may cause new threads being started. The other
time a pool thread "take"s work it when it has finished
processing and returned another item.
A thread pool can be destroyed via `Curl_thrdpool_destroy(pool, join)` where
`join` determines if active threads shall be joined or detached.
### Safety
The thread pool operates use a mutex and condition variables to manage
concurrency. All interactions and callback invocation are done under
the pool's mutex lock, *except* the "process" callback which is invoked
unlocked.
To avoid deadlocks, no callback must invoked other pool functions. Also,
any call of pool functions may result in callback invocations.
The "work items", once "taken" by the pool, should not be referenced
from any other place. Thread pools **always** invoke the "return"
callback on a work item, even after the pool has been destroyed by
detaching the threads.
There is a `user_data` in the pool's creation that is passed to "take"
and "return" callbacks. Once a pool is destroyed, this `user_data` is
cleared and "return" callbacks always see a `NULL`. This way,
the "return" callback may act on that fact.
## `Curl_thrdq`
A `thrdq` is a two-way queue with a thread pool. Users of a thread queue may
"send" work items into the queue and "receive" processed items back.
### Properties
A queue's properties are:
- The properties of the thread pool to create
- the maximum length of the "send" queue, 0 for unlimited
### Operation
The queue is created with providing three callback functions:
- `free`: called to free a work item that is in the queue but is
no longer returned (or processed). This happens when the queue is
destroyed or when work items are removed for other reasons.
- `process`: process the item. Can not fail.
- `event`: called when work items have been added to the "receive" list.
Users of a thread queue call `Curl_thrdq_send()` to add a work item to
the queue. Calling `Curl_thrdq_recv()` delivers processed items back.
### Safety
The thread queue operates use a mutex and condition variables to manage
concurrency. All interactions and callback invocation are done under
the queue's mutex lock, *except* the "process" callback which is invoked
unlocked.
Users of a thread queue should not hold any reference to work items sent
into the queue. The provided "free" callback has to take care of any
resources allocated by work items.

View File

@ -259,6 +259,8 @@ LIB_CFILES = \
system_win32.c \ system_win32.c \
telnet.c \ telnet.c \
tftp.c \ tftp.c \
thrdpool.c \
thrdqueue.c \
transfer.c \ transfer.c \
uint-bset.c \ uint-bset.c \
uint-hash.c \ uint-hash.c \
@ -388,6 +390,8 @@ LIB_HFILES = \
system_win32.h \ system_win32.h \
telnet.h \ telnet.h \
tftp.h \ tftp.h \
thrdpool.h \
thrdqueue.h \
transfer.h \ transfer.h \
uint-bset.h \ uint-bset.h \
uint-hash.h \ uint-hash.h \

View File

@ -23,6 +23,7 @@
***************************************************************************/ ***************************************************************************/
#include "curl_setup.h" #include "curl_setup.h"
#include "curl_threads.h" #include "curl_threads.h"
#include "curlx/timeval.h"
#ifdef USE_THREADS #ifdef USE_THREADS
@ -132,3 +133,82 @@ int Curl_thread_join(curl_thread_t *hnd)
#error neither HAVE_THREADS_POSIX nor _WIN32 defined #error neither HAVE_THREADS_POSIX nor _WIN32 defined
#endif #endif
#endif /* USE_THREADS */ #endif /* USE_THREADS */
#ifdef USE_MUTEX
#ifdef HAVE_THREADS_POSIX
void Curl_cond_signal(pthread_cond_t *c)
{
/* return code defined as always 0 */
(void)pthread_cond_signal(c);
}
void Curl_cond_wait(pthread_cond_t *c, pthread_mutex_t *m)
{
/* return code defined as always 0 */
(void)pthread_cond_wait(c, m);
}
CURLcode Curl_cond_timedwait(pthread_cond_t *c, pthread_mutex_t *m,
uint32_t timeout_ms)
{
struct curltime now;
struct timespec ts;
timediff_t usec;
int rc;
/* POSIX expects an "absolute" time until the condition wait ends.
* We cannot use `curlx_now()` here that may run on some monotonic clock
* that will be most likely in the past, as far as POSIX abstime is
* concerned. */
#ifdef HAVE_GETTIMEOFDAY
struct timeval tv;
(void)gettimeofday(&tv, NULL);
now.tv_sec = tv.tv_sec;
now.tv_usec = (int)tv.tv_usec;
#else
now.tv_sec = time(NULL);
now.tv_usec = 0;
#endif
ts.tv_sec = now.tv_sec + (timeout_ms / 1000);
usec = now.tv_usec + ((timeout_ms % 1000) * 1000);
if(usec >= 1000000) {
++ts.tv_sec;
usec %= 1000000;
}
ts.tv_nsec = (long)usec * 1000;
rc = pthread_cond_timedwait(c, m, &ts);
if(rc == SOCKETIMEDOUT)
return CURLE_OPERATION_TIMEDOUT;
return rc ? CURLE_UNRECOVERABLE_POLL : CURLE_OK;
}
#elif defined(_WIN32)
void Curl_cond_signal(CONDITION_VARIABLE *c)
{
WakeConditionVariable(c);
}
void Curl_cond_wait(CONDITION_VARIABLE *c, CRITICAL_SECTION *m)
{
SleepConditionVariableCS(c, m, INFINITE);
}
CURLcode Curl_cond_timedwait(CONDITION_VARIABLE *c, CRITICAL_SECTION *m,
uint32_t timeout_ms)
{
if(!SleepConditionVariableCS(c, m, (DWORD)timeout_ms)) {
DWORD err = GetLastError();
return (err == ERROR_TIMEOUT) ?
CURLE_OPERATION_TIMEDOUT : CURLE_UNRECOVERABLE_POLL;
}
return CURLE_OK;
}
#else
#error neither HAVE_THREADS_POSIX nor _WIN32 defined
#endif
#endif /* USE_MUTEX */

View File

@ -39,6 +39,9 @@
# define Curl_mutex_acquire(m) pthread_mutex_lock(m) # define Curl_mutex_acquire(m) pthread_mutex_lock(m)
# define Curl_mutex_release(m) pthread_mutex_unlock(m) # define Curl_mutex_release(m) pthread_mutex_unlock(m)
# define Curl_mutex_destroy(m) pthread_mutex_destroy(m) # define Curl_mutex_destroy(m) pthread_mutex_destroy(m)
# define curl_cond_t pthread_cond_t
# define Curl_cond_init(c) pthread_cond_init(c, NULL)
# define Curl_cond_destroy(c) pthread_cond_destroy(c)
#elif defined(_WIN32) #elif defined(_WIN32)
# define CURL_THREAD_RETURN_T DWORD # define CURL_THREAD_RETURN_T DWORD
# define CURL_STDCALL WINAPI # define CURL_STDCALL WINAPI
@ -49,9 +52,18 @@
# define Curl_mutex_acquire(m) EnterCriticalSection(m) # define Curl_mutex_acquire(m) EnterCriticalSection(m)
# define Curl_mutex_release(m) LeaveCriticalSection(m) # define Curl_mutex_release(m) LeaveCriticalSection(m)
# define Curl_mutex_destroy(m) DeleteCriticalSection(m) # define Curl_mutex_destroy(m) DeleteCriticalSection(m)
# define curl_cond_t CONDITION_VARIABLE
# define Curl_cond_init(c) InitializeConditionVariable(c)
# define Curl_cond_destroy(c) (void)(c)
#else #else
#error neither HAVE_THREADS_POSIX nor _WIN32 defined #error neither HAVE_THREADS_POSIX nor _WIN32 defined
#endif #endif
void Curl_cond_signal(curl_cond_t *c);
void Curl_cond_wait(curl_cond_t *c, curl_mutex_t *m);
/* Returns CURLE_OPERATION_TIMEDOUT on timeout */
CURLcode Curl_cond_timedwait(curl_cond_t *c, curl_mutex_t *m,
uint32_t timeout_ms);
#endif /* USE_MUTEX */ #endif /* USE_MUTEX */
#ifdef USE_THREADS #ifdef USE_THREADS

View File

@ -278,6 +278,19 @@ void Curl_trc_cf_infof(struct Curl_easy *data, const struct Curl_cfilter *cf,
} }
} }
void Curl_trc_feat_infof(struct Curl_easy *data,
struct curl_trc_feat *feat,
const char *fmt, ...)
{
DEBUGASSERT(feat);
if(Curl_trc_ft_is_verbose(data, feat)) {
va_list ap;
va_start(ap, fmt);
trc_infof(data, feat, NULL, 0, fmt, ap);
va_end(ap);
}
}
static const char * const Curl_trc_timer_names[] = { static const char * const Curl_trc_timer_names[] = {
"100_TIMEOUT", "100_TIMEOUT",
"ASYNC_NAME", "ASYNC_NAME",

View File

@ -101,6 +101,10 @@ struct curl_trc_feat {
int log_level; int log_level;
}; };
void Curl_trc_feat_infof(struct Curl_easy *data,
struct curl_trc_feat *feat,
const char *fmt, ...) CURL_PRINTF(3, 4);
#ifndef CURL_DISABLE_FTP #ifndef CURL_DISABLE_FTP
extern struct curl_trc_feat Curl_trc_feat_ftp; extern struct curl_trc_feat Curl_trc_feat_ftp;
void Curl_trc_ftp(struct Curl_easy *data, void Curl_trc_ftp(struct Curl_easy *data,

View File

@ -99,6 +99,7 @@ static void curl_dbg_log_locked(const char *format, ...) CURL_PRINTF(1, 2);
_exit() comes after the atexit handlers are called. curl/curl#6620 */ _exit() comes after the atexit handlers are called. curl/curl#6620 */
static void curl_dbg_cleanup(void) static void curl_dbg_cleanup(void)
{ {
bool locked = curl_dbg_lock();
if(curl_dbg_logfile && if(curl_dbg_logfile &&
curl_dbg_logfile != stderr && curl_dbg_logfile != stderr &&
curl_dbg_logfile != stdout) { curl_dbg_logfile != stdout) {
@ -108,6 +109,7 @@ static void curl_dbg_cleanup(void)
fclose(curl_dbg_logfile); fclose(curl_dbg_logfile);
} }
curl_dbg_logfile = NULL; curl_dbg_logfile = NULL;
curl_dbg_unlock(locked);
#ifdef USE_MUTEX #ifdef USE_MUTEX
if(dbg_mutex_init) { if(dbg_mutex_init) {
Curl_mutex_destroy(&dbg_mutex); Curl_mutex_destroy(&dbg_mutex);

457
lib/thrdpool.c Normal file
View File

@ -0,0 +1,457 @@
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "curl_setup.h"
#ifdef USE_THREADS
#if defined(USE_THREADS_POSIX) && defined(HAVE_PTHREAD_H)
#include <pthread.h>
#endif
#include "llist.h"
#include "curl_threads.h"
#include "curlx/timeval.h"
#include "thrdpool.h"
#ifdef CURLVERBOSE
#include "curl_trc.h"
#include "urldata.h"
#endif
struct thrdslot {
struct Curl_llist_node node;
struct curl_thrdpool *tpool;
curl_thread_t thread;
curl_cond_t await;
struct curltime starttime;
const char *work_description;
timediff_t work_timeout_ms;
uint32_t id;
BIT(running);
BIT(idle);
};
struct curl_thrdpool {
char *name;
uint64_t refcount;
curl_mutex_t lock;
curl_cond_t await;
struct Curl_llist slots;
struct Curl_llist zombies;
Curl_thrdpool_take_item_cb *fn_take;
Curl_thrdpool_process_item_cb *fn_process;
Curl_thrdpool_return_item_cb *fn_return;
void *fn_user_data;
CURLcode fatal_err;
uint32_t min_threads;
uint32_t max_threads;
uint32_t idle_time_ms;
uint32_t next_id;
BIT(aborted);
BIT(detached);
};
static void thrdpool_join_zombies(struct curl_thrdpool *tpool);
static bool thrdpool_unlink(struct curl_thrdpool *tpool, bool locked);
static void thrdslot_destroy(struct thrdslot *tslot)
{
DEBUGASSERT(tslot->thread == curl_thread_t_null);
DEBUGASSERT(!tslot->running);
Curl_cond_destroy(&tslot->await);
curlx_free(tslot);
}
static void thrdslot_done(struct thrdslot *tslot)
{
struct curl_thrdpool *tpool = tslot->tpool;
DEBUGASSERT(Curl_node_llist(&tslot->node) == &tpool->slots);
Curl_node_remove(&tslot->node);
tslot->running = FALSE;
Curl_llist_append(&tpool->zombies, tslot, &tslot->node);
Curl_cond_signal(&tpool->await);
}
static CURL_THREAD_RETURN_T CURL_STDCALL thrdslot_run(void *arg)
{
struct thrdslot *tslot = arg;
struct curl_thrdpool *tpool = tslot->tpool;
void *item;
Curl_mutex_acquire(&tpool->lock);
DEBUGASSERT(Curl_node_llist(&tslot->node) == &tpool->slots);
for(;;) {
while(!tpool->aborted) {
tslot->work_description = NULL;
tslot->work_timeout_ms = 0;
item = tpool->fn_take(tpool->fn_user_data, &tslot->work_description,
&tslot->work_timeout_ms);
if(!item)
break;
tslot->starttime = curlx_now();
tslot->idle = FALSE;
Curl_mutex_release(&tpool->lock);
tpool->fn_process(item);
Curl_mutex_acquire(&tpool->lock);
tslot->work_description = NULL;
tpool->fn_return(item, tpool->aborted ? NULL : tpool->fn_user_data);
}
if(tpool->aborted)
goto out;
tslot->idle = TRUE;
tslot->starttime = curlx_now();
thrdpool_join_zombies(tpool);
Curl_cond_signal(&tpool->await);
/* Only wait with idle timeout when we are above the minimum
* number of threads. Otherwise short idle timeouts will keep
* on activating threads that have no means to shut down. */
if((tpool->idle_time_ms > 0) &&
(Curl_llist_count(&tpool->slots) > tpool->min_threads)) {
CURLcode r = Curl_cond_timedwait(&tslot->await, &tpool->lock,
tpool->idle_time_ms);
if((r == CURLE_OPERATION_TIMEDOUT) &&
(Curl_llist_count(&tpool->slots) > tpool->min_threads)) {
goto out;
}
}
else {
Curl_cond_wait(&tslot->await, &tpool->lock);
}
}
out:
thrdslot_done(tslot);
if(!thrdpool_unlink(tslot->tpool, TRUE)) {
/* tpool not destroyed */
Curl_mutex_release(&tpool->lock);
}
return 0;
}
static CURLcode thrdslot_start(struct curl_thrdpool *tpool)
{
struct thrdslot *tslot;
CURLcode result = CURLE_OUT_OF_MEMORY;
tslot = curlx_calloc(1, sizeof(*tslot));
if(!tslot)
goto out;
tslot->id = tpool->next_id++;
tslot->tpool = tpool;
tslot->thread = curl_thread_t_null;
Curl_cond_init(&tslot->await);
tpool->refcount++;
tslot->running = TRUE;
tslot->thread = Curl_thread_create(thrdslot_run, tslot);
if(tslot->thread == curl_thread_t_null) { /* never started */
tslot->running = FALSE;
thrdpool_unlink(tpool, TRUE);
result = CURLE_FAILED_INIT;
goto out;
}
Curl_llist_append(&tpool->slots, tslot, &tslot->node);
tslot = NULL;
result = CURLE_OK;
out:
if(tslot)
thrdslot_destroy(tslot);
return result;
}
static void thrdpool_wake_all(struct curl_thrdpool *tpool)
{
struct Curl_llist_node *e;
for(e = Curl_llist_head(&tpool->slots); e; e = Curl_node_next(e)) {
struct thrdslot *tslot = Curl_node_elem(e);
Curl_cond_signal(&tslot->await);
}
}
static void thrdpool_join_zombies(struct curl_thrdpool *tpool)
{
struct Curl_llist_node *e;
for(e = Curl_llist_head(&tpool->zombies); e;
e = Curl_llist_head(&tpool->zombies)) {
struct thrdslot *tslot = Curl_node_elem(e);
Curl_node_remove(&tslot->node);
if(tslot->thread != curl_thread_t_null) {
Curl_mutex_release(&tpool->lock);
Curl_thread_join(&tslot->thread);
Curl_mutex_acquire(&tpool->lock);
tslot->thread = curl_thread_t_null;
}
thrdslot_destroy(tslot);
}
}
static bool thrdpool_unlink(struct curl_thrdpool *tpool, bool locked)
{
DEBUGASSERT(tpool->refcount);
if(tpool->refcount)
tpool->refcount--;
if(tpool->refcount)
return FALSE;
/* no more references, free */
DEBUGASSERT(tpool->aborted);
thrdpool_join_zombies(tpool);
if(locked)
Curl_mutex_release(&tpool->lock);
curlx_free(tpool->name);
Curl_cond_destroy(&tpool->await);
Curl_mutex_destroy(&tpool->lock);
curlx_free(tpool);
return TRUE;
}
CURLcode Curl_thrdpool_create(struct curl_thrdpool **ptpool,
const char *name,
uint32_t min_threads,
uint32_t max_threads,
uint32_t idle_time_ms,
Curl_thrdpool_take_item_cb *fn_take,
Curl_thrdpool_process_item_cb *fn_process,
Curl_thrdpool_return_item_cb *fn_return,
void *user_data)
{
struct curl_thrdpool *tpool;
CURLcode result = CURLE_OUT_OF_MEMORY;
tpool = curlx_calloc(1, sizeof(*tpool));
if(!tpool)
goto out;
tpool->refcount = 1;
Curl_mutex_init(&tpool->lock);
Curl_cond_init(&tpool->await);
Curl_llist_init(&tpool->slots, NULL);
Curl_llist_init(&tpool->zombies, NULL);
tpool->min_threads = min_threads;
tpool->max_threads = max_threads;
tpool->idle_time_ms = idle_time_ms;
tpool->fn_take = fn_take;
tpool->fn_process = fn_process;
tpool->fn_return = fn_return;
tpool->fn_user_data = user_data;
tpool->name = curlx_strdup(name);
if(!tpool->name)
goto out;
if(tpool->min_threads)
result = Curl_thrdpool_signal(tpool, tpool->min_threads);
else
result = CURLE_OK;
out:
if(result && tpool) {
tpool->aborted = TRUE;
thrdpool_unlink(tpool, FALSE);
tpool = NULL;
}
*ptpool = tpool;
return result;
}
void Curl_thrdpool_destroy(struct curl_thrdpool *tpool, bool join)
{
Curl_mutex_acquire(&tpool->lock);
tpool->aborted = TRUE;
while(join && Curl_llist_count(&tpool->slots)) {
thrdpool_wake_all(tpool);
Curl_cond_wait(&tpool->await, &tpool->lock);
}
thrdpool_join_zombies(tpool);
/* detach all still running threads */
if(Curl_llist_count(&tpool->slots)) {
struct Curl_llist_node *e;
for(e = Curl_llist_head(&tpool->slots); e; e = Curl_node_next(e)) {
struct thrdslot *tslot = Curl_node_elem(e);
if(tslot->thread != curl_thread_t_null)
Curl_thread_destroy(&tslot->thread);
}
tpool->detached = TRUE;
}
if(!thrdpool_unlink(tpool, TRUE)) {
/* tpool not destroyed */
Curl_mutex_release(&tpool->lock);
}
}
CURLcode Curl_thrdpool_signal(struct curl_thrdpool *tpool, uint32_t nthreads)
{
struct Curl_llist_node *e, *n;
CURLcode result = CURLE_OK;
Curl_mutex_acquire(&tpool->lock);
DEBUGASSERT(!tpool->aborted);
thrdpool_join_zombies(tpool);
for(e = Curl_llist_head(&tpool->slots); e && nthreads; e = n) {
struct thrdslot *tslot = Curl_node_elem(e);
n = Curl_node_next(e);
if(tslot->idle) {
Curl_cond_signal(&tslot->await);
--nthreads;
}
else if(!tslot->starttime.tv_sec && !tslot->starttime.tv_usec) {
/* starting thread, queries for work soon. */
--nthreads;
}
}
while(nthreads && !result &&
Curl_llist_count(&tpool->slots) < tpool->max_threads) {
result = thrdslot_start(tpool);
if(result)
break;
--nthreads;
}
Curl_mutex_release(&tpool->lock);
return result;
}
static bool thrdpool_all_idle(struct curl_thrdpool *tpool)
{
struct Curl_llist_node *e;
for(e = Curl_llist_head(&tpool->slots); e; e = Curl_node_next(e)) {
struct thrdslot *tslot = Curl_node_elem(e);
if(!tslot->idle)
return FALSE;
}
return TRUE;
}
CURLcode Curl_thrdpool_await_idle(struct curl_thrdpool *tpool,
uint32_t timeout_ms)
{
CURLcode result = CURLE_OK;
struct curltime end = { 0 };
Curl_mutex_acquire(&tpool->lock);
DEBUGASSERT(!tpool->aborted);
if(tpool->aborted) {
result = CURLE_FAILED_INIT;
goto out;
}
while(!thrdpool_all_idle(tpool)) {
if(timeout_ms) {
timediff_t remain_ms;
CURLcode r;
if(!end.tv_sec && !end.tv_usec) {
end = curlx_now();
end.tv_sec += (time_t)(timeout_ms / 1000);
end.tv_usec += (int)(timeout_ms % 1000) * 1000;
if(end.tv_usec >= 1000000) {
end.tv_sec++;
end.tv_usec -= 1000000;
}
}
remain_ms = curlx_timediff_ms(curlx_now(), end);
if(remain_ms <= 0)
r = CURLE_OPERATION_TIMEDOUT;
else
r = Curl_cond_timedwait(&tpool->await, &tpool->lock,
(uint32_t)remain_ms);
if(r == CURLE_OPERATION_TIMEDOUT) {
result = r;
break;
}
}
else {
Curl_cond_wait(&tpool->await, &tpool->lock);
}
}
out:
thrdpool_join_zombies(tpool);
Curl_mutex_release(&tpool->lock);
return result;
}
#ifdef CURLVERBOSE
void Curl_thrdpool_trace(struct curl_thrdpool *tpool,
struct Curl_easy *data,
struct curl_trc_feat *feat)
{
if(Curl_trc_ft_is_verbose(data, feat)) {
struct Curl_llist_node *e;
struct curltime now = curlx_now();
Curl_mutex_acquire(&tpool->lock);
if(!Curl_llist_count(&tpool->slots)) {
Curl_trc_feat_infof(data, feat, "[%s] [TPOOL] no threads running",
tpool->name);
}
for(e = Curl_llist_head(&tpool->slots); e; e = Curl_node_next(e)) {
struct thrdslot *tslot = Curl_node_elem(e);
timediff_t elapsed_ms = curlx_ptimediff_ms(&now, &tslot->starttime);
if(!tslot->running) {
Curl_trc_feat_infof(data, feat, "[%s] [TPOOL] [%u]: not running",
tpool->name, tslot->id);
}
else if(!tslot->starttime.tv_sec && !tslot->starttime.tv_usec) {
Curl_trc_feat_infof(data, feat, "[%s] [TPOOL] [%u]: starting...",
tpool->name, tslot->id);
}
else if(tslot->idle) {
Curl_trc_feat_infof(data, feat, "[%s] [TPOOL] [%u]: idle for %"
FMT_TIMEDIFF_T "ms",
tpool->name, tslot->id, elapsed_ms);
}
else {
timediff_t remain_ms = tslot->work_timeout_ms ?
(tslot->work_timeout_ms - elapsed_ms) : 0;
Curl_trc_feat_infof(data, feat, "[%s] [TPOOL] [%u]: busy %"
FMT_TIMEDIFF_T "ms, timeout in %" FMT_TIMEDIFF_T
"ms: %s",
tpool->name, tslot->id, elapsed_ms, remain_ms,
tslot->work_description);
}
}
Curl_mutex_release(&tpool->lock);
}
}
#endif
#endif /* USE_THREADS */

102
lib/thrdpool.h Normal file
View File

@ -0,0 +1,102 @@
#ifndef HEADER_CURL_THRDPOOL_H
#define HEADER_CURL_THRDPOOL_H
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "curl_setup.h"
#include "curlx/timediff.h"
#ifdef USE_THREADS
struct curl_thrdpool;
struct Curl_easy;
struct curl_trc_feat;
/* Invoked under thread pool lock to get an "item" to work on. Must
* return NULL if there is nothing to do.
* Caller might return a descriptive string about the "item", where
* available. The string needs to have the same lifetime as the
* item itself. */
typedef void *Curl_thrdpool_take_item_cb(void *user_data,
const char **pdescription,
timediff_t *ptimeout_ms);
/* Invoked outside thread pool lock to process the item taken. */
typedef void Curl_thrdpool_process_item_cb(void *item);
/* Invoked under thread pool lock to return a processed item back
* to the producer.
* If the thread pool has been destroyed, `user_data` will be NULL
* and the callback is responsible to release all `item` resources. */
typedef void Curl_thrdpool_return_item_cb(void *item, void *user_data);
/* Create a new thread pool.
* @param name name of pool for tracing purposes
* @param min_threads minimum number of threads to have always running
* @param max_threads maximum number of threads running, ever.
* @param idle_time_ms maximum time a thread should wait for tasks to
* process before shutting down (unless the pool is
* already at minimum thread count), use 0 for
* infinite wait.
* @param fn_take take the next item to process
* @param fn_process process the item taken
* @param fn_return return the processed item
* @param user_data parameter passed to take/return callbacks
*/
CURLcode Curl_thrdpool_create(struct curl_thrdpool **ptpool,
const char *name,
uint32_t min_threads,
uint32_t max_threads,
uint32_t idle_time_ms,
Curl_thrdpool_take_item_cb *fn_take,
Curl_thrdpool_process_item_cb *fn_process,
Curl_thrdpool_return_item_cb *fn_return,
void *user_data);
/* Destroy the thread pool, release its resources.
* With `join` being TRUE, the call will wait for all threads to finish
* processing before returning. On FALSE, it will detach all threads
* running. Ongoing item processing will continue to run and
* `fn_return` will be invoked with NULL user_data before the thread exits.
*/
void Curl_thrdpool_destroy(struct curl_thrdpool *tpool, bool join);
/* Signal the pool to wake up `nthreads` idle worker threads, possible
* creating new threads up to the max limit. The number should reflect
* the items that can actually be taken for processing right away, e.g.
* the producers "queue" length of outstanding items.
*/
CURLcode Curl_thrdpool_signal(struct curl_thrdpool *tpool, uint32_t nthreads);
CURLcode Curl_thrdpool_await_idle(struct curl_thrdpool *tpool,
uint32_t timeout_ms);
#ifdef CURLVERBOSE
void Curl_thrdpool_trace(struct curl_thrdpool *tpool,
struct Curl_easy *data,
struct curl_trc_feat *feat);
#endif
#endif /* USE_THREADS */
#endif /* HEADER_CURL_THRDPOOL_H */

390
lib/thrdqueue.c Normal file
View File

@ -0,0 +1,390 @@
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "curl_setup.h"
#ifdef USE_THREADS
#if defined(USE_THREADS_POSIX) && defined(HAVE_PTHREAD_H)
#include <pthread.h>
#endif
#include "llist.h"
#include "curl_threads.h"
#include "thrdpool.h"
#include "thrdqueue.h"
#include "curlx/timeval.h"
#ifdef CURLVERBOSE
#include "curl_trc.h"
#include "urldata.h"
#endif
struct curl_thrdq {
char *name;
curl_mutex_t lock;
curl_cond_t await;
struct Curl_llist sendq;
struct Curl_llist recvq;
struct curl_thrdpool *tpool;
Curl_thrdq_item_free_cb *fn_free;
Curl_thrdq_item_process_cb *fn_process;
Curl_thrdq_ev_cb *fn_event;
void *fn_user_data;
uint32_t send_max_len;
BIT(aborted);
};
struct thrdq_item {
struct Curl_llist_node node;
Curl_thrdq_item_free_cb *fn_free;
Curl_thrdq_item_process_cb *fn_process;
void *item;
struct curltime start;
timediff_t timeout_ms;
const char *description;
};
static struct thrdq_item *thrdq_item_create(struct curl_thrdq *tqueue,
void *item,
const char *description,
timediff_t timeout_ms)
{
struct thrdq_item *qitem;
qitem = curlx_calloc(1, sizeof(*qitem));
if(!qitem)
return NULL;
qitem->item = item;
qitem->description = description;
qitem->fn_free = tqueue->fn_free;
qitem->fn_process = tqueue->fn_process;
if(timeout_ms) {
qitem->start = curlx_now();
qitem->timeout_ms = timeout_ms;
}
return qitem;
}
static void thrdq_item_destroy(struct thrdq_item *qitem)
{
if(qitem->item)
qitem->fn_free(qitem->item);
curlx_free(qitem);
}
static void thrdq_item_list_dtor(void *user_data, void *elem)
{
(void)user_data;
thrdq_item_destroy(elem);
}
static void *thrdq_tpool_take(void *user_data, const char **pdescription,
timediff_t *ptimeout_ms)
{
struct curl_thrdq *tqueue = user_data;
struct thrdq_item *qitem = NULL;
struct Curl_llist_node *e;
Curl_thrdq_ev_cb *fn_event = NULL;
void *fn_user_data = NULL;
Curl_mutex_acquire(&tqueue->lock);
*pdescription = NULL;
*ptimeout_ms = 0;
if(!tqueue->aborted) {
e = Curl_llist_head(&tqueue->sendq);
if(e) {
struct curltime now = curlx_now();
timediff_t timeout_ms;
while(e) {
qitem = Curl_node_take_elem(e);
timeout_ms = (!qitem->timeout_ms) ? 0 :
(qitem->timeout_ms - curlx_ptimediff_ms(&now, &qitem->start));
if(timeout_ms < 0) {
/* timed out while queued, place on receive queue */
Curl_llist_append(&tqueue->recvq, qitem, &qitem->node);
fn_event = tqueue->fn_event;
fn_user_data = tqueue->fn_user_data;
qitem = NULL;
e = Curl_llist_head(&tqueue->sendq);
continue;
}
else {
*pdescription = qitem->description;
*ptimeout_ms = timeout_ms;
break;
}
}
}
}
Curl_mutex_release(&tqueue->lock);
/* avoiding deadlocks */
if(fn_event)
fn_event(tqueue, CURL_THRDQ_EV_ITEM_DONE, fn_user_data);
return qitem;
}
static void thrdq_tpool_return(void *item, void *user_data)
{
struct curl_thrdq *tqueue = user_data;
struct thrdq_item *qitem = item;
Curl_thrdq_ev_cb *fn_event = NULL;
void *fn_user_data = NULL;
if(!tqueue) {
thrdq_item_destroy(item);
return;
}
Curl_mutex_acquire(&tqueue->lock);
if(tqueue->aborted) {
thrdq_item_destroy(qitem);
}
else {
DEBUGASSERT(!Curl_node_llist(&qitem->node));
Curl_llist_append(&tqueue->recvq, qitem, &qitem->node);
fn_event = tqueue->fn_event;
fn_user_data = tqueue->fn_user_data;
}
Curl_mutex_release(&tqueue->lock);
/* avoiding deadlocks */
if(fn_event)
fn_event(tqueue, CURL_THRDQ_EV_ITEM_DONE, fn_user_data);
}
static void thrdq_tpool_process(void *item)
{
struct thrdq_item *qitem = item;
qitem->fn_process(qitem->item);
}
static void thrdq_unlink(struct curl_thrdq *tqueue, bool locked, bool join)
{
DEBUGASSERT(tqueue->aborted);
if(tqueue->tpool) {
if(locked)
Curl_mutex_release(&tqueue->lock);
Curl_thrdpool_destroy(tqueue->tpool, join);
tqueue->tpool = NULL;
if(locked)
Curl_mutex_acquire(&tqueue->lock);
}
Curl_llist_destroy(&tqueue->sendq, NULL);
Curl_llist_destroy(&tqueue->recvq, NULL);
curlx_free(tqueue->name);
Curl_cond_destroy(&tqueue->await);
if(locked)
Curl_mutex_release(&tqueue->lock);
Curl_mutex_destroy(&tqueue->lock);
curlx_free(tqueue);
}
CURLcode Curl_thrdq_create(struct curl_thrdq **ptqueue,
const char *name,
uint32_t max_len,
uint32_t min_threads,
uint32_t max_threads,
uint32_t idle_time_ms,
Curl_thrdq_item_free_cb *fn_free,
Curl_thrdq_item_process_cb *fn_process,
Curl_thrdq_ev_cb *fn_event,
void *user_data)
{
struct curl_thrdq *tqueue;
CURLcode result = CURLE_OUT_OF_MEMORY;
tqueue = curlx_calloc(1, sizeof(*tqueue));
if(!tqueue)
goto out;
Curl_mutex_init(&tqueue->lock);
Curl_cond_init(&tqueue->await);
Curl_llist_init(&tqueue->sendq, thrdq_item_list_dtor);
Curl_llist_init(&tqueue->recvq, thrdq_item_list_dtor);
tqueue->fn_free = fn_free;
tqueue->fn_process = fn_process;
tqueue->fn_event = fn_event;
tqueue->fn_user_data = user_data;
tqueue->send_max_len = max_len;
tqueue->name = curlx_strdup(name);
if(!tqueue->name)
goto out;
result = Curl_thrdpool_create(&tqueue->tpool, name,
min_threads, max_threads, idle_time_ms,
thrdq_tpool_take,
thrdq_tpool_process,
thrdq_tpool_return,
tqueue);
out:
if(result && tqueue) {
tqueue->aborted = TRUE;
thrdq_unlink(tqueue, FALSE, TRUE);
tqueue = NULL;
}
*ptqueue = tqueue;
return result;
}
void Curl_thrdq_destroy(struct curl_thrdq *tqueue, bool join)
{
Curl_mutex_acquire(&tqueue->lock);
DEBUGASSERT(!tqueue->aborted);
tqueue->aborted = TRUE;
thrdq_unlink(tqueue, TRUE, join);
}
CURLcode Curl_thrdq_send(struct curl_thrdq *tqueue, void *item,
const char *description, timediff_t timeout_ms)
{
CURLcode result = CURLE_AGAIN;
size_t signals = 0;
Curl_mutex_acquire(&tqueue->lock);
if(tqueue->aborted) {
DEBUGASSERT(0);
result = CURLE_SEND_ERROR;
goto out;
}
if(timeout_ms < 0) {
result = CURLE_OPERATION_TIMEDOUT;
goto out;
}
if(!tqueue->send_max_len ||
(Curl_llist_count(&tqueue->sendq) < tqueue->send_max_len)) {
struct thrdq_item *qitem = thrdq_item_create(tqueue, item, description,
timeout_ms);
if(!qitem) {
result = CURLE_OUT_OF_MEMORY;
goto out;
}
Curl_llist_append(&tqueue->sendq, qitem, &qitem->node);
result = CURLE_OK;
signals = Curl_llist_count(&tqueue->sendq);
}
out:
Curl_mutex_release(&tqueue->lock);
/* Signal thread pool unlocked to avoid deadlocks */
if(!result && signals)
result = Curl_thrdpool_signal(tqueue->tpool, (uint32_t)signals);
return result;
}
CURLcode Curl_thrdq_recv(struct curl_thrdq *tqueue, void **pitem)
{
CURLcode result = CURLE_AGAIN;
struct Curl_llist_node *e;
*pitem = NULL;
Curl_mutex_acquire(&tqueue->lock);
if(tqueue->aborted) {
DEBUGASSERT(0);
result = CURLE_RECV_ERROR;
goto out;
}
e = Curl_llist_head(&tqueue->recvq);
if(e) {
struct thrdq_item *qitem = Curl_node_take_elem(e);
*pitem = qitem->item;
qitem->item = NULL;
thrdq_item_destroy(qitem);
result = CURLE_OK;
}
out:
Curl_mutex_release(&tqueue->lock);
return result;
}
static void thrdq_llist_clean_matches(struct Curl_llist *llist,
Curl_thrdq_item_match_cb *fn_match,
void *match_data)
{
struct Curl_llist_node *e, *n;
struct thrdq_item *qitem;
for(e = Curl_llist_head(llist); e; e = n) {
n = Curl_node_next(e);
qitem = Curl_node_elem(e);
if(fn_match(qitem->item, match_data))
Curl_node_remove(e);
}
}
void Curl_thrdq_clear(struct curl_thrdq *tqueue,
Curl_thrdq_item_match_cb *fn_match,
void *match_data)
{
Curl_mutex_acquire(&tqueue->lock);
if(tqueue->aborted) {
DEBUGASSERT(0);
goto out;
}
thrdq_llist_clean_matches(&tqueue->sendq, fn_match, match_data);
thrdq_llist_clean_matches(&tqueue->recvq, fn_match, match_data);
out:
Curl_mutex_release(&tqueue->lock);
}
CURLcode Curl_thrdq_await_done(struct curl_thrdq *tqueue,
uint32_t timeout_ms)
{
return Curl_thrdpool_await_idle(tqueue->tpool, timeout_ms);
}
#ifdef CURLVERBOSE
void Curl_thrdq_trace(struct curl_thrdq *tqueue,
struct Curl_easy *data,
struct curl_trc_feat *feat)
{
if(Curl_trc_ft_is_verbose(data, feat)) {
struct Curl_llist_node *e;
struct thrdq_item *qitem;
Curl_thrdpool_trace(tqueue->tpool, data, feat);
Curl_mutex_acquire(&tqueue->lock);
if(!Curl_llist_count(&tqueue->sendq) &&
!Curl_llist_count(&tqueue->recvq)) {
Curl_trc_feat_infof(data, feat, "[%s] [QUEUE] empty", tqueue->name);
}
for(e = Curl_llist_head(&tqueue->sendq); e; e = Curl_node_next(e)) {
qitem = Curl_node_elem(e);
Curl_trc_feat_infof(data, feat, "[%s] [QUEUE] in: %s",
tqueue->name, qitem->description);
}
for(e = Curl_llist_head(&tqueue->recvq); e; e = Curl_node_next(e)) {
qitem = Curl_node_elem(e);
Curl_trc_feat_infof(data, feat, "[%s] [QUEUE] out: %s",
tqueue->name, qitem->description);
}
Curl_mutex_release(&tqueue->lock);
}
}
#endif
#endif /* USE_THREADS */

114
lib/thrdqueue.h Normal file
View File

@ -0,0 +1,114 @@
#ifndef HEADER_CURL_THRDQUEUE_H
#define HEADER_CURL_THRDQUEUE_H
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "curl_setup.h"
#include "curlx/timediff.h"
#ifdef USE_THREADS
struct Curl_easy;
struct curl_trc_feat;
struct curl_thrdq;
typedef enum {
CURL_THRDQ_EV_ITEM_DONE /* an item has been processed and is ready */
} Curl_thrdq_event;
/* Notification callback when "events" happen in the queue. May be
* call from any thread, queue is not locked. */
typedef void Curl_thrdq_ev_cb(const struct curl_thrdq *tqueue,
Curl_thrdq_event ev,
void *user_data);
/* Process a queued item. Maybe call from any thread. Queue is
* not locked. */
typedef void Curl_thrdq_item_process_cb(void *item);
/* Free an item. May be called from any thread at any time for an
* item that is in the queue (either before or after processing). */
typedef void Curl_thrdq_item_free_cb(void *item);
/* Create a new queue processing "items" by a thread pool.
*/
CURLcode Curl_thrdq_create(struct curl_thrdq **ptqueue,
const char *name,
uint32_t max_len, /* 0 for unlimited */
uint32_t min_threads,
uint32_t max_threads,
uint32_t idle_time_ms,
Curl_thrdq_item_free_cb *fn_free,
Curl_thrdq_item_process_cb *fn_process,
Curl_thrdq_ev_cb *fn_event, /* optional */
void *user_data);
/* Destroy the queue, free all queued items unprocessed and destroy
* the thread pool used.
* @param join TRUE when thread pool shall be joined. FALSE for
* detaching any running threads.
*/
void Curl_thrdq_destroy(struct curl_thrdq *tqueue, bool join);
/* Send "item" onto the queue. The caller needs to clear any reference
* to "item" on success, e.g. the queue takes ownership.
* `description` is an optional string describing the item for tracing
* purposes. It needs to have the same lifetime as `item`.
* Returns CURLE_AGAIN when the queue has already been full.
*
* With`timeout_ms` != 0, items that get stuck that long in the send
* queue are removed and added to the receive queue right away.
*/
CURLcode Curl_thrdq_send(struct curl_thrdq *tqueue, void *item,
const char *description, timediff_t timeout_ms);
/* Receive the oldest, processed item from the queue again, if there is one.
* The caller takes ownership of the item received, e.g. the queue
* relinquishes all references to item.
* Returns CURLE_AGAIN when there is no processed item, setting `pitem`
* to NULL.
*/
CURLcode Curl_thrdq_recv(struct curl_thrdq *tqueue, void **pitem);
/* Return TRUE if the passed "item" matches. */
typedef bool Curl_thrdq_item_match_cb(void *item, void *match_data);
/* Clear all scheduled/processed items that match from the queue. This
* will *not* be able to clear items that are being processed.
*/
void Curl_thrdq_clear(struct curl_thrdq *tqueue,
Curl_thrdq_item_match_cb *fn_match,
void *match_data);
CURLcode Curl_thrdq_await_done(struct curl_thrdq *tqueue,
uint32_t timeout_ms);
#ifdef CURLVERBOSE
void Curl_thrdq_trace(struct curl_thrdq *tqueue,
struct Curl_easy *data,
struct curl_trc_feat *feat);
#endif
#endif /* USE_THREADS */
#endif /* HEADER_CURL_THRDQUEUE_H */

View File

@ -286,6 +286,8 @@ test3200 test3201 test3202 test3203 test3204 test3205 test3206 test3207 \
test3208 test3209 test3210 test3211 test3212 test3213 test3214 test3215 \ test3208 test3209 test3210 test3211 test3212 test3213 test3214 test3215 \
test3216 test3217 test3218 test3219 \ test3216 test3217 test3218 test3219 \
\ \
test3300 test3301 \
\
test4000 test4001 test4000 test4001
EXTRA_DIST = $(TESTCASES) DISABLED data-xml1 data320.html \ EXTRA_DIST = $(TESTCASES) DISABLED data-xml1 data320.html \

19
tests/data/test3300 Normal file
View File

@ -0,0 +1,19 @@
<?xml version="1.0" encoding="US-ASCII"?>
<testcase>
<info>
<keywords>
unittest
threads
</keywords>
</info>
# Client-side
<client>
<features>
unittest
</features>
<name>
thrdpool unit tests
</name>
</client>
</testcase>

19
tests/data/test3301 Normal file
View File

@ -0,0 +1,19 @@
<?xml version="1.0" encoding="US-ASCII"?>
<testcase>
<info>
<keywords>
unittest
threads
</keywords>
</info>
# Client-side
<client>
<features>
unittest
</features>
<name>
thrdqueue unit tests
</name>
</client>
</testcase>

View File

@ -45,4 +45,5 @@ TESTS_C = \
unit1979.c unit1980.c \ unit1979.c unit1980.c \
unit2600.c unit2601.c unit2602.c unit2603.c unit2604.c unit2605.c \ unit2600.c unit2601.c unit2602.c unit2603.c unit2604.c unit2605.c \
unit3200.c unit3205.c \ unit3200.c unit3205.c \
unit3211.c unit3212.c unit3213.c unit3214.c unit3216.c unit3219.c unit3211.c unit3212.c unit3213.c unit3214.c unit3216.c unit3219.c \
unit3300.c unit3301.c

164
tests/unit/unit3300.c Normal file
View File

@ -0,0 +1,164 @@
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "unitcheck.h"
#include "curlx/wait.h"
#include "thrdpool.h"
#ifdef USE_THREADS
struct unit3300_ctx {
uint32_t total;
uint32_t taken;
uint32_t returned;
};
static uint32_t unit3300_item = 23;
static uint32_t unit3300_delay_ms = 0;
static void unit3300_ctx_init(struct unit3300_ctx *ctx,
uint32_t total,
uint32_t delay_ms)
{
memset(ctx, 0, sizeof(*ctx));
ctx->total = total;
unit3300_delay_ms = delay_ms;
}
static void *unit3300_take(void *user_data, const char **pdescription,
timediff_t *ptimeout_ms)
{
struct unit3300_ctx *ctx = user_data;
*pdescription = NULL;
*ptimeout_ms = 0;
if(ctx->taken < ctx->total) {
ctx->taken++;
return &unit3300_item;
}
return NULL;
}
static void unit3300_process(void *item)
{
fail_unless(item == &unit3300_item, "process unexpected item");
if(unit3300_delay_ms) {
curlx_wait_ms(unit3300_delay_ms);
}
}
static void unit3300_return(void *item, void *user_data)
{
struct unit3300_ctx *ctx = user_data;
(void)item;
if(ctx) {
ctx->returned++;
fail_unless(ctx->returned <= ctx->total, "returned too many");
}
}
static CURLcode test_unit3300(const char *arg)
{
UNITTEST_BEGIN_SIMPLE
struct curl_thrdpool *tpool;
struct unit3300_ctx ctx;
CURLcode r;
/* pool without minimum, will not start anything */
unit3300_ctx_init(&ctx, 10, 0);
r = Curl_thrdpool_create(&tpool, "unit3300a", 0, 2, 0,
unit3300_take, unit3300_process, unit3300_return,
&ctx);
fail_unless(!r, "pool-a create");
Curl_thrdpool_destroy(tpool, TRUE);
fail_unless(!ctx.returned, "pool-a unexpected items returned");
fail_unless(!ctx.taken, "pool-a unexpected items taken");
/* pool without minimum, signal start, consumes everything */
unit3300_ctx_init(&ctx, 10, 0);
r = Curl_thrdpool_create(&tpool, "unit3300b", 0, 2, 0,
unit3300_take, unit3300_process, unit3300_return,
&ctx);
fail_unless(!r, "pool-b create");
r = Curl_thrdpool_signal(tpool, 2);
fail_unless(!r, "pool-b signal");
Curl_thrdpool_await_idle(tpool, 0);
Curl_thrdpool_destroy(tpool, TRUE);
fail_unless(ctx.returned == ctx.total, "pool-b items returned missing");
fail_unless(ctx.taken == ctx.total, "pool-b items taken missing");
/* pool with minimum, consumes everything without signal */
unit3300_ctx_init(&ctx, 10, 0);
r = Curl_thrdpool_create(&tpool, "unit3300c", 1, 2, 0,
unit3300_take, unit3300_process, unit3300_return,
&ctx);
fail_unless(!r, "pool-c create");
Curl_thrdpool_await_idle(tpool, 0);
Curl_thrdpool_destroy(tpool, TRUE);
fail_unless(ctx.returned == ctx.total, "pool-c items returned missing");
fail_unless(ctx.taken == ctx.total, "pool-c items taken missing");
/* pool with many max, signal abundance, consumes everything */
unit3300_ctx_init(&ctx, 100, 0);
r = Curl_thrdpool_create(&tpool, "unit3300d", 0, 50, 0,
unit3300_take, unit3300_process, unit3300_return,
&ctx);
fail_unless(!r, "pool-d create");
r = Curl_thrdpool_signal(tpool, 100);
fail_unless(!r, "pool-d signal");
Curl_thrdpool_await_idle(tpool, 0);
Curl_thrdpool_destroy(tpool, TRUE);
fail_unless(ctx.returned == ctx.total, "pool-d items returned missing");
fail_unless(ctx.taken == ctx.total, "pool-d items taken missing");
/* pool with 1 max, many to take, no await, destroy without join */
unit3300_ctx_init(&ctx, 10000000, 1);
r = Curl_thrdpool_create(&tpool, "unit3300e", 0, 1, 0,
unit3300_take, unit3300_process, unit3300_return,
&ctx);
fail_unless(!r, "pool-e create");
r = Curl_thrdpool_signal(tpool, 100);
fail_unless(!r, "pool-e signal");
Curl_thrdpool_destroy(tpool, FALSE);
fail_unless(ctx.returned < ctx.total, "pool-e returned all");
fail_unless(ctx.taken < ctx.total, "pool-e took all");
#ifdef DEBUGBUILD
/* pool thread will notice destruction and should immediately abort.
* No memory leak should be reported. if the wait is too short on
* a slow system, thread sanitizer will freak out as memdebug will
* be called by threads after main thread shut down. */
curlx_wait_ms(1000);
#endif
UNITTEST_END_SIMPLE
}
#else
static CURLcode test_unit3300(const char *arg)
{
UNITTEST_BEGIN_SIMPLE
(void)arg;
UNITTEST_END_SIMPLE
}
#endif /* USE_THREADS */

144
tests/unit/unit3301.c Normal file
View File

@ -0,0 +1,144 @@
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "unitcheck.h"
#include "curlx/wait.h"
#include "thrdqueue.h"
#include "curl_threads.h"
#ifdef USE_THREADS
struct unit3301_item {
int id;
BIT(processed);
};
struct unit3301_ctx {
volatile int event;
};
static struct unit3301_item *unit3301_item_create(int id)
{
struct unit3301_item *uitem;
uitem = curlx_calloc(1, sizeof(*uitem));
if(uitem) {
uitem->id = id;
curl_mfprintf(stderr, "created item %d\n", uitem->id);
}
return uitem;
}
static void unit3301_item_free(void *item)
{
struct unit3301_item *uitem = item;
curl_mfprintf(stderr, "free item %d\n", uitem->id);
curlx_free(uitem);
}
static void unit3301_event(const struct curl_thrdq *tqueue,
Curl_thrdq_event ev,
void *user_data)
{
struct unit3301_ctx *ctx = user_data;
(void)tqueue;
switch(ev) {
case CURL_THRDQ_EV_ITEM_DONE:
ctx->event = 1;
break;
default:
break;
}
}
static void unit3301_process(void *item)
{
struct unit3301_item *uitem = item;
curlx_wait_ms(1);
uitem->processed = TRUE;
}
static CURLcode test_unit3301(const char *arg)
{
UNITTEST_BEGIN_SIMPLE
struct curl_thrdq *tqueue;
struct unit3301_ctx ctx;
int i, count, nrecvd;
CURLcode r;
/* create and teardown queue */
memset(&ctx, 0, sizeof(ctx));
r = Curl_thrdq_create(&tqueue, "unit3301-a", 0, 0, 2, 1,
unit3301_item_free, unit3301_process, unit3301_event,
&ctx);
fail_unless(!r, "queue-a create");
Curl_thrdq_destroy(tqueue, TRUE);
tqueue = NULL;
fail_unless(!ctx.event, "queue-a unexpected done count");
/* create queue, have it process `count` items */
count = 10;
memset(&ctx, 0, sizeof(ctx));
r = Curl_thrdq_create(&tqueue, "unit3301-b", 0, 0, 2, 1,
unit3301_item_free, unit3301_process, unit3301_event,
&ctx);
fail_unless(!r, "queue-b create");
for(i = 0; i < count; ++i) {
struct unit3301_item *uitem = unit3301_item_create(i);
fail_unless(uitem, "queue-b item create");
r = Curl_thrdq_send(tqueue, uitem, NULL, 0);
fail_unless(!r, "queue-b send");
}
r = Curl_thrdq_await_done(tqueue, 0);
fail_unless(!r, "queue-b await done");
nrecvd = 0;
for(i = 0; i < count; ++i) {
void *item;
r = Curl_thrdq_recv(tqueue, &item);
fail_unless(!r, "queue-b recv");
if(item) {
struct unit3301_item *uitem = item;
curl_mfprintf(stderr, "received item %d\n", uitem->id);
++nrecvd;
fail_unless(uitem->processed, "queue-b recv unprocessed item");
unit3301_item_free(item);
}
}
Curl_thrdq_destroy(tqueue, TRUE);
tqueue = NULL;
fail_unless(nrecvd == count, "queue-b unexpected done count");
UNITTEST_END_SIMPLE
}
#else
static CURLcode test_unit3301(const char *arg)
{
UNITTEST_BEGIN_SIMPLE
(void)arg;
UNITTEST_END_SIMPLE
}
#endif /* USE_THREADS */