multi: add notifications API

Add infrastructure to colled and dispatch notifications for transfers
and the multi handle in general. Applications can register a callback
and en-/disable notification type the are interested in.

Without a callback installed, notifications are not collected. Same when
a notification type has not been enabled.

Memory allocation failures on adding notifications lead to a general
multi failure state and result in CURLM_OUT_OF_MEMORY returned from
curl_multi_perform() and curl_multi_socket*() invocations.

Closes #18432
This commit is contained in:
Stefan Eissing 2025-09-01 11:58:16 +02:00 committed by Daniel Stenberg
parent f4e83a0adc
commit 357808f4ad
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
24 changed files with 767 additions and 44 deletions

View File

@ -133,9 +133,10 @@ curl_multi_setopt
curl_multi_assign
curl_multi_get_handles
curl_multi_get_offt
curl_multi_notify_disable
curl_multi_notify_enable
curl_pushheader_bynum
curl_pushheader_byname
curl_multi_waitfds
curl_easy_option_by_name
curl_easy_option_by_id
curl_easy_option_next

View File

@ -78,6 +78,8 @@ man_MANS = \
curl_multi_get_offt.3 \
curl_multi_info_read.3 \
curl_multi_init.3 \
curl_multi_notify_disable.3 \
curl_multi_notify_enable.3 \
curl_multi_perform.3 \
curl_multi_poll.3 \
curl_multi_remove_handle.3 \

View File

@ -0,0 +1,66 @@
---
c: Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
SPDX-License-Identifier: curl
Title: curl_multi_notify_disable
Section: 3
Source: libcurl
See-also:
- CURLMOPT_NOTIFYFUNCTION (3)
- CURLMOPT_NOTIFYDATA (3)
- curl_multi_notify_enable (3)
Protocol:
- All
Added-in: 8.17.0
---
# NAME
curl_multi_notify_disable - disable a notification type
# SYNOPSIS
~~~c
#include <curl/curl.h>
CURLMcode curl_multi_notify_disable(CURLM *multi_handle,
unsigned int notification);
~~~
# DESCRIPTION
Disables collecting the given notification type in the multi handle. A
callback function installed via CURLMOPT_NOTIFYFUNCTION(3) is no longer
called when this notification happens.
Only when a notification callback is installed *and* a notification
is enabled are these collected and dispatched to the callback.
Several notification types can be enabled at the same time. Disabling
an already disabled notification is not an error.
A notification can be enabled again via curl_multi_notify_enable(3).
# %PROTOCOLS%
# EXAMPLE
~~~c
int main(void)
{
int rc;
CURLM *multi = curl_multi_init();
rc = curl_multi_notify_disable(multi, CURLM_NTFY_INFO_READ);
}
~~~
# %AVAILABILITY%
# RETURN VALUE
This function returns a CURLMcode indicating success or error.
CURLM_OK (0) means everything was OK, non-zero means an error occurred, see
libcurl-errors(3).
The return code is for the whole multi stack. Problems still might have
occurred on individual transfers even when one of these functions return OK.

View File

@ -0,0 +1,66 @@
---
c: Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
SPDX-License-Identifier: curl
Title: curl_multi_notify_enable
Section: 3
Source: libcurl
See-also:
- CURLMOPT_NOTIFYFUNCTION (3)
- CURLMOPT_NOTIFYDATA (3)
- curl_multi_notify_disable (3)
Protocol:
- All
Added-in: 8.17.0
---
# NAME
curl_multi_notify_enable - enable a notification type
# SYNOPSIS
~~~c
#include <curl/curl.h>
CURLMcode curl_multi_notify_enable(CURLM *multi_handle,
unsigned int notification);
~~~
# DESCRIPTION
Enables collecting the given notification type in the multi handle. A
callback function installed via CURLMOPT_NOTIFYFUNCTION(3) is called
when this notification happens.
Only when a notification callback is installed *and* a notification
is enabled are these collected and dispatched to the callback.
Several notification types can be enabled at the same time. Enabling
an already enabled notification is not an error.
A notification can be disabled again via curl_multi_notify_disable(3).
# %PROTOCOLS%
# EXAMPLE
~~~c
int main(void)
{
int rc;
CURLM *multi = curl_multi_init();
rc = curl_multi_notify_enable(multi, CURLM_NTFY_INFO_READ);
}
~~~
# %AVAILABILITY%
# RETURN VALUE
This function returns a CURLMcode indicating success or error.
CURLM_OK (0) means everything was OK, non-zero means an error occurred, see
libcurl-errors(3).
The return code is for the whole multi stack. Problems still might have
occurred on individual transfers even when one of these functions return OK.

View File

@ -72,6 +72,14 @@ Max simultaneously open connections. See CURLMOPT_MAX_TOTAL_CONNECTIONS(3)
Signal that the network has changed. See CURLMOPT_NETWORK_CHANGED(3)
## CURLMOPT_NOTIFYDATA
Custom pointer passed to the notify callback. See CURLMOPT_NOTIFYDATA(3)
## CURLMOPT_NOTIFYFUNCTION
Callback that receives notifications. See CURLMOPT_NOTIFYFUNCTION(3)
## CURLMOPT_PIPELINING
Enable HTTP multiplexing. See CURLMOPT_PIPELINING(3)

View File

@ -0,0 +1,72 @@
---
c: Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
SPDX-License-Identifier: curl
Title: CURLMOPT_NOTIFYDATA
Section: 3
Source: libcurl
See-also:
- CURLMOPT_NOTIFYFUNCTION (3)
- curl_multi_notify_disable (3)
- curl_multi_notify_enable (3)
Protocol:
- All
Added-in: 8.17.0
---
# NAME
CURLMOPT_NOTIFYDATA - custom pointer passed to the notification callback
# SYNOPSIS
~~~c
#include <curl/curl.h>
CURLMcode curl_multi_setopt(CURLM *handle, CURLMOPT_NOTIFYDATA, void *pointer);
~~~
# DESCRIPTION
A data *pointer* to pass to the notification callback set with the
CURLMOPT_NOTIFYFUNCTION(3) option.
This pointer is not touched by libcurl but is only passed in as the
notification callback's **clientp** argument.
# DEFAULT
NULL
# %PROTOCOLS%
# EXAMPLE
~~~c
struct priv {
void *ours;
};
static void ntfy_cb(CURLM *multi, unsigned int notification,
CURL *easy, void *ntfyp)
{
struct priv *p = ntfyp;
printf("my ptr: %p\n", p->ours);
/* ... */
}
int main(void)
{
struct priv setup;
CURLM *multi = curl_multi_init();
/* ... use socket callback and custom pointer */
curl_multi_setopt(multi, CURLMOPT_NOTIFYFUNCTION, ntfy_cb);
curl_multi_setopt(multi, CURLMOPT_NOTIFYDATA, &setup);
curl_multi_notify_enable(multi, CURLM_NTFY_INFO_READ);
}
~~~
# %AVAILABILITY%
# RETURN VALUE
Returns CURLM_OK.

View File

@ -0,0 +1,129 @@
---
c: Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
SPDX-License-Identifier: curl
Title: CURLMOPT_NOTIFYFUNCTION
Section: 3
Source: libcurl
See-also:
- CURLMOPT_NOTIFYDATA (3)
- curl_multi_socket_action (3)
- curl_multi_notify_disable (3)
- curl_multi_notify_enable (3)
Protocol:
- All
Added-in: 8.17.0
---
# NAME
CURLMOPT_NOTIFYFUNCTION - callback receiving notifications
# SYNOPSIS
~~~c
#include <curl/curl.h>
void ntfy_callback(CURLM *multi, /* multi handle */
unsigned int notification, /* notification type */
CURL *easy, /* easy handle */
void *ntfyp); /* private ntfy pointer */
CURLMcode curl_multi_setopt(CURLM *handle, CURLMOPT_NOTIFYFUNCTION, ntfy_callback);
~~~
# DESCRIPTION
Pass a pointer to your callback function, which should match the prototype
shown above.
When the multi handle processes transfers, changes can be observed
by receiving notifications about them. This can eliminate the need to
constantly interrogate the multi handle to observe such changes to
act on them.
Notifications are collected and dispatched to the application's callback
function at an appropriate time.
The notify callback is different from other callbacks in that it
can use more libcurl API functions. Apart from curl_multi_perform(3),
curl_multi_socket(3), curl_multi_socket_action(3), curl_multi_socket_all(3)
and curl_multi_cleanup(3) it may call all other methods on the
multi and easy handles. This includes adding and removing easy
handles to/from the multi handle.
This callback may get invoked at any time when interacting with libcurl.
This may even happen after all transfers are done and *may also*
happen *during* a call to curl_multi_cleanup(3) when cached connections
are shut down.
# CALLBACK ARGUMENTS
*multi* identifies the multi handle that triggered the notification.
**notification** is the type of notification, e.g. what happened. The
following types are available:
## CURLM_NTFY_INFO_READ
When enabled via curl_multi_notify_enable(3), this informs the application
that there are new messages to be processed via curl_multi_info_read(3).
This notification happens whenever a message is added to an empty
message stack in the multi handle and not for subsequent additions. The
notification callback is then expected to read all available message,
emptying the stack, so a subsequent addition triggers the notification
again.
The *easy* handle passed is an internal handle.
## CURLM_NTFY_EASY_DONE
When enabled via curl_multi_notify_enable(3), this notification is triggered
when a an easy handle has finished. This happens both for
successful and failed transfers.
The *easy* handle passed is the transfer that is done. This *may* be
an internal handle when DoH or other features are used.
*easy* identifies the transfer involved. This may be one of the
application's own easy handle or an internal handle.
**ntfyp** is set with CURLMOPT_NOTIFYDATA(3).
# DEFAULT
NULL (no callback)
# %PROTOCOLS%
# EXAMPLE
~~~c
struct priv {
void *ours;
};
static void ntfy_cb(CURLM *multi, unsigned int notification,
CURL *easy, void *ntfyp)
{
struct priv *p = ntfyp;
printf("my ptr: %p\n", p->ours);
/* ... */
}
int main(void)
{
struct priv setup;
CURLM *multi = curl_multi_init();
/* ... use socket callback and custom pointer */
curl_multi_setopt(multi, CURLMOPT_NOTIFYFUNCTION, ntfy_cb);
curl_multi_setopt(multi, CURLMOPT_NOTIFYDATA, &setup);
curl_multi_notify_enable(multi, CURLM_NTFY_INFO_READ);
}
~~~
# %AVAILABILITY%
# RETURN VALUE
Returns CURLM_OK.

View File

@ -114,6 +114,8 @@ man_MANS = \
CURLMOPT_MAX_TOTAL_CONNECTIONS.3 \
CURLMOPT_MAXCONNECTS.3 \
CURLMOPT_NETWORK_CHANGED.3 \
CURLMOPT_NOTIFYDATA.3 \
CURLMOPT_NOTIFYFUNCTION.3 \
CURLMOPT_PIPELINING.3 \
CURLMOPT_PIPELINING_SERVER_BL.3 \
CURLMOPT_PIPELINING_SITE_BL.3 \

View File

@ -545,6 +545,8 @@ CURLM_BAD_SOCKET 7.15.4
CURLM_CALL_MULTI_PERFORM 7.9.6
CURLM_CALL_MULTI_SOCKET 7.15.5
CURLM_INTERNAL_ERROR 7.9.6
CURLM_NTFY_EASY_DONE 8.17.0
CURLM_NTFY_INFO_READ 8.17.0
CURLM_OK 7.9.6
CURLM_OUT_OF_MEMORY 7.9.6
CURLM_RECURSIVE_API_CALL 7.59.0
@ -568,6 +570,8 @@ CURLMOPT_MAX_PIPELINE_LENGTH 7.30.0
CURLMOPT_MAX_TOTAL_CONNECTIONS 7.30.0
CURLMOPT_MAXCONNECTS 7.16.3
CURLMOPT_NETWORK_CHANGED 8.16.0
CURLMOPT_NOTIFYDATA 8.17.0
CURLMOPT_NOTIFYFUNCTION 8.17.0
CURLMOPT_PIPELINING 7.16.0
CURLMOPT_PIPELINING_SERVER_BL 7.30.0
CURLMOPT_PIPELINING_SITE_BL 7.30.0

View File

@ -398,6 +398,12 @@ typedef enum {
/* network has changed, adjust caches/connection reuse */
CURLOPT(CURLMOPT_NETWORK_CHANGED, CURLOPTTYPE_LONG, 17),
/* This is the notify callback function pointer */
CURLOPT(CURLMOPT_NOTIFYFUNCTION, CURLOPTTYPE_FUNCTIONPOINT, 18),
/* This is the argument passed to the notify callback */
CURLOPT(CURLMOPT_NOTIFYDATA, CURLOPTTYPE_OBJECTPOINT, 19),
CURLMOPT_LASTENTRY /* the last unused */
} CURLMoption;
@ -520,6 +526,27 @@ CURL_EXTERN CURLMcode curl_multi_waitfds(CURLM *multi,
unsigned int size,
unsigned int *fd_count);
/*
* Notifications dispatched by a multi handle, when enabled.
*/
#define CURLM_NTFY_INFO_READ 0
#define CURLM_NTFY_EASY_DONE 1
/*
* Callback to install via CURLMOPT_NOTIFYFUNCTION.
*/
typedef void (*curl_notify_callback)(CURLM *multi,
unsigned int notification,
CURL *easy,
void *user_data);
CURL_EXTERN CURLMcode curl_multi_notify_disable(CURLM *multi,
unsigned int notification);
CURL_EXTERN CURLMcode curl_multi_notify_enable(CURLM *multi,
unsigned int notification);
#ifdef __cplusplus
} /* end of extern "C" */
#endif

View File

@ -208,6 +208,9 @@
if(curlcheck_charpp_option(option)) \
if(!curlcheck_ptrptr(value, char)) \
Wcurl_multi_setopt_err_charpp(); \
if((option) == CURLMOPT_NOTIFYFUNCTION) \
if(!curlcheck_multintfy_cb(value)) \
Wcurl_multi_setopt_err_ntfycb(); \
if((option) == CURLMOPT_PUSHFUNCTION) \
if(!curlcheck_multipush_cb(value)) \
Wcurl_multi_setopt_err_pushcb(); \
@ -224,7 +227,8 @@
/* evaluates to true if the option takes a data argument to pass to a
callback */
#define curlcheck_multicb_data_option(option) \
((option) == CURLMOPT_PUSHDATA || \
((option) == CURLMOPT_NOTIFYDATA || \
(option) == CURLMOPT_PUSHDATA || \
(option) == CURLMOPT_SOCKETDATA || \
(option) == CURLMOPT_TIMERDATA || \
0)
@ -250,6 +254,11 @@
(curlcheck_NULL(expr) || \
curlcheck_cb_compatible((expr), curl_push_callback))
/* evaluates to true if expr is of type curl_push_callback */
#define curlcheck_multintfy_cb(expr) \
(curlcheck_NULL(expr) || \
curlcheck_cb_compatible((expr), curl_notify_callback))
/*
* For now, just make sure that the functions are called with three arguments
*/
@ -275,6 +284,8 @@ CURLWARNING(Wcurl_multi_setopt_err_charpp,
"curl_multi_setopt expects a 'char **' argument")
CURLWARNING(Wcurl_multi_setopt_err_pushcb,
"curl_multi_setopt expects a curl_push_callback argument")
CURLWARNING(Wcurl_multi_setopt_err_ntfycb,
"curl_multi_setopt expects a curl_notify_callback argument")
CURLWARNING(Wcurl_multi_setopt_err_socketcb,
"curl_multi_setopt expects a curl_socket_callback argument")
CURLWARNING(Wcurl_multi_setopt_err_timercb,

View File

@ -225,6 +225,7 @@ LIB_CFILES = \
mqtt.c \
multi.c \
multi_ev.c \
multi_ntfy.c \
netrc.c \
noproxy.c \
openldap.c \
@ -357,6 +358,7 @@ LIB_HFILES = \
mqtt.h \
multihandle.h \
multi_ev.h \
multi_ntfy.h \
multiif.h \
netrc.h \
noproxy.h \

View File

@ -57,6 +57,8 @@ curl_multi_get_handles
curl_multi_get_offt
curl_multi_info_read
curl_multi_init
curl_multi_notify_disable
curl_multi_notify_enable
curl_multi_perform
curl_multi_poll
curl_multi_remove_handle

View File

@ -171,8 +171,15 @@ static void mstate(struct Curl_easy *data, CURLMstate state
#endif
data->mstate = state;
if(state == MSTATE_COMPLETED) {
switch(state) {
case MSTATE_DONE:
CURLM_NTFY(data, CURLM_NTFY_EASY_DONE);
break;
case MSTATE_COMPLETED:
/* we sometimes directly jump to COMPLETED, trigger also a notification
* in that case. */
if(oldstate < MSTATE_DONE)
CURLM_NTFY(data, CURLM_NTFY_EASY_DONE);
/* changing to COMPLETED means it is in process and needs to go */
DEBUGASSERT(Curl_uint_bset_contains(&data->multi->process, data->mid));
Curl_uint_bset_remove(&data->multi->process, data->mid);
@ -182,6 +189,9 @@ static void mstate(struct Curl_easy *data, CURLMstate state
/* free the transfer buffer when we have no more active transfers */
multi_xfer_bufs_free(data->multi);
}
break;
default:
break;
}
/* if this state has an init-function, run it */
@ -215,6 +225,8 @@ static void ph_freeentry(void *p)
*/
static void multi_addmsg(struct Curl_multi *multi, struct Curl_message *msg)
{
if(!Curl_llist_count(&multi->msglist))
CURLM_NTFY(multi->admin, CURLM_NTFY_INFO_READ);
Curl_llist_append(&multi->msglist, msg, &msg->list);
}
@ -232,6 +244,7 @@ struct Curl_multi *Curl_multi_handle(unsigned int xfer_table_size,
multi->magic = CURL_MULTI_HANDLE;
Curl_dnscache_init(&multi->dnscache, dnssize);
Curl_mntfy_init(multi);
Curl_multi_ev_init(multi, ev_hashsize);
Curl_uint_tbl_init(&multi->xfers, NULL);
Curl_uint_bset_init(&multi->process);
@ -246,7 +259,8 @@ struct Curl_multi *Curl_multi_handle(unsigned int xfer_table_size,
multi->max_concurrent_streams = 100;
multi->last_timeout_ms = -1;
if(Curl_uint_bset_resize(&multi->process, xfer_table_size) ||
if(Curl_mntfy_resize(multi) ||
Curl_uint_bset_resize(&multi->process, xfer_table_size) ||
Curl_uint_bset_resize(&multi->pending, xfer_table_size) ||
Curl_uint_bset_resize(&multi->dirty, xfer_table_size) ||
Curl_uint_bset_resize(&multi->msgsent, xfer_table_size) ||
@ -305,6 +319,7 @@ error:
multi->admin->multi = NULL;
Curl_close(&multi->admin);
}
Curl_mntfy_cleanup(multi);
Curl_uint_bset_destroy(&multi->process);
Curl_uint_bset_destroy(&multi->dirty);
@ -2754,6 +2769,9 @@ CURLMcode curl_multi_perform(CURLM *m, int *running_handles)
if(multi->in_callback)
return CURLM_RECURSIVE_API_CALL;
if(multi->in_ntfy_callback)
return CURLM_RECURSIVE_API_CALL;
sigpipe_init(&pipe_st);
if(Curl_uint_bset_first(&multi->process, &mid)) {
CURL_TRC_M(multi->admin, "multi_perform(running=%u)",
@ -2785,6 +2803,9 @@ CURLMcode curl_multi_perform(CURLM *m, int *running_handles)
if(multi_ischanged(m, TRUE))
process_pending_handles(m);
if(!returncode)
returncode = Curl_mntfy_dispatch_all(multi);
/*
* Simply remove all expired timers from the splay since handles are dealt
* with unconditionally by this function and curl_multi_timeout() requires
@ -2831,6 +2852,8 @@ CURLMcode curl_multi_cleanup(CURLM *m)
unsigned int mid;
if(multi->in_callback)
return CURLM_RECURSIVE_API_CALL;
if(multi->in_ntfy_callback)
return CURLM_RECURSIVE_API_CALL;
/* First remove all remaining easy handles,
* close internal ones. admin handle is special */
@ -2900,6 +2923,7 @@ CURLMcode curl_multi_cleanup(CURLM *m)
#endif
multi_xfer_bufs_free(multi);
Curl_mntfy_cleanup(multi);
#ifdef DEBUGBUILD
if(Curl_uint_tbl_count(&multi->xfers)) {
multi_xfer_tbl_dump(multi);
@ -3180,6 +3204,9 @@ out:
if(multi_ischanged(multi, TRUE))
process_pending_handles(multi);
if(!result)
result = Curl_mntfy_dispatch_all(multi);
if(running_handles) {
unsigned int running = Curl_multi_xfers_running(multi);
*running_handles = (running < INT_MAX) ? (int)running : INT_MAX;
@ -3269,6 +3296,12 @@ CURLMcode curl_multi_setopt(CURLM *m,
}
break;
}
case CURLMOPT_NOTIFYFUNCTION:
multi->ntfy.ntfy_cb = va_arg(param, curl_notify_callback);
break;
case CURLMOPT_NOTIFYDATA:
multi->ntfy.ntfy_cb_data = va_arg(param, void *);
break;
default:
res = CURLM_UNKNOWN_OPTION;
break;
@ -3285,6 +3318,8 @@ CURLMcode curl_multi_socket(CURLM *m, curl_socket_t s, int *running_handles)
struct Curl_multi *multi = m;
if(multi->in_callback)
return CURLM_RECURSIVE_API_CALL;
if(multi->in_ntfy_callback)
return CURLM_RECURSIVE_API_CALL;
return multi_socket(multi, FALSE, s, 0, running_handles);
}
@ -3294,6 +3329,8 @@ CURLMcode curl_multi_socket_action(CURLM *m, curl_socket_t s,
struct Curl_multi *multi = m;
if(multi->in_callback)
return CURLM_RECURSIVE_API_CALL;
if(multi->in_ntfy_callback)
return CURLM_RECURSIVE_API_CALL;
return multi_socket(multi, FALSE, s, ev_bitmask, running_handles);
}
@ -3302,6 +3339,8 @@ CURLMcode curl_multi_socket_all(CURLM *m, int *running_handles)
struct Curl_multi *multi = m;
if(multi->in_callback)
return CURLM_RECURSIVE_API_CALL;
if(multi->in_ntfy_callback)
return CURLM_RECURSIVE_API_CALL;
return multi_socket(multi, TRUE, CURL_SOCKET_BAD, 0, running_handles);
}
@ -3996,6 +4035,24 @@ void Curl_multi_clear_dirty(struct Curl_easy *data)
Curl_uint_bset_remove(&data->multi->dirty, data->mid);
}
CURLMcode curl_multi_notify_enable(CURLM *m, unsigned int notification)
{
struct Curl_multi *multi = m;
if(!GOOD_MULTI_HANDLE(multi))
return CURLM_BAD_HANDLE;
return Curl_mntfy_enable(multi, notification);
}
CURLMcode curl_multi_notify_disable(CURLM *m, unsigned int notification)
{
struct Curl_multi *multi = m;
if(!GOOD_MULTI_HANDLE(multi))
return CURLM_BAD_HANDLE;
return Curl_mntfy_disable(multi, notification);
}
#ifdef DEBUGBUILD
static void multi_xfer_dump(struct Curl_multi *multi, unsigned int mid,
void *entry)

212
lib/multi_ntfy.c Normal file
View File

@ -0,0 +1,212 @@
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "curl_setup.h"
#include <curl/curl.h>
#include "urldata.h"
#include "curl_trc.h"
#include "multihandle.h"
#include "multiif.h"
#include "multi_ntfy.h"
/* The last 3 #include files should be in this order */
#include "curl_printf.h"
#include "curl_memory.h"
#include "memdebug.h"
struct mntfy_entry {
unsigned int mid;
unsigned int type;
};
#define CURL_MNTFY_CHUNK_SIZE 128
struct mntfy_chunk {
struct mntfy_chunk *next;
size_t r_offset;
size_t w_offset;
struct mntfy_entry entries[CURL_MNTFY_CHUNK_SIZE];
};
static struct mntfy_chunk *mnfty_chunk_create(void)
{
return calloc(1, sizeof(struct mntfy_chunk));
}
static void mnfty_chunk_destroy(struct mntfy_chunk *chunk)
{
free(chunk);
}
static void mnfty_chunk_reset(struct mntfy_chunk *chunk)
{
memset(chunk, 0, sizeof(*chunk));
}
static bool mntfy_chunk_append(struct mntfy_chunk *chunk,
struct Curl_easy *data,
unsigned int type)
{
struct mntfy_entry *e;
if(chunk->w_offset >= CURL_MNTFY_CHUNK_SIZE)
return FALSE;
e = &chunk->entries[chunk->w_offset++];
e->mid = data->mid;
e->type = type;
return TRUE;
}
static struct mntfy_chunk *mntfy_non_full_tail(struct curl_multi_ntfy *mntfy)
{
struct mntfy_chunk *chunk;
if(!mntfy->tail) {
chunk = mnfty_chunk_create();
if(!chunk)
return NULL;
DEBUGASSERT(!mntfy->head);
mntfy->head = mntfy->tail = chunk;
return chunk;
}
else if(mntfy->tail->w_offset < CURL_MNTFY_CHUNK_SIZE)
return mntfy->tail;
else { /* tail is full. */
chunk = mnfty_chunk_create();
if(!chunk)
return NULL;
DEBUGASSERT(mntfy->head);
mntfy->tail->next = chunk;
mntfy->tail = chunk;
return chunk;
}
}
static void mntfy_chunk_dispatch_all(struct Curl_multi *multi,
struct mntfy_chunk *chunk)
{
struct mntfy_entry *e;
struct Curl_easy *data;
if(multi->ntfy.ntfy_cb) {
while((chunk->r_offset < chunk->w_offset) && !multi->ntfy.failure) {
e = &chunk->entries[chunk->r_offset];
data = e->mid ? Curl_multi_get_easy(multi, e->mid) : multi->admin;
/* only when notification has not been disabled in the meantime */
if(data && Curl_uint_bset_contains(&multi->ntfy.enabled, e->type)) {
/* this may cause new notifications to be added! */
CURL_TRC_M(multi->admin, "[NTFY] dispatch %d to xfer %u",
e->type, e->mid);
multi->ntfy.ntfy_cb(multi, e->type, data, multi->ntfy.ntfy_cb_data);
}
/* once dispatched, safe to increment */
chunk->r_offset++;
}
}
mnfty_chunk_reset(chunk);
}
void Curl_mntfy_init(struct Curl_multi *multi)
{
memset(&multi->ntfy, 0, sizeof(multi->ntfy));
Curl_uint_bset_init(&multi->ntfy.enabled);
}
CURLMcode Curl_mntfy_resize(struct Curl_multi *multi)
{
if(Curl_uint_bset_resize(&multi->ntfy.enabled, CURLM_NTFY_EASY_DONE + 1))
return CURLM_OUT_OF_MEMORY;
return CURLM_OK;
}
void Curl_mntfy_cleanup(struct Curl_multi *multi)
{
while(multi->ntfy.head) {
struct mntfy_chunk *chunk = multi->ntfy.head;
multi->ntfy.head = chunk->next;
mnfty_chunk_destroy(chunk);
}
multi->ntfy.tail = NULL;
Curl_uint_bset_destroy(&multi->ntfy.enabled);
}
CURLMcode Curl_mntfy_enable(struct Curl_multi *multi, unsigned int type)
{
if(type > CURLM_NTFY_EASY_DONE)
return CURLM_UNKNOWN_OPTION;
Curl_uint_bset_add(&multi->ntfy.enabled, type);
return CURLM_OK;
}
CURLMcode Curl_mntfy_disable(struct Curl_multi *multi, unsigned int type)
{
if(type > CURLM_NTFY_EASY_DONE)
return CURLM_UNKNOWN_OPTION;
Curl_uint_bset_remove(&multi->ntfy.enabled, type);
return CURLM_OK;
}
void Curl_mntfy_add(struct Curl_easy *data, unsigned int type)
{
struct Curl_multi *multi = data ? data->multi : NULL;
if(multi && multi->ntfy.ntfy_cb && !multi->ntfy.failure &&
Curl_uint_bset_contains(&multi->ntfy.enabled, type)) {
/* append to list of outstanding notifications */
struct mntfy_chunk *tail = mntfy_non_full_tail(&multi->ntfy);
CURL_TRC_M(data, "[NTFY] add %d for xfer %u", type, data->mid);
if(tail)
mntfy_chunk_append(tail, data, type);
else
multi->ntfy.failure = CURLM_OUT_OF_MEMORY;
}
}
CURLMcode Curl_mntfy_dispatch_all(struct Curl_multi *multi)
{
DEBUGASSERT(!multi->in_ntfy_callback);
multi->in_ntfy_callback = TRUE;
while(multi->ntfy.head && !multi->ntfy.failure) {
struct mntfy_chunk *chunk = multi->ntfy.head;
/* this may cause new notifications to be added! */
mntfy_chunk_dispatch_all(multi, chunk);
DEBUGASSERT(chunk->r_offset == chunk->w_offset);
if(chunk == multi->ntfy.tail) /* last one, keep */
break;
DEBUGASSERT(chunk->next);
DEBUGASSERT(multi->ntfy.head != multi->ntfy.tail);
multi->ntfy.head = chunk->next;
mnfty_chunk_destroy(chunk);
}
multi->in_ntfy_callback = FALSE;
if(multi->ntfy.failure) {
CURLMcode result = multi->ntfy.failure;
multi->ntfy.failure = CURLM_OK; /* reset, once delivered */
return result;
}
return CURLM_OK;
}

57
lib/multi_ntfy.h Normal file
View File

@ -0,0 +1,57 @@
#ifndef HEADER_CURL_MULTI_NTFY_H
#define HEADER_CURL_MULTI_NTFY_H
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "uint-bset.h"
struct Curl_easy;
struct Curl_multi;
struct curl_multi_ntfy {
curl_notify_callback ntfy_cb;
void *ntfy_cb_data;
struct uint_bset enabled;
CURLMcode failure;
struct mntfy_chunk *head;
struct mntfy_chunk *tail;
};
void Curl_mntfy_init(struct Curl_multi *multi);
CURLMcode Curl_mntfy_resize(struct Curl_multi *multi);
void Curl_mntfy_cleanup(struct Curl_multi *multi);
CURLMcode Curl_mntfy_enable(struct Curl_multi *multi, unsigned int type);
CURLMcode Curl_mntfy_disable(struct Curl_multi *multi, unsigned int type);
void Curl_mntfy_add(struct Curl_easy *data, unsigned int type);
#define CURLM_NTFY(d,t) \
do { if((d) && (d)->multi && (d)->multi->ntfy.ntfy_cb) \
Curl_mntfy_add((d), (t)); } while(0)
CURLMcode Curl_mntfy_dispatch_all(struct Curl_multi *multi);
#endif /* HEADER_CURL_MULTI_NTFY_H */

View File

@ -30,6 +30,7 @@
#include "cshutdn.h"
#include "hostip.h"
#include "multi_ev.h"
#include "multi_ntfy.h"
#include "psl.h"
#include "socketpair.h"
#include "uint-bset.h"
@ -134,6 +135,8 @@ struct Curl_multi {
/* multi event related things */
struct curl_multi_ev ev;
/* multi notification related things */
struct curl_multi_ntfy ntfy;
/* `proto_hash` is a general key-value store for protocol implementations
* with the lifetime of the multi handle. The number of elements kept here
@ -178,6 +181,7 @@ struct Curl_multi {
BIT(multiplexing); /* multiplexing wanted */
BIT(recheckstate); /* see Curl_multi_connchanged */
BIT(in_callback); /* true while executing a callback */
BIT(in_ntfy_callback); /* true while dispatching notifications */
#ifdef USE_OPENSSL
BIT(ssl_seeded);
#endif

View File

@ -116,6 +116,8 @@ my %api = (
'curl_multi_get_offt' => 'API',
'curl_multi_info_read' => 'API',
'curl_multi_init' => 'API',
'curl_multi_notify_disable' => 'API',
'curl_multi_notify_enable' => 'API',
'curl_multi_perform' => 'API',
'curl_multi_remove_handle' => 'API',
'curl_multi_setopt' => 'API',

View File

@ -1464,26 +1464,8 @@ struct contextuv {
struct datauv *uv;
};
static CURLcode check_finished(struct parastate *s);
static void check_multi_info(struct datauv *uv)
{
CURLcode result;
result = check_finished(uv->s);
if(result && !uv->s->result)
uv->s->result = result;
if(uv->s->more_transfers) {
result = add_parallel_transfers(uv->s->multi, uv->s->share,
&uv->s->more_transfers,
&uv->s->added_transfers);
if(result && !uv->s->result)
uv->s->result = result;
if(result)
uv_stop(uv->loop);
}
}
static void mnotify(CURLM *multi, unsigned int notification,
CURL *easy, void *user_data);
/* callback from libuv on socket activity */
static void on_uv_socket(uv_poll_t *req, int status, int events)
@ -1510,7 +1492,6 @@ static void on_uv_timeout(uv_timer_t *req)
if(uv && uv->s) {
curl_multi_socket_action(uv->s->multi, CURL_SOCKET_TIMEOUT, 0,
&uv->s->still_running);
check_multi_info(uv);
}
}
@ -1596,8 +1577,6 @@ static int cb_socket(CURL *easy, curl_socket_t s, int action,
uv_poll_stop(&c->poll_handle);
destroy_context(c);
curl_multi_assign(uv->s->multi, s, NULL);
/* check if we can do more now */
check_multi_info(uv);
}
break;
default:
@ -1641,10 +1620,6 @@ static CURLcode parallel_event(struct parastate *s)
curl_mfprintf(tool_stderr, "parallel_event: uv_run() returned\n");
#endif
result = check_finished(s);
if(result && !s->result)
s->result = result;
/* early exit called */
if(s->wrapitup) {
if(s->still_running && !s->wrapitup_processed) {
@ -1657,13 +1632,6 @@ static CURLcode parallel_event(struct parastate *s)
}
break;
}
if(s->more_transfers) {
result = add_parallel_transfers(s->multi, s->share, &s->more_transfers,
&s->added_transfers);
if(result && !s->result)
s->result = result;
}
}
result = s->result;
@ -1758,6 +1726,27 @@ static CURLcode check_finished(struct parastate *s)
return result;
}
static void mnotify(CURLM *multi, unsigned int notification,
CURL *easy, void *user_data)
{
struct parastate *s = user_data;
CURLcode result;
(void)multi;
(void)easy;
switch(notification) {
case CURLM_NTFY_INFO_READ:
result = check_finished(s);
/* remember first failure */
if(result && !s->result)
s->result = result;
break;
default:
break;
}
}
static CURLcode parallel_transfers(CURLSH *share)
{
CURLcode result;
@ -1775,6 +1764,10 @@ static CURLcode parallel_transfers(CURLSH *share)
if(!s->multi)
return CURLE_OUT_OF_MEMORY;
(void)curl_multi_setopt(s->multi, CURLMOPT_NOTIFYFUNCTION, mnotify);
(void)curl_multi_setopt(s->multi, CURLMOPT_NOTIFYDATA, s);
(void)curl_multi_notify_enable(s->multi, CURLM_NTFY_INFO_READ);
result = add_parallel_transfers(s->multi, s->share,
&s->more_transfers, &s->added_transfers);
if(result) {
@ -1813,13 +1806,14 @@ static CURLcode parallel_transfers(CURLSH *share)
s->mcode = curl_multi_poll(s->multi, NULL, 0, 1000, NULL);
if(!s->mcode)
s->mcode = curl_multi_perform(s->multi, &s->still_running);
if(!s->mcode)
result = check_finished(s);
}
(void)progress_meter(s->multi, &s->start, TRUE);
}
/* Result is the first failed transfer - if there was one. */
result = s->result;
/* Make sure to return some kind of error if there was a multi problem */
if(s->mcode) {
result = (s->mcode == CURLM_OUT_OF_MEMORY) ? CURLE_OUT_OF_MEMORY :

View File

@ -109,6 +109,8 @@ curl_multi_get_offt
curl_pushheader_bynum
curl_pushheader_byname
curl_multi_waitfds
curl_multi_notify_disable
curl_multi_notify_enable
curl_easy_option_by_name
curl_easy_option_by_id
curl_easy_option_next

View File

@ -172,7 +172,7 @@ https://localhost:%HTTPSPORT/%TESTNUMBER %CERTDIR/certs/test-ca.crt
# Verify data after the test has been "shot"
<verify>
<limits>
Allocations: 13500
Allocations: 13600
</limits>
</verify>
</testcase>

View File

@ -55,7 +55,7 @@ Accept: */*
</protocol>
<limits>
Allocations: 81
Allocations: 82
Maximum allocated: 33400
</limits>
</verify>

View File

@ -126,6 +126,9 @@ class DTraceProfile:
'-n', f'profile-97 /pid == {self._pid}/ {{ @[ustack()] = count(); }} tick-60s {{ exit(0); }}',
'-o', f'{self._file}'
]
if sys.platform.startswith('darwin'):
# macOS seems to like this for producing symbols in user stacks
args.extend(['-p', f'{self._pid}'])
self._proc = subprocess.Popen(args, text=True, cwd=self._run_dir, shell=False)
assert self._proc

View File

@ -43,7 +43,7 @@ static void checksize(const char *name, size_t size, size_t allowed)
/* the maximum sizes we allow specific structs to grow to */
#define MAX_CURL_EASY 5800
#define MAX_CONNECTDATA 1300
#define MAX_CURL_MULTI 750
#define MAX_CURL_MULTI 850
#define MAX_CURL_HTTPPOST 112
#define MAX_CURL_SLIST 16
#define MAX_CURL_KHKEY 24