multi: add dirty bitset

Add a bitset `dirty` to the multi handle. The presence of a transfer int
he "dirty" set means: this transfer has something to do ASAP.

"dirty" is set by multiplexing protocols like HTTP/2 and 3 when
encountering response data for another transfer than the current one.
"dirty" is set by protocols that want to be called.

Implementation:

* just an additional `uint_bset` in the multi handle
* `Curl_multi_mark_dirty()` to add a transfer to the dirty set.
* `multi_runsingle()` clears the dirty bit of the transfer at
   start. Without new dirty marks, this empties the set after
   al dirty transfers have been run.
* `multi_timeout()` immediately gives the current time and
   timeout_ms == 0 when dirty transfers are present.
* multi_event: marks all transfers tracked for a socket as dirty.
  Then marks all expired transfers as dirty. Then it runs
  all dirty transfers.

With this mechanism:

* Most uses of `EXPIRE_RUN_NOW` are replaced by `Curl_multi_mark_dirty()`
* `Curl_multi_mark_dirty()` is cheaper than querying if a transfer is
  already dirty or set for timeout. There is no need to check, just do it.
* `data->state.select_bits` is eliminated. We need no longer to
  simulate a poll event to make a transfer run.

Closes #17662
This commit is contained in:
Stefan Eissing 2025-06-18 12:34:43 +02:00 committed by Daniel Stenberg
parent 7aa8d1eea1
commit 779937f840
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
20 changed files with 162 additions and 285 deletions

View File

@ -218,19 +218,10 @@ static void drain_tunnel(struct Curl_cfilter *cf,
struct tunnel_stream *tunnel)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
unsigned char bits;
(void)cf;
bits = CURL_CSELECT_IN;
if(!tunnel->closed && !tunnel->reset &&
!Curl_bufq_is_empty(&ctx->tunnel.sendbuf))
bits |= CURL_CSELECT_OUT;
if(data->state.select_bits != bits) {
CURL_TRC_CF(data, cf, "[%d] DRAIN select_bits=%x",
tunnel->stream_id, bits);
data->state.select_bits = bits;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
Curl_multi_mark_dirty(data);
}
static ssize_t proxy_nw_in_reader(void *reader_ctx,

View File

@ -684,7 +684,7 @@ evaluate:
/* next attempt was started */
CURL_TRC_CF(data, cf, "%s trying next", baller->name);
++ongoing;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
Curl_multi_mark_dirty(data);
}
}
}

View File

@ -257,7 +257,7 @@ static void doh_probe_done(struct Curl_easy *data,
if(!dohp->pending) {
/* DoH completed, run the transfer picking up the results */
Curl_expire(data, 0, EXPIRE_RUN_NOW);
Curl_multi_mark_dirty(data);
}
}
}

View File

@ -1167,11 +1167,6 @@ CURLcode curl_easy_pause(CURL *d, int action)
Curl_expire(data, 0, EXPIRE_RUN_NOW);
/* reset the too-slow time keeper */
data->state.keeps_speed.tv_sec = 0;
/* Simulate socket events on next run for unpaused directions */
if(!send_paused_new)
data->state.select_bits |= CURL_CSELECT_OUT;
if(!recv_paused_new)
data->state.select_bits |= CURL_CSELECT_IN;
/* On changes, tell application to update its timers. */
if(changed && data->multi) {
if(Curl_update_timer(data->multi) && !result)

View File

@ -377,27 +377,6 @@ static CURLcode cf_h2_update_local_win(struct Curl_cfilter *cf,
}
#endif /* !NGHTTP2_HAS_SET_LOCAL_WINDOW_SIZE */
/*
* Mark this transfer to get "drained".
*/
static void drain_stream(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct h2_stream_ctx *stream)
{
unsigned char bits;
(void)cf;
bits = CURL_CSELECT_IN;
if(!stream->closed &&
(!stream->body_eos || !Curl_bufq_is_empty(&stream->sendbuf)))
bits |= CURL_CSELECT_OUT;
if(stream->closed || (data->state.select_bits != bits)) {
CURL_TRC_CF(data, cf, "[%d] DRAIN select_bits=%x",
stream->id, bits);
data->state.select_bits = bits;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
}
static CURLcode http2_data_setup(struct Curl_cfilter *cf,
struct Curl_easy *data,
@ -1200,7 +1179,7 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
if(stream->status_code / 100 != 1) {
stream->resp_hds_complete = TRUE;
}
drain_stream(cf, data, stream);
Curl_multi_mark_dirty(data);
break;
case NGHTTP2_PUSH_PROMISE:
rv = push_promise(cf, data, &frame->push_promise);
@ -1223,12 +1202,12 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
if(frame->rst_stream.error_code) {
stream->reset = TRUE;
}
drain_stream(cf, data, stream);
Curl_multi_mark_dirty(data);
break;
case NGHTTP2_WINDOW_UPDATE:
if(CURL_WANT_SEND(data) && Curl_bufq_is_empty(&stream->sendbuf)) {
/* need more data, force processing of transfer */
drain_stream(cf, data, stream);
Curl_multi_mark_dirty(data);
}
else if(!Curl_bufq_is_empty(&stream->sendbuf)) {
/* resume the potentially suspended stream */
@ -1254,7 +1233,7 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
stream->id, NGHTTP2_STREAM_CLOSED);
stream->closed = TRUE;
}
drain_stream(cf, data, stream);
Curl_multi_mark_dirty(data);
}
return CURLE_OK;
}
@ -1403,11 +1382,8 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
* window and *assume* that we treat this like a WINDOW_UPDATE. Some
* servers send an explicit WINDOW_UPDATE, but not all seem to do that.
* To be safe, we UNHOLD a stream in order not to stall. */
if(CURL_WANT_SEND(data)) {
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
if(stream)
drain_stream(cf, data, stream);
}
if(CURL_WANT_SEND(data))
Curl_multi_mark_dirty(data);
}
break;
}
@ -1552,7 +1528,7 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id,
stream_id, nghttp2_http2_strerror(error_code), error_code);
else
CURL_TRC_CF(data_s, cf, "[%d] CLOSED", stream_id);
drain_stream(cf, data_s, stream);
Curl_multi_mark_dirty(data_s);
/* remove `data_s` from the nghttp2 stream */
rv = nghttp2_session_set_stream_user_data(session, stream_id, 0);
@ -1746,7 +1722,7 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
}
/* if we receive data for another handle, wake that up */
if(CF_DATA_CURRENT(cf) != data_s)
Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
Curl_multi_mark_dirty(data_s);
CURL_TRC_CF(data_s, cf, "[%d] status: HTTP/2 %03d",
stream->id, stream->status_code);
@ -1773,7 +1749,7 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
}
/* if we receive data for another handle, wake that up */
if(CF_DATA_CURRENT(cf) != data_s)
Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
Curl_multi_mark_dirty(data_s);
CURL_TRC_CF(data_s, cf, "[%d] header: %.*s: %.*s",
stream->id, (int)namelen, name, (int)valuelen, value);
@ -2106,7 +2082,7 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf,
* this may leave data in underlying buffers that will not
* be consumed. */
if(!cf->next || !cf->next->cft->has_data_pending(cf->next, data))
drain_stream(cf, data, stream);
Curl_multi_mark_dirty(data);
break;
}
@ -2184,7 +2160,7 @@ static CURLcode cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
nghttp2_session_consume(ctx->h2, stream->id, *pnread);
if(stream->closed) {
CURL_TRC_CF(data, cf, "[%d] DRAIN closed stream", stream->id);
drain_stream(cf, data, stream);
Curl_multi_mark_dirty(data);
}
}
@ -2195,7 +2171,7 @@ out:
* monitor the socket for POLLOUT, but when not SENDING
* any more, we force processing of the transfer. */
if(!CURL_WANT_SEND(data))
drain_stream(cf, data, stream);
Curl_multi_mark_dirty(data);
}
else if(r2) {
result = r2;
@ -2712,8 +2688,7 @@ static CURLcode http2_data_pause(struct Curl_cfilter *cf,
* not. We may have already buffered and exhausted the new window
* by operating on things in flight during the handling of other
* transfers. */
drain_stream(cf, data, stream);
Curl_expire(data, 0, EXPIRE_RUN_NOW);
Curl_multi_mark_dirty(data);
}
CURL_TRC_CF(data, cf, "[%d] stream now %spaused", stream->id,
pause ? "" : "un");

View File

@ -1347,9 +1347,6 @@ static CURLcode imap_state_fetch_resp(struct Curl_easy *data,
else {
/* IMAP download */
data->req.maxdownload = size;
/* force a recv/send check of this connection, as the data might've been
read off the socket already */
data->state.select_bits = CURL_CSELECT_IN;
Curl_xfer_setup1(data, CURL_XFER_RECV, size, FALSE);
}
}

View File

@ -235,6 +235,7 @@ struct Curl_multi *Curl_multi_handle(unsigned int xfer_table_size,
Curl_multi_ev_init(multi, ev_hashsize);
Curl_uint_tbl_init(&multi->xfers, NULL);
Curl_uint_bset_init(&multi->process);
Curl_uint_bset_init(&multi->dirty);
Curl_uint_bset_init(&multi->pending);
Curl_uint_bset_init(&multi->msgsent);
Curl_hash_init(&multi->proto_hash, 23,
@ -247,6 +248,7 @@ struct Curl_multi *Curl_multi_handle(unsigned int xfer_table_size,
if(Curl_uint_bset_resize(&multi->process, xfer_table_size) ||
Curl_uint_bset_resize(&multi->pending, xfer_table_size) ||
Curl_uint_bset_resize(&multi->dirty, xfer_table_size) ||
Curl_uint_bset_resize(&multi->msgsent, xfer_table_size) ||
Curl_uint_tbl_resize(&multi->xfers, xfer_table_size))
goto error;
@ -301,6 +303,7 @@ error:
}
Curl_uint_bset_destroy(&multi->process);
Curl_uint_bset_destroy(&multi->dirty);
Curl_uint_bset_destroy(&multi->pending);
Curl_uint_bset_destroy(&multi->msgsent);
Curl_uint_tbl_destroy(&multi->xfers);
@ -355,6 +358,7 @@ static CURLMcode multi_xfers_add(struct Curl_multi *multi,
* to work properly when larger than the table, but not
* the other way around. */
if(Curl_uint_bset_resize(&multi->process, newsize) ||
Curl_uint_bset_resize(&multi->dirty, newsize) ||
Curl_uint_bset_resize(&multi->pending, newsize) ||
Curl_uint_bset_resize(&multi->msgsent, newsize) ||
Curl_uint_tbl_resize(&multi->xfers, newsize))
@ -401,6 +405,7 @@ CURLMcode curl_multi_add_handle(CURLM *m, CURL *d)
return CURLM_ABORTED_BY_CALLBACK;
multi->dead = FALSE;
Curl_uint_bset_clear(&multi->process);
Curl_uint_bset_clear(&multi->dirty);
Curl_uint_bset_clear(&multi->pending);
Curl_uint_bset_clear(&multi->msgsent);
}
@ -795,6 +800,7 @@ CURLMcode curl_multi_remove_handle(CURLM *m, CURL *d)
DEBUGASSERT(Curl_uint_tbl_contains(&multi->xfers, mid));
Curl_uint_tbl_remove(&multi->xfers, mid);
Curl_uint_bset_remove(&multi->process, mid);
Curl_uint_bset_remove(&multi->dirty, mid);
Curl_uint_bset_remove(&multi->pending, mid);
Curl_uint_bset_remove(&multi->msgsent, mid);
data->multi = NULL;
@ -1048,8 +1054,8 @@ void Curl_multi_getsock(struct Curl_easy *data,
(Curl_pollset_want_read(data, ps, data->conn->sock[SECONDARYSOCKET]) &&
Curl_conn_data_pending(data, SECONDARYSOCKET)))) {
CURL_TRC_M(data, "%s pollset[] has POLLIN, but there is still "
"buffered input to consume -> EXPIRE_RUN_NOW", caller);
Curl_expire(data, 0, EXPIRE_RUN_NOW);
"buffered input to consume -> mark as dirty", caller);
Curl_multi_mark_dirty(data);
}
switch(ps->num) {
@ -1965,14 +1971,6 @@ static CURLMcode state_performing(struct Curl_easy *data,
}
}
}
else if(data->state.select_bits && !Curl_xfer_is_blocked(data)) {
/* This avoids CURLM_CALL_MULTI_PERFORM so that a very fast transfer does
not get stuck on this transfer at the expense of other concurrent
transfers */
CURL_TRC_M(data, "EXPIRE_RUN_NOW unblocked, select_bits=%x",
data->state.select_bits);
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
free(newurl);
*resultp = result;
return rc;
@ -2297,6 +2295,10 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
multi_warn_debug(multi, data);
/* transfer runs now, clear the dirty bit. This may be set
* again during processing, triggering a re-run later. */
Curl_uint_bset_remove(&multi->dirty, data->mid);
do {
/* A "stream" here is a logical stream if the protocol can handle that
(HTTP/2), or the full connection for older protocols */
@ -2840,6 +2842,7 @@ CURLMcode curl_multi_cleanup(CURLM *m)
}
#endif
Curl_uint_bset_destroy(&multi->process);
Curl_uint_bset_destroy(&multi->dirty);
Curl_uint_bset_destroy(&multi->pending);
Curl_uint_bset_destroy(&multi->msgsent);
Curl_uint_tbl_destroy(&multi->xfers);
@ -2963,12 +2966,11 @@ struct multi_run_ctx {
bool run_cpool;
};
static CURLMcode multi_run_expired(struct multi_run_ctx *mrc)
static void multi_mark_expired_as_dirty(struct multi_run_ctx *mrc)
{
struct Curl_multi *multi = mrc->multi;
struct Curl_easy *data = NULL;
struct Curl_tree *t = NULL;
CURLMcode result = CURLM_OK;
/*
* The loop following here will go on as long as there are expire-times left
@ -2980,33 +2982,59 @@ static CURLMcode multi_run_expired(struct multi_run_ctx *mrc)
extracts a matching node if there is one */
multi->timetree = Curl_splaygetbest(mrc->now, multi->timetree, &t);
if(!t)
goto out;
return;
data = Curl_splayget(t); /* assign this for next loop */
if(!data)
continue;
(void)add_next_timeout(mrc->now, multi, data);
if(data == multi->admin) {
mrc->run_cpool = TRUE;
continue;
}
Curl_multi_mark_dirty(data);
}
}
mrc->run_xfers++;
sigpipe_apply(data, &mrc->pipe_st);
result = multi_runsingle(multi, &mrc->now, data);
static CURLMcode multi_run_dirty(struct multi_run_ctx *mrc)
{
struct Curl_multi *multi = mrc->multi;
CURLMcode result = CURLM_OK;
unsigned int mid;
if(CURLM_OK >= result) {
/* reassess event handling of data */
result = Curl_multi_ev_assess_xfer(multi, data);
if(result)
goto out;
if(Curl_uint_bset_first(&multi->dirty, &mid)) {
do {
struct Curl_easy *data = Curl_multi_get_easy(multi, mid);
if(data) {
CURL_TRC_M(data, "multi_run_dirty");
if(data == multi->admin) {
Curl_uint_bset_remove(&multi->dirty, mid);
mrc->run_cpool = TRUE;
continue;
}
mrc->run_xfers++;
sigpipe_apply(data, &mrc->pipe_st);
/* runsingle() clears the dirty mid */
result = multi_runsingle(multi, &mrc->now, data);
if(CURLM_OK >= result) {
/* reassess event handling of data */
result = Curl_multi_ev_assess_xfer(multi, data);
if(result)
goto out;
}
}
else {
CURL_TRC_M(multi->admin, "multi_run_dirty, %u no longer found", mid);
Curl_uint_bset_remove(&multi->dirty, mid);
}
}
while(Curl_uint_bset_next(&multi->dirty, mid, &mid));
}
out:
return result;
}
static CURLMcode multi_socket(struct Curl_multi *multi,
bool checkall,
curl_socket_t s,
@ -3035,7 +3063,8 @@ static CURLMcode multi_socket(struct Curl_multi *multi,
}
if(s != CURL_SOCKET_TIMEOUT) {
Curl_multi_ev_expire_xfers(multi, s, &mrc.now, &mrc.run_cpool);
/* Mark all transfers of that socket as dirty */
Curl_multi_ev_dirty_xfers(multi, s, &mrc.run_cpool);
}
else {
/* Asked to run due to time-out. Clear the 'last_expire_ts' variable to
@ -3047,7 +3076,8 @@ static CURLMcode multi_socket(struct Curl_multi *multi,
mrc.run_cpool = TRUE;
}
result = multi_run_expired(&mrc);
multi_mark_expired_as_dirty(&mrc);
result = multi_run_dirty(&mrc);
if(result)
goto out;
@ -3058,7 +3088,8 @@ static CURLMcode multi_socket(struct Curl_multi *multi,
* Do that only once or it might be unfair to transfers on other
* sockets. */
mrc.now = curlx_now();
result = multi_run_expired(&mrc);
multi_mark_expired_as_dirty(&mrc);
result = multi_run_dirty(&mrc);
}
out:
@ -3186,6 +3217,26 @@ CURLMcode curl_multi_socket_all(CURLM *m, int *running_handles)
return multi_socket(multi, TRUE, CURL_SOCKET_BAD, 0, running_handles);
}
static bool multi_has_dirties(struct Curl_multi *multi)
{
unsigned int mid;
if(Curl_uint_bset_first(&multi->dirty, &mid)) {
do {
struct Curl_easy *data = Curl_multi_get_easy(multi, mid);
if(data) {
return TRUE;
}
else {
CURL_TRC_M(multi->admin, "dirty transfer %u no longer found", mid);
Curl_uint_bset_remove(&multi->dirty, mid);
}
}
while(Curl_uint_bset_next(&multi->dirty, mid, &mid));
}
return FALSE;
}
static CURLMcode multi_timeout(struct Curl_multi *multi,
struct curltime *expire_time,
long *timeout_ms)
@ -3197,7 +3248,12 @@ static CURLMcode multi_timeout(struct Curl_multi *multi,
return CURLM_OK;
}
if(multi->timetree) {
if(multi_has_dirties(multi)) {
*expire_time = curlx_now();
*timeout_ms = 0;
return CURLM_OK;
}
else if(multi->timetree) {
/* we have a tree of expire times */
struct curltime now = curlx_now();
@ -3791,6 +3847,12 @@ unsigned int Curl_multi_xfers_running(struct Curl_multi *multi)
return multi->xfers_alive;
}
void Curl_multi_mark_dirty(struct Curl_easy *data)
{
if(data->multi && data->mid != UINT_MAX)
Curl_uint_bset_add(&data->multi->dirty, data->mid);
}
#ifdef DEBUGBUILD
static void multi_xfer_dump(struct Curl_multi *multi, unsigned int mid,
void *entry)

View File

@ -563,10 +563,9 @@ CURLMcode Curl_multi_ev_assign(struct Curl_multi *multi,
return CURLM_OK;
}
void Curl_multi_ev_expire_xfers(struct Curl_multi *multi,
curl_socket_t s,
const struct curltime *nowp,
bool *run_cpool)
void Curl_multi_ev_dirty_xfers(struct Curl_multi *multi,
curl_socket_t s,
bool *run_cpool)
{
struct mev_sh_entry *entry;
@ -586,9 +585,11 @@ void Curl_multi_ev_expire_xfers(struct Curl_multi *multi,
do {
data = Curl_multi_get_easy(multi, mid);
if(data) {
/* Expire with out current now, so we will get it below when
* asking the splaytree for expired transfers. */
Curl_expire_ex(data, nowp, 0, EXPIRE_RUN_NOW);
Curl_multi_mark_dirty(data);
}
else {
CURL_TRC_M(multi->admin, "socket transfer %u no longer found", mid);
Curl_uint_spbset_remove(&entry->xfers, mid);
}
}
while(Curl_uint_spbset_next(&entry->xfers, mid, &mid));

View File

@ -61,11 +61,10 @@ CURLMcode Curl_multi_ev_assess_conn(struct Curl_multi *multi,
struct Curl_easy *data,
struct connectdata *conn);
/* Expire all transfers tied to the given socket */
void Curl_multi_ev_expire_xfers(struct Curl_multi *multi,
curl_socket_t s,
const struct curltime *nowp,
bool *run_cpool);
/* Mark all transfers tied to the given socket as dirty */
void Curl_multi_ev_dirty_xfers(struct Curl_multi *multi,
curl_socket_t s,
bool *run_cpool);
/* Socket will be closed, forget anything we know about it. */
void Curl_multi_ev_socket_done(struct Curl_multi *multi,

View File

@ -98,6 +98,7 @@ struct Curl_multi {
struct uint_tbl xfers; /* transfers added to this multi */
/* Each transfer's mid may be present in at most one of these */
struct uint_bset process; /* transfer being processed */
struct uint_bset dirty; /* transfer to be run NOW, e.g. ASAP. */
struct uint_bset pending; /* transfers in waiting (conn limit etc.) */
struct uint_bset msgsent; /* transfers done with message for application */

View File

@ -173,4 +173,8 @@ struct Curl_easy *Curl_multi_get_easy(struct Curl_multi *multi,
/* Get the # of transfers current in process/pending. */
unsigned int Curl_multi_xfers_running(struct Curl_multi *multi);
/* Mark a transfer as dirty, e.g. to be rerun at earliest convenience.
* A cheap operation, can be done many times repeatedly. */
void Curl_multi_mark_dirty(struct Curl_easy *data);
#endif /* HEADER_CURL_MULTIIF_H */

View File

@ -358,16 +358,12 @@ static CURLcode sendrecv_dl(struct Curl_easy *data,
} while(maxloops--);
if(!rcvd_eagain || data_pending(data, rcvd_eagain)) {
if(!Curl_xfer_is_blocked(data) &&
(!rcvd_eagain || data_pending(data, rcvd_eagain))) {
/* Did not read until EAGAIN or there is still data pending
* in buffers. Mark as read-again via simulated SELECT results. */
data->state.select_bits = CURL_CSELECT_IN;
if((k->keepon & KEEP_SENDBITS) == KEEP_SEND)
data->state.select_bits |= CURL_CSELECT_OUT;
if(!Curl_xfer_is_blocked(data))
Curl_expire(data, 0, EXPIRE_RUN_NOW);
CURL_TRC_M(data, "sendrecv_dl() no EAGAIN/pending data, "
"set select_bits=%x", data->state.select_bits);
Curl_multi_mark_dirty(data);
CURL_TRC_M(data, "sendrecv_dl() no EAGAIN/pending data, mark as dirty");
}
if(((k->keepon & (KEEP_RECV|KEEP_SEND)) == KEEP_SEND) &&
@ -403,22 +399,6 @@ static CURLcode sendrecv_ul(struct Curl_easy *data, int *didwhat)
return CURLE_OK;
}
static int select_bits_paused(struct Curl_easy *data, int select_bits)
{
/* See issue #11982: we really need to be careful not to progress
* a transfer direction when that direction is paused. Not all parts
* of our state machine are handling PAUSED transfers correctly. So, we
* do not want to go there.
* NOTE: we are only interested in PAUSE, not HOLD. */
/* if there is data in a direction not paused, return false */
if(((select_bits & CURL_CSELECT_IN) && !Curl_xfer_recv_is_paused(data)) ||
((select_bits & CURL_CSELECT_OUT) && !Curl_xfer_send_is_paused(data)))
return FALSE;
return Curl_xfer_recv_is_paused(data) || Curl_xfer_send_is_paused(data);
}
/*
* Curl_sendrecv() is the low-level function to be called when data is to
* be read and written to/from the connection.
@ -430,14 +410,9 @@ CURLcode Curl_sendrecv(struct Curl_easy *data, struct curltime *nowp)
int didwhat = 0;
DEBUGASSERT(nowp);
if(data->state.select_bits) {
if(select_bits_paused(data, data->state.select_bits)) {
/* leave the bits unchanged, so they'll tell us what to do when
* this transfer gets unpaused. */
result = CURLE_OK;
goto out;
}
data->state.select_bits = 0;
if(Curl_xfer_is_blocked(data)) {
result = CURLE_OK;
goto out;
}
/* We go ahead and do a read if we have a readable socket or if the stream

View File

@ -1155,9 +1155,6 @@ struct UrlState {
#endif
unsigned char httpreq; /* Curl_HttpReq; what kind of HTTP request (if any)
is this */
unsigned char select_bits; /* != 0 -> bitmask of socket events for this
transfer overriding anything the socket may
report */
unsigned int creds_from:2; /* where is the server credentials originating
from, see the CREDS_* defines above */

View File

@ -244,33 +244,17 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
static void drain_stream_from_other_thread(struct Curl_easy *data,
struct h3_stream_ctx *stream)
{
unsigned char bits;
/* risky */
bits = CURL_CSELECT_IN;
if(stream && !stream->upload_done)
bits |= CURL_CSELECT_OUT;
if(data->state.select_bits != bits) {
data->state.select_bits = bits;
/* cannot expire from other thread */
}
(void)data;
(void)stream;
/* cannot expire from other thread.
here is the disconnect between msh3 and curl */
}
static void h3_drain_stream(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_msh3_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
unsigned char bits;
(void)cf;
bits = CURL_CSELECT_IN;
if(stream && !stream->upload_done)
bits |= CURL_CSELECT_OUT;
if(data->state.select_bits != bits) {
data->state.select_bits = bits;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
Curl_multi_mark_dirty(data);
}
static const MSH3_CONNECTION_IF msh3_conn_if = {

View File

@ -342,23 +342,6 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
}
}
static void h3_drain_stream(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
unsigned char bits;
(void)cf;
bits = CURL_CSELECT_IN;
if(stream && stream->upload_left && !stream->send_closed)
bits |= CURL_CSELECT_OUT;
if(data->state.select_bits != bits) {
data->state.select_bits = bits;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
}
/* ngtcp2 default congestion controller does not perform pacing. Limit
the maximum packet burst to MAX_PKT_BURST packets. */
#define MAX_PKT_BURST 10
@ -744,7 +727,7 @@ static int cb_extend_max_stream_data(ngtcp2_conn *tconn, int64_t stream_id,
CURL_TRC_CF(s_data, cf, "[%" FMT_PRId64 "] unblock quic flow",
(curl_int64_t)stream_id);
stream->quic_flow_blocked = FALSE;
h3_drain_stream(cf, s_data);
Curl_multi_mark_dirty(s_data);
}
return 0;
}
@ -971,7 +954,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t sid,
else {
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] CLOSED", stream->id);
}
h3_drain_stream(cf, data);
Curl_multi_mark_dirty(data);
return 0;
}
@ -1072,7 +1055,7 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t sid,
if(stream->status_code / 100 != 1) {
stream->resp_hds_complete = TRUE;
}
h3_drain_stream(cf, data);
Curl_multi_mark_dirty(data);
return 0;
}
@ -1984,10 +1967,9 @@ static CURLcode h3_data_pause(struct Curl_cfilter *cf,
{
/* There seems to exist no API in ngtcp2 to shrink/enlarge the streams
* windows. As we do in HTTP/2. */
if(!pause) {
h3_drain_stream(cf, data);
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
(void)cf;
if(!pause)
Curl_multi_mark_dirty(data);
return CURLE_OK;
}

View File

@ -711,31 +711,14 @@ static struct cf_osslq_stream *cf_osslq_get_qstream(struct Curl_cfilter *cf,
return NULL;
}
static void h3_drain_stream(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_osslq_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
unsigned char bits;
(void)cf;
bits = CURL_CSELECT_IN;
if(stream && stream->upload_left && !stream->send_closed)
bits |= CURL_CSELECT_OUT;
if(data->state.select_bits != bits) {
data->state.select_bits = bits;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
}
static CURLcode h3_data_pause(struct Curl_cfilter *cf,
struct Curl_easy *data,
bool pause)
{
(void)cf;
if(!pause) {
/* unpaused. make it run again right away */
h3_drain_stream(cf, data);
Curl_expire(data, 0, EXPIRE_RUN_NOW);
Curl_multi_mark_dirty(data);
}
return CURLE_OK;
}
@ -766,7 +749,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id,
else {
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] CLOSED", stream->s.id);
}
h3_drain_stream(cf, data);
Curl_multi_mark_dirty(data);
return 0;
}
@ -831,7 +814,7 @@ static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
stream->download_recvd += (curl_off_t)buflen;
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] DATA len=%zu, total=%" FMT_OFF_T,
stream->s.id, buflen, stream->download_recvd);
h3_drain_stream(cf, data);
Curl_multi_mark_dirty(data);
return 0;
}
@ -943,7 +926,7 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t sid,
if(stream->status_code / 100 != 1) {
stream->resp_hds_complete = TRUE;
}
h3_drain_stream(cf, data);
Curl_multi_mark_dirty(data);
return 0;
}
@ -1566,7 +1549,7 @@ static CURLcode cf_osslq_check_and_unblock(struct Curl_cfilter *cf,
if(stream) {
nghttp3_conn_unblock_stream(ctx->h3.conn, stream->s.id);
stream->s.send_blocked = FALSE;
h3_drain_stream(cf, ctx->curl_items[idx_count]);
Curl_multi_mark_dirty(ctx->curl_items[idx_count]);
CURL_TRC_CF(ctx->curl_items[idx_count], cf, "unblocked");
}
result_count--;
@ -2163,7 +2146,7 @@ static CURLcode cf_osslq_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
}
if(*pnread) {
h3_drain_stream(cf, data);
Curl_multi_mark_dirty(data);
}
else {
if(stream->closed) {

View File

@ -237,7 +237,7 @@ static bool cf_quiche_do_resume(struct Curl_cfilter *cf,
(void)user_data;
if(stream->quic_flow_blocked) {
stream->quic_flow_blocked = FALSE;
Curl_expire(sdata, 0, EXPIRE_RUN_NOW);
Curl_multi_mark_dirty(sdata);
CURL_TRC_CF(sdata, cf, "[%"FMT_PRIu64"] unblock", stream->id);
}
return TRUE;
@ -250,8 +250,8 @@ static bool cf_quiche_do_expire(struct Curl_cfilter *cf,
{
(void)stream;
(void)user_data;
CURL_TRC_CF(sdata, cf, "conn closed, expire transfer");
Curl_expire(sdata, 0, EXPIRE_RUN_NOW);
CURL_TRC_CF(sdata, cf, "conn closed, mark as dirty");
Curl_multi_mark_dirty(sdata);
return TRUE;
}
@ -307,23 +307,6 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
}
}
static void h3_drain_stream(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
unsigned char bits;
(void)cf;
bits = CURL_CSELECT_IN;
if(stream && !stream->send_closed)
bits |= CURL_CSELECT_OUT;
if(data->state.select_bits != bits) {
data->state.select_bits = bits;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
}
static void cf_quiche_expire_conn_closed(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
@ -562,7 +545,7 @@ static CURLcode cf_quiche_ev_process(struct Curl_cfilter *cf,
quiche_h3_event *ev)
{
CURLcode result = h3_process_event(cf, data, stream, ev);
h3_drain_stream(cf, data);
Curl_multi_mark_dirty(data);
if(result)
CURL_TRC_CF(data, cf, "error processing event %s "
"for [%"FMT_PRIu64"] -> %d", cf_ev_name(ev),
@ -917,7 +900,7 @@ static CURLcode cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
if(*pnread) {
if(stream->closed)
h3_drain_stream(cf, data);
Curl_multi_mark_dirty(data);
}
else {
if(stream->closed) {
@ -1229,9 +1212,9 @@ static CURLcode h3_data_pause(struct Curl_cfilter *cf,
{
/* There seems to exist no API in quiche to shrink/enlarge the streams
* windows. As we do in HTTP/2. */
(void)cf;
if(!pause) {
h3_drain_stream(cf, data);
Curl_expire(data, 0, EXPIRE_RUN_NOW);
Curl_multi_mark_dirty(data);
}
return CURLE_OK;
}

View File

@ -1256,15 +1256,9 @@ static int myssh_in_UPLOAD_INIT(struct Curl_easy *data,
figure out a "real" bitmask */
sshc->orig_waitfor = data->req.keepon;
/* we want to use the _sending_ function even when the socket turns
out readable as the underlying libssh sftp send function will deal
with both accordingly */
data->state.select_bits = CURL_CSELECT_OUT;
/* since we do not really wait for anything at this point, we want the
state machine to move on as soon as possible so we set a very short
timeout here */
Curl_expire(data, 0, EXPIRE_RUN_NOW);
state machine to move on as soon as possible so we mark this as dirty */
Curl_multi_mark_dirty(data);
#if LIBSSH_VERSION_INT > SSH_VERSION_INT(0, 11, 0)
sshc->sftp_send_state = 0;
#endif
@ -1430,11 +1424,6 @@ static int myssh_in_SFTP_DOWNLOAD_STAT(struct Curl_easy *data,
/* not set by Curl_xfer_setup to preserve keepon bits */
data->conn->writesockfd = data->conn->sockfd;
/* we want to use the _receiving_ function even when the socket turns
out writableable as the underlying libssh recv function will deal
with both accordingly */
data->state.select_bits = CURL_CSELECT_IN;
sshc->sftp_recv_state = 0;
myssh_to(data, sshc, SSH_STOP);
@ -2258,11 +2247,6 @@ static CURLcode myssh_statemach_act(struct Curl_easy *data,
figure out a "real" bitmask */
sshc->orig_waitfor = data->req.keepon;
/* we want to use the _sending_ function even when the socket turns
out readable as the underlying libssh scp send function will deal
with both accordingly */
data->state.select_bits = CURL_CSELECT_OUT;
myssh_to(data, sshc, SSH_STOP);
break;
@ -2298,11 +2282,6 @@ static CURLcode myssh_statemach_act(struct Curl_easy *data,
/* not set by Curl_xfer_setup to preserve keepon bits */
conn->writesockfd = conn->sockfd;
/* we want to use the _receiving_ function even when the socket turns
out writableable as the underlying libssh recv function will deal
with both accordingly */
data->state.select_bits = CURL_CSELECT_IN;
myssh_to(data, sshc, SSH_STOP);
break;
}

View File

@ -1209,15 +1209,9 @@ sftp_upload_init(struct Curl_easy *data,
figure out a "real" bitmask */
sshc->orig_waitfor = data->req.keepon;
/* we want to use the _sending_ function even when the socket turns
out readable as the underlying libssh2 sftp send function will deal
with both accordingly */
data->state.select_bits = CURL_CSELECT_OUT;
/* since we do not really wait for anything at this point, we want the
state machine to move on as soon as possible so we set a very short
timeout here */
Curl_expire(data, 0, EXPIRE_RUN_NOW);
state machine to move on as soon as possible so mark this as dirty */
Curl_multi_mark_dirty(data);
myssh_state(data, sshc, SSH_STOP);
return CURLE_OK;
@ -1552,10 +1546,6 @@ sftp_download_stat(struct Curl_easy *data,
/* not set by Curl_xfer_setup to preserve keepon bits */
data->conn->writesockfd = data->conn->sockfd;
/* we want to use the _receiving_ function even when the socket turns
out writableable as the underlying libssh2 recv function will deal
with both accordingly */
data->state.select_bits = CURL_CSELECT_IN;
myssh_state(data, sshc, SSH_STOP);
return CURLE_OK;
@ -2476,11 +2466,6 @@ static CURLcode ssh_state_scp_download_init(struct Curl_easy *data,
/* not set by Curl_xfer_setup to preserve keepon bits */
data->conn->writesockfd = data->conn->sockfd;
/* we want to use the _receiving_ function even when the socket turns
out writableable as the underlying libssh2 recv function will deal
with both accordingly */
data->state.select_bits = CURL_CSELECT_IN;
myssh_state(data, sshc, SSH_STOP);
return CURLE_OK;
}
@ -2634,11 +2619,6 @@ static CURLcode ssh_state_scp_upload_init(struct Curl_easy *data,
figure out a "real" bitmask */
sshc->orig_waitfor = data->req.keepon;
/* we want to use the _sending_ function even when the socket turns
out readable as the underlying libssh2 scp send function will deal
with both accordingly */
data->state.select_bits = CURL_CSELECT_OUT;
myssh_state(data, sshc, SSH_STOP);
return CURLE_OK;

View File

@ -732,15 +732,9 @@ static CURLcode wssh_statemach_act(struct Curl_easy *data,
figure out a "real" bitmask */
sshc->orig_waitfor = data->req.keepon;
/* we want to use the _sending_ function even when the socket turns
out readable as the underlying libssh2 sftp send function will deal
with both accordingly */
data->state.select_bits = CURL_CSELECT_OUT;
/* since we do not really wait for anything at this point, we want the
state machine to move on as soon as possible so we set a very short
timeout here */
Curl_expire(data, 0, EXPIRE_RUN_NOW);
state machine to move on as soon as possible */
Curl_multi_mark_dirty(data);
wssh_state(data, sshc, SSH_STOP);
}
@ -828,11 +822,6 @@ static CURLcode wssh_statemach_act(struct Curl_easy *data,
/* not set by Curl_xfer_setup to preserve keepon bits */
conn->writesockfd = conn->sockfd;
/* we want to use the _receiving_ function even when the socket turns
out writableable as the underlying libssh2 recv function will deal
with both accordingly */
data->state.select_bits = CURL_CSELECT_IN;
if(result) {
/* this should never occur; the close state should be entered
at the time the error occurs */