socketpair: cleaner interface

Declutter the ifdefs in socketpair.h. Introduce Curl_wakeup_*()
function that encapsulate the details about how the socketpair
is implemented.

This moves the EVENTFD specials from the using code into socketpair
implemenatation, avoiding duplications in three places.

Closes #20340
This commit is contained in:
Stefan Eissing 2026-01-16 13:59:03 +01:00 committed by Daniel Stenberg
parent 1a57302d1a
commit 6c8956c1cb
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
4 changed files with 144 additions and 138 deletions

View File

@ -133,12 +133,7 @@ static void addr_ctx_unlink(struct async_thrdd_addr_ctx **paddr_ctx,
curlx_free(addr_ctx->hostname);
if(addr_ctx->res)
Curl_freeaddrinfo(addr_ctx->res);
#ifndef CURL_DISABLE_SOCKETPAIR
#ifndef USE_EVENTFD
wakeup_close(addr_ctx->sock_pair[1]);
#endif
wakeup_close(addr_ctx->sock_pair[0]);
#endif
Curl_wakeup_destroy(addr_ctx->sock_pair);
curlx_free(addr_ctx);
}
*paddr_ctx = NULL;
@ -169,7 +164,7 @@ addr_ctx_create(struct Curl_easy *data,
#ifndef CURL_DISABLE_SOCKETPAIR
/* create socket pair or pipe */
if(wakeup_create(addr_ctx->sock_pair, FALSE) < 0) {
if(Curl_wakeup_init(addr_ctx->sock_pair, FALSE) < 0) {
addr_ctx->sock_pair[0] = CURL_SOCKET_BAD;
addr_ctx->sock_pair[1] = CURL_SOCKET_BAD;
goto err_exit;
@ -231,15 +226,11 @@ static CURL_THREAD_RETURN_T CURL_STDCALL getaddrinfo_thread(void *arg)
Curl_mutex_release(&addr_ctx->mutx);
#ifndef CURL_DISABLE_SOCKETPAIR
if(!do_abort) {
#ifdef USE_EVENTFD
const uint64_t buf[1] = { 1 };
#else
const char buf[1] = { 1 };
#endif
/* Thread is done, notify transfer */
if(wakeup_write(addr_ctx->sock_pair[1], buf, sizeof(buf)) < 0) {
int err = Curl_wakeup_signal(addr_ctx->sock_pair);
if(err) {
/* update sock_error to errno */
addr_ctx->sock_error = SOCKERRNO;
addr_ctx->sock_error = err;
}
}
#endif
@ -276,15 +267,10 @@ static CURL_THREAD_RETURN_T CURL_STDCALL gethostbyname_thread(void *arg)
Curl_mutex_release(&addr_ctx->mutx);
#ifndef CURL_DISABLE_SOCKETPAIR
if(!do_abort) {
#ifdef USE_EVENTFD
const uint64_t buf[1] = { 1 };
#else
const char buf[1] = { 1 };
#endif
/* Thread is done, notify transfer */
if(wakeup_write(addr_ctx->sock_pair[1], buf, sizeof(buf)) < 0) {
int err = Curl_wakeup_signal(addr_ctx->sock_pair);
if(err) {
/* update sock_error to errno */
addr_ctx->sock_error = SOCKERRNO;
addr_ctx->sock_error = err;
}
}
#endif

View File

@ -294,7 +294,7 @@ struct Curl_multi *Curl_multi_handle(uint32_t xfer_table_size,
if(multi->wsa_event == WSA_INVALID_EVENT)
goto error;
#elif defined(ENABLE_WAKEUP)
if(wakeup_create(multi->wakeup_pair, TRUE) < 0) {
if(Curl_wakeup_init(multi->wakeup_pair, TRUE) < 0) {
multi->wakeup_pair[0] = CURL_SOCKET_BAD;
multi->wakeup_pair[1] = CURL_SOCKET_BAD;
}
@ -1533,20 +1533,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
#ifdef ENABLE_WAKEUP
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
if(cpfds.pfds[curl_nfds + extra_nfds].revents & POLLIN) {
char buf[64];
ssize_t nread;
while(1) {
/* the reading socket is non-blocking, try to read
data from it until it receives an error (except EINTR).
In normal cases it will get EAGAIN or EWOULDBLOCK
when there is no more data, breaking the loop. */
nread = wakeup_read(multi->wakeup_pair[0], buf, sizeof(buf));
if(nread <= 0) {
if(nread < 0 && SOCKEINTR == SOCKERRNO)
continue;
break;
}
}
(void)Curl_wakeup_consume(multi->wakeup_pair, TRUE);
/* do not count the wakeup socket into the returned value */
retcode--;
}
@ -1623,38 +1610,9 @@ CURLMcode curl_multi_wakeup(CURLM *m)
making it safe to access from another thread after the init part
and before cleanup */
if(multi->wakeup_pair[1] != CURL_SOCKET_BAD) {
while(1) {
#ifdef USE_EVENTFD
/* eventfd has a stringent rule of requiring the 8-byte buffer when
calling write(2) on it */
const uint64_t buf[1] = { 1 };
#else
const char buf[1] = { 1 };
#endif
/* swrite() is not thread-safe in general, because concurrent calls
can have their messages interleaved, but in this case the content
of the messages does not matter, which makes it ok to call.
The write socket is set to non-blocking, this way this function
cannot block, making it safe to call even from the same thread
that will call curl_multi_wait(). If swrite() returns that it
would block, it is considered successful because it means that
previous calls to this function will wake up the poll(). */
if(wakeup_write(multi->wakeup_pair[1], buf, sizeof(buf)) < 0) {
int err = SOCKERRNO;
int return_success;
#ifdef USE_WINSOCK
return_success = SOCKEWOULDBLOCK == err;
#else
if(SOCKEINTR == err)
continue;
return_success = SOCKEWOULDBLOCK == err || EAGAIN == err;
#endif
if(!return_success)
return CURLM_WAKEUP_FAILURE;
}
return CURLM_OK;
}
if(Curl_wakeup_signal(multi->wakeup_pair))
return CURLM_WAKEUP_FAILURE;
return CURLM_OK;
}
#endif
#endif
@ -2929,10 +2887,7 @@ CURLMcode curl_multi_cleanup(CURLM *m)
WSACloseEvent(multi->wsa_event);
#else
#ifdef ENABLE_WAKEUP
wakeup_close(multi->wakeup_pair[0]);
#ifndef USE_EVENTFD
wakeup_close(multi->wakeup_pair[1]);
#endif
Curl_wakeup_destroy(multi->wakeup_pair);
#endif
#endif

View File

@ -28,11 +28,14 @@
#include "rand.h"
#include "curlx/nonblock.h"
#ifndef CURL_DISABLE_SOCKETPAIR
/* choose implementation */
#ifdef USE_EVENTFD
#include <sys/eventfd.h>
int Curl_eventfd(curl_socket_t socks[2], bool nonblocking)
static int wakeup_eventfd(curl_socket_t socks[2], bool nonblocking)
{
int efd = eventfd(0, nonblocking ? EFD_CLOEXEC | EFD_NONBLOCK : EFD_CLOEXEC);
if(efd == -1) {
@ -49,7 +52,7 @@ int Curl_eventfd(curl_socket_t socks[2], bool nonblocking)
#include <fcntl.h>
#endif
int Curl_pipe(curl_socket_t socks[2], bool nonblocking)
static int wakeup_pipe(curl_socket_t socks[2], bool nonblocking)
{
#ifdef HAVE_PIPE2
int flags = nonblocking ? O_NONBLOCK | O_CLOEXEC : O_CLOEXEC;
@ -81,18 +84,24 @@ int Curl_pipe(curl_socket_t socks[2], bool nonblocking)
return 0;
}
#endif /* USE_EVENTFD */
#elif defined(HAVE_SOCKETPAIR) /* !USE_EVENTFD && !HAVE_PIPE */
#ifndef CURL_DISABLE_SOCKETPAIR
#ifdef HAVE_SOCKETPAIR
#ifdef USE_SOCKETPAIR
int Curl_socketpair(int domain, int type, int protocol,
curl_socket_t socks[2], bool nonblocking)
{
#ifdef SOCK_NONBLOCK
type = nonblocking ? type | SOCK_NONBLOCK : type;
#ifndef USE_UNIX_SOCKETS
#error "unsupported Unix domain and socketpair build combo"
#endif
if(CURL_SOCKETPAIR(domain, type, protocol, socks))
static int wakeup_socketpair(curl_socket_t socks[2], bool nonblocking)
{
int type = SOCK_STREAM;
#ifdef SOCK_CLOEXEC
type |= SOCK_CLOEXEC;
#endif
#ifdef SOCK_NONBLOCK
if(nonblocking)
type |= SOCK_NONBLOCK;
#endif
if(CURL_SOCKETPAIR(AF_UNIX, type, 0, socks))
return -1;
#ifndef SOCK_NONBLOCK
if(nonblocking) {
@ -106,8 +115,8 @@ int Curl_socketpair(int domain, int type, int protocol,
#endif
return 0;
}
#endif /* USE_SOCKETPAIR */
#else /* !HAVE_SOCKETPAIR */
#else /* !USE_EVENTFD && !HAVE_PIPE && !HAVE_SOCKETPAIR */
#ifdef HAVE_NETDB_H
#include <netdb.h>
@ -125,8 +134,7 @@ int Curl_socketpair(int domain, int type, int protocol,
#include "select.h" /* for Curl_poll */
int Curl_socketpair(int domain, int type, int protocol,
curl_socket_t socks[2], bool nonblocking)
static int wakeup_inet(curl_socket_t socks[2], bool nonblocking)
{
union {
struct sockaddr_in inaddr;
@ -136,9 +144,6 @@ int Curl_socketpair(int domain, int type, int protocol,
curl_socklen_t addrlen = sizeof(a.inaddr);
int reuse = 1;
struct pollfd pfd[1];
(void)domain;
(void)type;
(void)protocol;
listener = CURL_SOCKET(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(listener == CURL_SOCKET_BAD)
@ -257,5 +262,99 @@ error:
sclose(socks[1]);
return -1;
}
#endif /* choose implementation */
int Curl_wakeup_init(curl_socket_t socks[2], bool nonblocking)
{
#ifdef USE_EVENTFD
return wakeup_eventfd(socks, nonblocking);
#elif defined(HAVE_PIPE)
return wakeup_pipe(socks, nonblocking);
#elif defined(HAVE_SOCKETPAIR)
return wakeup_socketpair(socks, nonblocking);
#else
return wakeup_inet(socks, nonblocking);
#endif
}
#if defined(USE_EVENTFD) || defined(HAVE_PIPE)
#define wakeup_write write
#define wakeup_read read
#define wakeup_close close
#else /* !USE_EVENTFD && !HAVE_PIPE */
#define wakeup_write swrite
#define wakeup_read sread
#define wakeup_close sclose
#endif
int Curl_wakeup_signal(curl_socket_t socks[2])
{
int err = 0;
#ifdef USE_EVENTFD
const uint64_t buf[1] = { 1 };
#else
const char buf[1] = { 1 };
#endif
while(1) {
if(wakeup_write(socks[1], buf, sizeof(buf)) < 0) {
err = SOCKERRNO;
#ifdef USE_WINSOCK
if(err == SOCKEWOULDBLOCK)
err = 0; /* wakeup is already ongoing */
#else
if(SOCKEINTR == err)
continue;
if((err == SOCKEWOULDBLOCK) || (err == EAGAIN))
err = 0; /* wakeup is already ongoing */
#endif
}
break;
}
return err;
}
CURLcode Curl_wakeup_consume(curl_socket_t socks[2], bool all)
{
char buf[64];
ssize_t rc;
CURLcode result = CURLE_OK;
do {
rc = wakeup_read(socks[0], buf, sizeof(buf));
if(!rc)
break;
else if(rc < 0) {
#ifdef USE_WINSOCK
if(SOCKERRNO == SOCKEWOULDBLOCK)
break;
#else
if(SOCKEINTR == SOCKERRNO)
continue;
if((SOCKERRNO == SOCKEWOULDBLOCK) || (SOCKERRNO == EAGAIN))
break;
#endif
result = CURLE_READ_ERROR;
break;
}
} while(all);
return result;
}
void Curl_wakeup_destroy(curl_socket_t socks[2])
{
#ifndef USE_EVENTFD
if(socks[1] != CURL_SOCKET_BAD)
wakeup_close(socks[1]);
#endif
if(socks[0] != CURL_SOCKET_BAD)
wakeup_close(socks[0]);
socks[0] = socks[1] = CURL_SOCKET_BAD;
}
#endif /* !CURL_DISABLE_SOCKETPAIR */

View File

@ -25,53 +25,19 @@
***************************************************************************/
#include "curl_setup.h"
#ifdef USE_EVENTFD
#define wakeup_write write
#define wakeup_read read
#define wakeup_close close
#define wakeup_create(p, nb) Curl_eventfd(p, nb)
int Curl_eventfd(curl_socket_t socks[2], bool nonblocking);
#elif defined(HAVE_PIPE)
#define wakeup_write write
#define wakeup_read read
#define wakeup_close close
#define wakeup_create(p, nb) Curl_pipe(p, nb)
int Curl_pipe(curl_socket_t socks[2], bool nonblocking);
#else /* !USE_EVENTFD && !HAVE_PIPE */
#define wakeup_write swrite
#define wakeup_read sread
#define wakeup_close sclose
#if defined(USE_UNIX_SOCKETS) && defined(HAVE_SOCKETPAIR)
#define SOCKETPAIR_FAMILY AF_UNIX
#elif !defined(HAVE_SOCKETPAIR)
#define SOCKETPAIR_FAMILY 0 /* not used */
#else
#error "unsupported Unix domain and socketpair build combo"
#endif
#ifdef SOCK_CLOEXEC
#define SOCKETPAIR_TYPE (SOCK_STREAM | SOCK_CLOEXEC)
#else
#define SOCKETPAIR_TYPE SOCK_STREAM
#endif
#define USE_SOCKETPAIR
#define wakeup_create(p, nb) \
Curl_socketpair(SOCKETPAIR_FAMILY, SOCKETPAIR_TYPE, 0, p, nb)
#endif /* USE_EVENTFD */
#ifndef CURL_DISABLE_SOCKETPAIR
int Curl_socketpair(int domain, int type, int protocol,
curl_socket_t socks[2], bool nonblocking);
/* return < 0 for failure to initialise */
int Curl_wakeup_init(curl_socket_t socks[2], bool nonblocking);
void Curl_wakeup_destroy(curl_socket_t socks[2]);
/* return 0 on success or errno on failure */
int Curl_wakeup_signal(curl_socket_t socks[2]);
CURLcode Curl_wakeup_consume(curl_socket_t socks[2], bool all);
#else
#define Curl_wakeup_destroy(x) Curl_nop_stmt
#endif
#endif /* HEADER_CURL_SOCKETPAIR_H */