xfer: manage pause bits

Concentrate the handling of KEEP_RECV_PAUSE and KEEP_SEND_PAUSE into
common transfer functions. Setting or clearing these bits requires
subsequent actions involving connection events and client reader/writer
notifications. Have it in one place.

Closes #17650
This commit is contained in:
Stefan Eissing 2025-06-17 13:13:26 +02:00 committed by Daniel Stenberg
parent e60103cbb8
commit 0d70dfb79b
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
9 changed files with 114 additions and 76 deletions

View File

@ -31,6 +31,7 @@
#include "headers.h"
#include "multiif.h"
#include "sendf.h"
#include "transfer.h"
#include "cw-out.h"
#include "cw-pause.h"
@ -234,11 +235,9 @@ static CURLcode cw_out_ptr_flush(struct cw_out_ctx *ctx,
failf(data, "Write callback asked for PAUSE when not supported");
return CURLE_WRITE_ERROR;
}
/* mark the connection as RECV paused */
data->req.keepon |= KEEP_RECV_PAUSE;
ctx->paused = TRUE;
CURL_TRC_WRITE(data, "[OUT] PAUSE requested by client");
break;
return Curl_xfer_pause_recv(data, TRUE);
}
else if(CURL_WRITEFUNC_ERROR == nwritten) {
failf(data, "client returned ERROR on write of %zu bytes", wlen);

View File

@ -1129,13 +1129,12 @@ void curl_easy_reset(CURL *d)
*/
CURLcode curl_easy_pause(CURL *d, int action)
{
struct SingleRequest *k;
CURLcode result = CURLE_OK;
int oldstate;
int newstate;
CURLcode result = CURLE_OK, r2;
bool recursive = FALSE;
bool keep_changed, unpause_read, not_all_paused;
bool changed = FALSE;
struct Curl_easy *data = d;
bool recv_paused, recv_paused_new;
bool send_paused, send_paused_new;
if(!GOOD_EASY_HANDLE(data) || !data->conn)
/* crazy input, do not continue */
@ -1143,62 +1142,46 @@ CURLcode curl_easy_pause(CURL *d, int action)
if(Curl_is_in_callback(data))
recursive = TRUE;
k = &data->req;
oldstate = k->keepon & (KEEP_RECV_PAUSE| KEEP_SEND_PAUSE);
/* first switch off both pause bits then set the new pause bits */
newstate = (k->keepon &~ (KEEP_RECV_PAUSE| KEEP_SEND_PAUSE)) |
((action & CURLPAUSE_RECV) ? KEEP_RECV_PAUSE : 0) |
((action & CURLPAUSE_SEND) ? KEEP_SEND_PAUSE : 0);
recv_paused = Curl_xfer_recv_is_paused(data);
recv_paused_new = (action & CURLPAUSE_RECV);
send_paused = Curl_xfer_send_is_paused(data);
send_paused_new = (action & CURLPAUSE_SEND);
keep_changed = ((newstate & (KEEP_RECV_PAUSE| KEEP_SEND_PAUSE)) != oldstate);
not_all_paused = (newstate & (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE)) !=
(KEEP_RECV_PAUSE|KEEP_SEND_PAUSE);
unpause_read = ((k->keepon & ~newstate & KEEP_SEND_PAUSE) &&
(data->mstate == MSTATE_PERFORMING ||
data->mstate == MSTATE_RATELIMITING));
/* Unpausing writes is detected on the next run in
* transfer.c:Curl_sendrecv(). This is because this may result
* in a transfer error if the application's callbacks fail */
if(send_paused != send_paused_new) {
changed = TRUE;
r2 = Curl_xfer_pause_send(data, send_paused_new);
if(r2)
result = r2;
}
/* Set the new keepon state, so it takes effect no matter what error
* may happen afterwards. */
k->keepon = newstate;
if(recv_paused != recv_paused_new) {
changed = TRUE;
r2 = Curl_xfer_pause_recv(data, recv_paused_new);
if(r2)
result = r2;
}
/* If not completely pausing both directions now, run again in any case. */
if(not_all_paused) {
if(!Curl_xfer_is_blocked(data)) {
Curl_expire(data, 0, EXPIRE_RUN_NOW);
/* reset the too-slow time keeper */
data->state.keeps_speed.tv_sec = 0;
/* Simulate socket events on next run for unpaused directions */
if(!(newstate & KEEP_SEND_PAUSE))
if(!send_paused_new)
data->state.select_bits |= CURL_CSELECT_OUT;
if(!(newstate & KEEP_RECV_PAUSE))
if(!recv_paused_new)
data->state.select_bits |= CURL_CSELECT_IN;
/* On changes, tell application to update its timers. */
if(keep_changed && data->multi) {
if(Curl_update_timer(data->multi)) {
if(changed && data->multi) {
if(Curl_update_timer(data->multi) && !result)
result = CURLE_ABORTED_BY_CALLBACK;
goto out;
}
}
}
if(unpause_read) {
result = Curl_creader_unpause(data);
if(result)
goto out;
}
if(!(k->keepon & KEEP_RECV_PAUSE) && Curl_cwriter_is_paused(data)) {
Curl_conn_ev_data_pause(data, FALSE);
result = Curl_cwriter_unpause(data);
}
out:
if(!result && !data->state.done && keep_changed && data->multi)
if(!result && changed && !data->state.done && data->multi)
/* pause/unpausing may result in multi event changes */
if(Curl_multi_ev_assess_xfer(data->multi, data))
if(Curl_multi_ev_assess_xfer(data->multi, data) && !result)
result = CURLE_ABORTED_BY_CALLBACK;
if(recursive)

View File

@ -32,6 +32,7 @@ struct Curl_easy;
#include "curlx/warnless.h"
#include "urldata.h"
#include "sendf.h"
#include "transfer.h"
#include "strdup.h"
#include "curlx/base64.h"
@ -1962,6 +1963,7 @@ static CURLcode cr_mime_read(struct Curl_easy *data,
size_t *pnread, bool *peos)
{
struct cr_mime_ctx *ctx = reader->ctx;
CURLcode result = CURLE_OK;
size_t nread;
char tmp[256];
@ -1990,7 +1992,6 @@ static CURLcode cr_mime_read(struct Curl_easy *data,
}
if(!Curl_bufq_is_empty(&ctx->tmpbuf)) {
CURLcode result = CURLE_OK;
ssize_t n = Curl_bufq_read(&ctx->tmpbuf, (unsigned char *)buf, blen,
&result);
if(n < 0) {
@ -2008,7 +2009,6 @@ static CURLcode cr_mime_read(struct Curl_easy *data,
CURL_TRC_READ(data, "cr_mime_read(len=%zu), small read, using tmp", blen);
nread = Curl_mime_read(tmp, 1, sizeof(tmp), ctx->part);
if(nread <= sizeof(tmp)) {
CURLcode result = CURLE_OK;
ssize_t n = Curl_bufq_write(&ctx->tmpbuf, (unsigned char *)tmp, nread,
&result);
if(n < 0) {
@ -2051,14 +2051,15 @@ static CURLcode cr_mime_read(struct Curl_easy *data,
*peos = FALSE;
ctx->errored = TRUE;
ctx->error_result = CURLE_ABORTED_BY_CALLBACK;
return CURLE_ABORTED_BY_CALLBACK;
result = CURLE_ABORTED_BY_CALLBACK;
break;
case CURL_READFUNC_PAUSE:
/* CURL_READFUNC_PAUSE pauses read callbacks that feed socket writes */
CURL_TRC_READ(data, "cr_mime_read(len=%zu), paused by callback", blen);
data->req.keepon |= KEEP_SEND_PAUSE; /* mark socket send as paused */
*pnread = 0;
*peos = FALSE;
result = Curl_xfer_pause_send(data, TRUE);
break; /* nothing was read */
case STOP_FILLING:
@ -2068,7 +2069,8 @@ static CURLcode cr_mime_read(struct Curl_easy *data,
*peos = FALSE;
ctx->errored = TRUE;
ctx->error_result = CURLE_READ_ERROR;
return CURLE_READ_ERROR;
result = CURLE_READ_ERROR;
break;
default:
if(nread > blen) {
@ -2090,8 +2092,8 @@ static CURLcode cr_mime_read(struct Curl_easy *data,
CURL_TRC_READ(data, "cr_mime_read(len=%zu, total=%" FMT_OFF_T
", read=%"FMT_OFF_T") -> %d, %zu, %d",
blen, ctx->total_len, ctx->read_len, CURLE_OK, *pnread, *peos);
return CURLE_OK;
blen, ctx->total_len, ctx->read_len, result, *pnread, *peos);
return result;
}
static bool cr_mime_needs_rewind(struct Curl_easy *data,

View File

@ -1056,7 +1056,8 @@ void Curl_multi_getsock(struct Curl_easy *data,
case 0:
CURL_TRC_M(data, "%s pollset[], timeouts=%zu, paused %d/%d (r/w)",
caller, Curl_llist_count(&data->state.timeoutlist),
Curl_creader_is_paused(data), Curl_cwriter_is_paused(data));
Curl_xfer_send_is_paused(data),
Curl_xfer_recv_is_paused(data));
break;
case 1:
CURL_TRC_M(data, "%s pollset[fd=%" FMT_SOCKET_T " %s%s], timeouts=%zu",

View File

@ -435,7 +435,7 @@ CURLcode Curl_req_send_more(struct Curl_easy *data)
/* Fill our send buffer if more from client can be read. */
if(!data->req.upload_aborted &&
!data->req.eos_read &&
!(data->req.keepon & KEEP_SEND_PAUSE) &&
!Curl_xfer_send_is_paused(data) &&
!Curl_bufq_is_full(&data->req.sendbuf)) {
ssize_t nread = Curl_bufq_sipn(&data->req.sendbuf, 0,
add_from_client, data, &result);

View File

@ -38,6 +38,7 @@
#include "urldata.h"
#include "sendf.h"
#include "transfer.h"
#include "cfilters.h"
#include "connect.h"
#include "content_encoding.h"
@ -660,6 +661,7 @@ static CURLcode cr_in_read(struct Curl_easy *data,
size_t *pnread, bool *peos)
{
struct cr_in_ctx *ctx = reader->ctx;
CURLcode result = CURLE_OK;
size_t nread;
ctx->is_paused = FALSE;
@ -697,7 +699,8 @@ static CURLcode cr_in_read(struct Curl_easy *data,
failf(data, "client read function EOF fail, "
"only %"FMT_OFF_T"/%"FMT_OFF_T " of needed bytes read",
ctx->read_len, ctx->total_len);
return CURLE_READ_ERROR;
result = CURLE_READ_ERROR;
break;
}
*pnread = 0;
*peos = TRUE;
@ -710,7 +713,8 @@ static CURLcode cr_in_read(struct Curl_easy *data,
*peos = FALSE;
ctx->errored = TRUE;
ctx->error_result = CURLE_ABORTED_BY_CALLBACK;
return CURLE_ABORTED_BY_CALLBACK;
result = CURLE_ABORTED_BY_CALLBACK;
break;
case CURL_READFUNC_PAUSE:
if(data->conn->handler->flags & PROTOPT_NONETWORK) {
@ -718,14 +722,15 @@ static CURLcode cr_in_read(struct Curl_easy *data,
actually only FILE:// just now, and it cannot pause since the transfer
is not done using the "normal" procedure. */
failf(data, "Read callback asked for PAUSE when not supported");
return CURLE_READ_ERROR;
result = CURLE_READ_ERROR;
break;
}
/* CURL_READFUNC_PAUSE pauses read callbacks that feed socket writes */
CURL_TRC_READ(data, "cr_in_read, callback returned CURL_READFUNC_PAUSE");
ctx->is_paused = TRUE;
data->req.keepon |= KEEP_SEND_PAUSE; /* mark socket send as paused */
*pnread = 0;
*peos = FALSE;
result = Curl_xfer_pause_send(data, TRUE);
break; /* nothing was read */
default:
@ -736,7 +741,8 @@ static CURLcode cr_in_read(struct Curl_easy *data,
*peos = FALSE;
ctx->errored = TRUE;
ctx->error_result = CURLE_READ_ERROR;
return CURLE_READ_ERROR;
result = CURLE_READ_ERROR;
break;
}
ctx->read_len += nread;
if(ctx->total_len >= 0)
@ -747,9 +753,9 @@ static CURLcode cr_in_read(struct Curl_easy *data,
}
CURL_TRC_READ(data, "cr_in_read(len=%zu, total=%"FMT_OFF_T
", read=%"FMT_OFF_T") -> %d, nread=%zu, eos=%d",
blen, ctx->total_len, ctx->read_len, CURLE_OK,
blen, ctx->total_len, ctx->read_len, result,
*pnread, *peos);
return CURLE_OK;
return result;
}
static bool cr_in_needs_rewind(struct Curl_easy *data,

View File

@ -27,6 +27,7 @@
#include <curl/curl.h>
#include "urldata.h"
#include "sendf.h"
#include "transfer.h"
#include "multiif.h"
#include "speedcheck.h"
@ -41,7 +42,7 @@ void Curl_speedinit(struct Curl_easy *data)
CURLcode Curl_speedcheck(struct Curl_easy *data,
struct curltime now)
{
if(data->req.keepon & KEEP_RECV_PAUSE)
if(Curl_xfer_recv_is_paused(data))
/* A paused transfer is not qualified for speed checks */
return CURLE_OK;

View File

@ -364,6 +364,8 @@ static CURLcode sendrecv_dl(struct Curl_easy *data,
data->state.select_bits = CURL_CSELECT_IN;
if((k->keepon & KEEP_SENDBITS) == KEEP_SEND)
data->state.select_bits |= CURL_CSELECT_OUT;
if(!Curl_xfer_is_blocked(data))
Curl_expire(data, 0, EXPIRE_RUN_NOW);
CURL_TRC_M(data, "sendrecv_dl() no EAGAIN/pending data, "
"set select_bits=%x", data->state.select_bits);
}
@ -410,13 +412,11 @@ static int select_bits_paused(struct Curl_easy *data, int select_bits)
* NOTE: we are only interested in PAUSE, not HOLD. */
/* if there is data in a direction not paused, return false */
if(((select_bits & CURL_CSELECT_IN) &&
!(data->req.keepon & KEEP_RECV_PAUSE)) ||
((select_bits & CURL_CSELECT_OUT) &&
!(data->req.keepon & KEEP_SEND_PAUSE)))
if(((select_bits & CURL_CSELECT_IN) && !Curl_xfer_recv_is_paused(data)) ||
((select_bits & CURL_CSELECT_OUT) && !Curl_xfer_send_is_paused(data)))
return FALSE;
return (data->req.keepon & (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE));
return Curl_xfer_recv_is_paused(data) || Curl_xfer_send_is_paused(data);
}
/*
@ -979,9 +979,48 @@ bool Curl_xfer_is_blocked(struct Curl_easy *data)
bool want_send = ((data)->req.keepon & KEEP_SEND);
bool want_recv = ((data)->req.keepon & KEEP_RECV);
if(!want_send)
return want_recv && Curl_cwriter_is_paused(data);
return want_recv && Curl_xfer_recv_is_paused(data);
else if(!want_recv)
return want_send && Curl_creader_is_paused(data);
return want_send && Curl_xfer_send_is_paused(data);
else
return Curl_creader_is_paused(data) && Curl_cwriter_is_paused(data);
return Curl_xfer_recv_is_paused(data) && Curl_xfer_send_is_paused(data);
}
bool Curl_xfer_send_is_paused(struct Curl_easy *data)
{
return (data->req.keepon & KEEP_SEND_PAUSE);
}
bool Curl_xfer_recv_is_paused(struct Curl_easy *data)
{
return (data->req.keepon & KEEP_RECV_PAUSE);
}
CURLcode Curl_xfer_pause_send(struct Curl_easy *data, bool enable)
{
CURLcode result = CURLE_OK;
if(enable) {
data->req.keepon |= KEEP_SEND_PAUSE;
}
else {
data->req.keepon &= ~KEEP_SEND_PAUSE;
if(Curl_creader_is_paused(data))
result = Curl_creader_unpause(data);
}
return result;
}
CURLcode Curl_xfer_pause_recv(struct Curl_easy *data, bool enable)
{
CURLcode result = CURLE_OK;
if(enable) {
data->req.keepon |= KEEP_RECV_PAUSE;
}
else {
data->req.keepon &= ~KEEP_RECV_PAUSE;
if(Curl_cwriter_is_paused(data))
result = Curl_cwriter_unpause(data);
}
Curl_conn_ev_data_pause(data, enable);
return result;
}

View File

@ -137,11 +137,18 @@ CURLcode Curl_xfer_recv(struct Curl_easy *data,
CURLcode Curl_xfer_send_close(struct Curl_easy *data);
CURLcode Curl_xfer_send_shutdown(struct Curl_easy *data, bool *done);
/**
* Return TRUE iff the transfer is not done, but further progress
/* Return TRUE if the transfer is not done, but further progress
* is blocked. For example when it is only receiving and its writer
* is PAUSED.
*/
* is PAUSED. */
bool Curl_xfer_is_blocked(struct Curl_easy *data);
/* Query if send/recv for transfer is paused. */
bool Curl_xfer_send_is_paused(struct Curl_easy *data);
bool Curl_xfer_recv_is_paused(struct Curl_easy *data);
/* Enable/Disable pausing of send/recv for the transfer. */
CURLcode Curl_xfer_pause_send(struct Curl_easy *data, bool enable);
CURLcode Curl_xfer_pause_recv(struct Curl_easy *data, bool enable);
#endif /* HEADER_CURL_TRANSFER_H */