ratelimit: redesign

Description of how this works in `docs/internals/RATELIMITS.md`.

Notable implementation changes:
- KEEP_SEND_PAUSE/KEEP_SEND_HOLD and KEEP_RECV_PAUSE/KEEP_RECV_HOLD
  no longer exist. Pausing is done by blocking the new rlimits.
- KEEP_SEND_TIMED no longer exists. Pausing "100-continue" transfers
  is done in the new `Curl_http_perform_pollset()` method.
- HTTP/2 rate limiting is implemented via window updates. When the
  transfer-initiating connection has a ratelimit, the initial window
  size is adjusted.
- HTTP/3 ngtcp2 rate limiting is implemented via ack updates
- HTTP/3 quiche does not seem to support this via its API
- the default progress meter has been improved for accuracy
  in "current speed" results.

pytest speed tests have been improved.

Closes #19384
Stefan Eissing 2025-11-11 14:26:48 +01:00 committed by Daniel Stenberg
parent bfde781121
commit 24b36fdd15
GPG Key ID: 5CC908FDB71E12C2
48 changed files with 1146 additions and 675 deletions

View File

@ -63,6 +63,7 @@ INTERNALDOCS = \
internals/MULTI-EV.md \
internals/NEW-PROTOCOL.md \
internals/PORTING.md \
internals/RATELIMITS.md \
internals/README.md \
internals/SCORECARD.md \
internals/SPLAY.md \

View File

@ -0,0 +1,100 @@
<!--
Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
SPDX-License-Identifier: curl
-->
# Rate Limiting Transfers
Rate limiting a transfer means that no more than "n bytes per second"
shall be sent or received. It can be set individually for each direction
via `CURLOPT_MAX_RECV_SPEED_LARGE` and `CURLOPT_MAX_SEND_SPEED_LARGE`. These
options may also be adjusted on an ongoing transfer.
### Implementation Base
`ratelimit.[ch]` implements `struct Curl_rlimit` and functions to manage
such limits. The struct has the following properties:
* `rate_per_sec`: how many "tokens" can be used per second, 0 for infinite.
* `tokens`: the currently available tokens to consume
* `burst_per_sec`: an upper limit on the tokens available
* `ts`: the microsecond timestamp of the last tokens update
* `spare_us`: elapsed microseconds not yet counted toward a token update
* `blocked`: whether the limit is blocked
Tokens can be *drained* from an `rlimit`. This reduces `tokens`, even to
negative values. To enforce the limit, tokens should not be drained
further once they reach 0, but such overdraws may happen.
An `rlimit` can be asked how long to wait until `tokens` are positive again.
This is given in milliseconds. When tokens are available, this wait
time is 0.
Ideally, a user of an `rlimit` would consume the available tokens down to 0,
then get a wait time of 1000ms, after which the set rate of tokens has
regenerated. Rinse and repeat.
Should a user drain twice the amount of the rate, tokens are negative
and the wait time is 2 seconds. The `spare_us` accounts for the
time already spent on the consumption: when a user takes 250ms to
consume the rate, the wait time is then 750ms.
When a user drains nothing for two seconds, the available tokens would
grow to twice the rate, unless a burst rate is set.
Finally, an `rlimit` may be set to `blocked` and later unblocked again.
A blocked `rlimit` has no tokens available. This also works when the rate
is unlimited (`rate_per_sec` set to 0).
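The token-bucket semantics described above can be sketched as follows. This is a simplified, hypothetical model for illustration; the names mirror the documented fields, but it is not the real `struct Curl_rlimit` from `ratelimit.[ch]`, and one token is assumed to equal one byte:

```c
#include <assert.h>
#include <stdint.h>

/* Simplified, hypothetical sketch of the documented token bucket. */
struct rlimit_sketch {
  int64_t rate_per_sec;  /* tokens regenerated per second, 0 = infinite */
  int64_t tokens;        /* currently available, may go negative */
  int64_t burst_per_sec; /* cap on accumulated tokens, 0 = no cap here */
  int64_t ts_us;         /* microsecond timestamp of last update */
  int64_t spare_us;      /* elapsed microseconds not yet counted */
  int blocked;           /* a blocked limit offers no tokens */
};

/* Convert time passed since the last update into new tokens. */
static void rl_update(struct rlimit_sketch *rl, int64_t now_us)
{
  int64_t elapsed, gained;
  if(!rl->rate_per_sec)
    return;
  elapsed = (now_us - rl->ts_us) + rl->spare_us;
  gained = elapsed * rl->rate_per_sec / 1000000;
  /* microseconds "used up" by the gained tokens; the rest carries over */
  rl->spare_us = elapsed - gained * 1000000 / rl->rate_per_sec;
  rl->tokens += gained;
  if(rl->burst_per_sec && rl->tokens > rl->burst_per_sec)
    rl->tokens = rl->burst_per_sec;
  rl->ts_us = now_us;
}

/* How many tokens may be consumed right now? */
static int64_t rl_avail(struct rlimit_sketch *rl, int64_t now_us)
{
  if(rl->blocked)
    return 0;            /* blocked: no tokens, even with infinite rate */
  if(!rl->rate_per_sec)
    return INT64_MAX;    /* unlimited */
  rl_update(rl, now_us);
  return rl->tokens > 0 ? rl->tokens : 0;
}

/* Consume tokens; may push `tokens` negative on overdraw. */
static void rl_drain(struct rlimit_sketch *rl, int64_t n)
{
  rl->tokens -= n;
}

/* Milliseconds to wait until a full second's worth of tokens is back;
 * 0 while tokens are still available. A blocked limit is lifted
 * explicitly, not by waiting, so it is not handled here. */
static int64_t rl_wait_ms(struct rlimit_sketch *rl, int64_t now_us)
{
  if(!rl->rate_per_sec)
    return 0;
  rl_update(rl, now_us);
  if(rl->tokens > 0)
    return 0;
  return (rl->rate_per_sec - rl->tokens) * 1000 / rl->rate_per_sec;
}
```

With a rate of 1000 tokens/s, draining the full rate yields a 1000ms wait and draining twice the rate a 2000ms wait, matching the examples above.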
### Downloads
`rlimit` is in `data->progress.dl.rlimit`. `setopt.c` initializes it whenever
the application sets `CURLOPT_MAX_RECV_SPEED_LARGE`. This may be done
in the middle of a transfer.
`rlimit` tokens are drained in the "protocol" client writer. Checks for
capacity depend on the protocol:
* HTTP and other plain protocols: `transfer.c:sendrecv_dl()` reads only
up to capacity.
* HTTP/2: capacity is used to adjust a stream's window size. Since all
streams start with a `64KB` window, an `rlimit` takes a few seconds to
take effect.
* HTTP/3: ngtcp2 acknowledges stream data according to capacity. It
keeps track of bytes not acknowledged yet. This has the same effect as HTTP/2
window sizes.
(The quiche API does not offer control of `ACK`s, so download `rlimits`
do not work in that backend.)
### Uploads
`rlimit` is in `data->progress.ul.rlimit`. `setopt.c` initializes it whenever
the application sets `CURLOPT_MAX_SEND_SPEED_LARGE`. This may be done
in the middle of a transfer.
The upload capacity is checked in `Curl_client_read()`: readers are
only asked to read bytes up to the `rlimit` capacity. This limits uploads
for all protocols in the same way.
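The capacity check amounts to clamping each read request to the tokens still available. A minimal sketch, assuming one token per byte; `clamp_to_capacity` and its signature are illustrative, not curl's actual API:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical sketch of the upload capacity check: before asking a
 * reader for data, clamp the requested length to the tokens available
 * in the `rlimit`. */
static size_t clamp_to_capacity(size_t want, int64_t avail_tokens)
{
  if(avail_tokens <= 0)
    return 0;                      /* budget exhausted: read nothing */
  if((uint64_t)avail_tokens < (uint64_t)want)
    return (size_t)avail_tokens;   /* partial read up to the budget */
  return want;                     /* enough tokens for the full read */
}
```

Because the clamp sits in one place, every protocol's upload path is limited the same way, as the text above notes.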
### Pause/Unpause
Pausing of up-/downloads sets the corresponding `rlimit` to blocked. Unpausing
removes that block.
### Suspending transfers
While obeying the `rlimit` for up-/download leads to the desired transfer
rates, the other issue that needs care is CPU consumption.
`rlimits` are inspected when computing the "pollset" of a transfer. When
a transfer wants to send but no send tokens are available, `POLLOUT`
is removed from the pollset. The same applies to receiving.
For a transfer that cannot progress due to its `rlimit`, the pollset
is then empty. No socket events are monitored, and no CPU activity
happens. For paused transfers, this is sufficient.
`rlimit`s are drained while a transfer is in `PERFORM` state; exhausted
limits cause the `TOOFAST` timer to be set. When that timer fires,
the transfer runs again and the `rlimit`s are re-evaluated.
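The pollset adjustment can be sketched like this; the flag names and function signature are made up for illustration and do not match curl's real pollset API:

```c
#include <assert.h>

/* Illustrative event flags, not curl's CURL_POLL_* values. */
#define WANT_RECV 0x1
#define WANT_SEND 0x2

/* Hypothetical sketch: drop interest in directions without tokens so
 * an rlimit-stalled transfer monitors no socket events at all. The
 * TOOFAST timer later wakes the transfer to re-evaluate its limits. */
static int adjust_pollset(int events, long recv_tokens, long send_tokens)
{
  if(recv_tokens <= 0)
    events &= ~WANT_RECV;  /* no download budget: ignore readability */
  if(send_tokens <= 0)
    events &= ~WANT_SEND;  /* no upload budget: ignore writability */
  return events;           /* may become 0: idle, no CPU spent */
}
```

An empty result is the desired outcome for a fully rate-limited or paused transfer: nothing is polled until a timer or an unpause makes the transfer runnable again.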

View File

@ -234,6 +234,7 @@ LIB_CFILES = \
progress.c \
psl.c \
rand.c \
ratelimit.c \
rename.c \
request.c \
rtsp.c \
@ -249,7 +250,6 @@ LIB_CFILES = \
socks.c \
socks_gssapi.c \
socks_sspi.c \
speedcheck.c \
splay.c \
strcase.c \
strdup.c \
@ -366,6 +366,7 @@ LIB_HFILES = \
progress.h \
psl.h \
rand.h \
ratelimit.h \
rename.h \
request.h \
rtsp.h \
@ -383,7 +384,6 @@ LIB_HFILES = \
sockaddr.h \
socketpair.h \
socks.h \
speedcheck.h \
splay.h \
strcase.h \
strdup.h \

View File

@ -384,8 +384,8 @@ static CURLcode recv_CONNECT_resp(struct Curl_cfilter *cf,
/* socket buffer drained, return */
return CURLE_OK;
if(Curl_pgrsUpdate(data))
return CURLE_ABORTED_BY_CALLBACK;
if(!result)
result = Curl_pgrsUpdate(data);
if(result) {
ts->keepon = KEEPON_DONE;
@ -565,10 +565,8 @@ static CURLcode H1_CONNECT(struct Curl_cfilter *cf,
/* read what is there */
CURL_TRC_CF(data, cf, "CONNECT receive");
result = recv_CONNECT_resp(cf, data, ts, &done);
if(Curl_pgrsUpdate(data)) {
result = CURLE_ABORTED_BY_CALLBACK;
goto out;
}
if(!result)
result = Curl_pgrsUpdate(data);
/* error or not complete yet. return for more multi-multi */
if(result || !done)
goto out;
@ -671,8 +669,7 @@ out:
/* The real request will follow the CONNECT, reset request partially */
Curl_req_soft_reset(&data->req, data);
Curl_client_reset(data);
Curl_pgrsSetUploadCounter(data, 0);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsReset(data);
tunnel_free(cf, data);
}

View File

@ -60,7 +60,6 @@
#include "sendf.h"
#include "escape.h"
#include "file.h"
#include "speedcheck.h"
#include "multiif.h"
#include "transfer.h"
#include "url.h"
@ -415,13 +414,10 @@ static CURLcode file_upload(struct Curl_easy *data,
Curl_pgrsSetUploadCounter(data, bytecount);
if(Curl_pgrsUpdate(data))
result = CURLE_ABORTED_BY_CALLBACK;
else
result = Curl_speedcheck(data, curlx_now());
result = Curl_pgrsCheck(data);
}
if(!result && Curl_pgrsUpdate(data))
result = CURLE_ABORTED_BY_CALLBACK;
if(!result)
result = Curl_pgrsUpdate(data);
out:
close(fd);
@ -620,10 +616,7 @@ static CURLcode file_do(struct Curl_easy *data, bool *done)
if(result)
goto out;
if(Curl_pgrsUpdate(data))
result = CURLE_ABORTED_BY_CALLBACK;
else
result = Curl_speedcheck(data, curlx_now());
result = Curl_pgrsCheck(data);
if(result)
goto out;
}
@ -657,8 +650,8 @@ static CURLcode file_do(struct Curl_easy *data, bool *done)
#endif
}
if(Curl_pgrsUpdate(data))
result = CURLE_ABORTED_BY_CALLBACK;
if(!result)
result = Curl_pgrsUpdate(data);
out:
Curl_multi_xfer_buf_release(data, xfer_buf);

View File

@ -65,7 +65,6 @@
#include "sockaddr.h" /* required for Curl_sockaddr_storage */
#include "multiif.h"
#include "url.h"
#include "speedcheck.h"
#include "curlx/warnless.h"
#include "http_proxy.h"
#include "socks.h"
@ -675,8 +674,7 @@ static CURLcode getftpresponse(struct Curl_easy *data,
return CURLE_RECV_ERROR;
}
else if(ev == 0) {
if(Curl_pgrsUpdate(data))
return CURLE_ABORTED_BY_CALLBACK;
result = Curl_pgrsUpdate(data);
continue; /* just continue in our loop for the timeout duration */
}
}
@ -4344,10 +4342,7 @@ CURLcode ftp_regular_transfer(struct Curl_easy *data,
bool connected = FALSE;
data->req.size = -1; /* make sure this is unknown at this point */
Curl_pgrsSetUploadCounter(data, 0);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsSetUploadSize(data, -1);
Curl_pgrsSetDownloadSize(data, -1);
Curl_pgrsReset(data);
ftpc->ctl_valid = TRUE; /* starts good */

View File

@ -129,9 +129,9 @@ const struct Curl_handler Curl_handler_http = {
ZERO_NULL, /* connecting */
ZERO_NULL, /* doing */
ZERO_NULL, /* proto_pollset */
Curl_http_do_pollset, /* doing_pollset */
Curl_http_doing_pollset, /* doing_pollset */
ZERO_NULL, /* domore_pollset */
ZERO_NULL, /* perform_pollset */
Curl_http_perform_pollset, /* perform_pollset */
ZERO_NULL, /* disconnect */
Curl_http_write_resp, /* write_resp */
Curl_http_write_resp_hd, /* write_resp_hd */
@ -159,9 +159,9 @@ const struct Curl_handler Curl_handler_https = {
NULL, /* connecting */
ZERO_NULL, /* doing */
NULL, /* proto_pollset */
Curl_http_do_pollset, /* doing_pollset */
Curl_http_doing_pollset, /* doing_pollset */
ZERO_NULL, /* domore_pollset */
ZERO_NULL, /* perform_pollset */
Curl_http_perform_pollset, /* perform_pollset */
ZERO_NULL, /* disconnect */
Curl_http_write_resp, /* write_resp */
Curl_http_write_resp_hd, /* write_resp_hd */
@ -1560,13 +1560,30 @@ CURLcode Curl_http_connect(struct Curl_easy *data, bool *done)
/* this returns the socket to wait for in the DO and DOING state for the multi
interface and then we are always _sending_ a request and thus we wait for
the single socket to become writable only */
CURLcode Curl_http_do_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
CURLcode Curl_http_doing_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
{
/* write mode */
return Curl_pollset_add_out(data, ps, data->conn->sock[FIRSTSOCKET]);
}
CURLcode Curl_http_perform_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
{
struct connectdata *conn = data->conn;
CURLcode result = CURLE_OK;
if(CURL_WANT_RECV(data)) {
result = Curl_pollset_add_in(data, ps, conn->sock[FIRSTSOCKET]);
}
/* on a "Expect: 100-continue" timed wait, do not poll for outgoing */
if(!result && Curl_req_want_send(data) && !http_exp100_is_waiting(data)) {
result = Curl_pollset_add_out(data, ps, conn->sock[FIRSTSOCKET]);
}
return result;
}
/*
* Curl_http_done() gets called after a single HTTP request has been
* performed.
@ -4872,8 +4889,6 @@ static void http_exp100_continue(struct Curl_easy *data,
struct cr_exp100_ctx *ctx = reader->ctx;
if(ctx->state > EXP100_SEND_DATA) {
ctx->state = EXP100_SEND_DATA;
data->req.keepon |= KEEP_SEND;
data->req.keepon &= ~KEEP_SEND_TIMED;
Curl_expire_done(data, EXPIRE_100_TIMEOUT);
}
}
@ -4903,8 +4918,6 @@ static CURLcode cr_exp100_read(struct Curl_easy *data,
ctx->state = EXP100_AWAITING_CONTINUE;
ctx->start = curlx_now();
Curl_expire(data, data->set.expect_100_timeout, EXPIRE_100_TIMEOUT);
data->req.keepon &= ~KEEP_SEND;
data->req.keepon |= KEEP_SEND_TIMED;
*nread = 0;
*eos = FALSE;
return CURLE_OK;
@ -4917,8 +4930,6 @@ static CURLcode cr_exp100_read(struct Curl_easy *data,
ms = curlx_timediff_ms(curlx_now(), ctx->start);
if(ms < data->set.expect_100_timeout) {
DEBUGF(infof(data, "cr_exp100_read, AWAITING_CONTINUE, not expired"));
data->req.keepon &= ~KEEP_SEND;
data->req.keepon |= KEEP_SEND_TIMED;
*nread = 0;
*eos = FALSE;
return CURLE_OK;
@ -4938,7 +4949,6 @@ static void cr_exp100_done(struct Curl_easy *data,
{
struct cr_exp100_ctx *ctx = reader->ctx;
ctx->state = premature ? EXP100_FAILED : EXP100_SEND_DATA;
data->req.keepon &= ~KEEP_SEND_TIMED;
Curl_expire_done(data, EXPIRE_100_TIMEOUT);
}

View File

@ -115,8 +115,10 @@ CURLcode Curl_http_setup_conn(struct Curl_easy *data,
CURLcode Curl_http(struct Curl_easy *data, bool *done);
CURLcode Curl_http_done(struct Curl_easy *data, CURLcode, bool premature);
CURLcode Curl_http_connect(struct Curl_easy *data, bool *done);
CURLcode Curl_http_do_pollset(struct Curl_easy *data,
struct easy_pollset *ps);
CURLcode Curl_http_doing_pollset(struct Curl_easy *data,
struct easy_pollset *ps);
CURLcode Curl_http_perform_pollset(struct Curl_easy *data,
struct easy_pollset *ps);
CURLcode Curl_http_write_resp(struct Curl_easy *data,
const char *buf, size_t blen,
bool is_eos);

View File

@ -98,33 +98,6 @@
#define H2_SETTINGS_IV_LEN 3
#define H2_BINSETTINGS_LEN 80
static size_t populate_settings(nghttp2_settings_entry *iv,
struct Curl_easy *data)
{
iv[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
iv[0].value = Curl_multi_max_concurrent_streams(data->multi);
iv[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
iv[1].value = H2_STREAM_WINDOW_SIZE_INITIAL;
iv[2].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
iv[2].value = data->multi->push_cb != NULL;
return 3;
}
static ssize_t populate_binsettings(uint8_t *binsettings,
struct Curl_easy *data)
{
nghttp2_settings_entry iv[H2_SETTINGS_IV_LEN];
size_t ivlen;
ivlen = populate_settings(iv, data);
/* this returns number of bytes it wrote or a negative number on error. */
return nghttp2_pack_settings_payload(binsettings, H2_BINSETTINGS_LEN,
iv, ivlen);
}
struct cf_h2_ctx {
nghttp2_session *h2;
/* The easy handle used in the current filter call, cleared at return */
@ -137,6 +110,7 @@ struct cf_h2_ctx {
struct uint_hash streams; /* hash of `data->mid` to `h2_stream_ctx` */
size_t drain_total; /* sum of all stream's UrlState drain */
uint32_t initial_win_size; /* current initial window size (settings) */
uint32_t max_concurrent_streams;
uint32_t goaway_error; /* goaway error code from server */
int32_t remote_max_sid; /* max id processed by server */
@ -204,6 +178,60 @@ static void cf_h2_ctx_close(struct cf_h2_ctx *ctx)
}
}
static uint32_t cf_h2_initial_win_size(struct Curl_easy *data)
{
#if NGHTTP2_HAS_SET_LOCAL_WINDOW_SIZE
/* If the transfer has a rate-limit lower than the default initial
* stream window size, use that. It needs to be at least 8k or servers
* may be unhappy. */
if(data->progress.dl.rlimit.rate_per_step &&
(data->progress.dl.rlimit.rate_per_step < H2_STREAM_WINDOW_SIZE_INITIAL))
return CURLMAX((uint32_t)data->progress.dl.rlimit.rate_per_step, 8192);
#endif
return H2_STREAM_WINDOW_SIZE_INITIAL;
}
static size_t populate_settings(nghttp2_settings_entry *iv,
struct Curl_easy *data,
struct cf_h2_ctx *ctx)
{
iv[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
iv[0].value = Curl_multi_max_concurrent_streams(data->multi);
iv[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
iv[1].value = cf_h2_initial_win_size(data);
if(ctx)
ctx->initial_win_size = iv[1].value;
iv[2].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
iv[2].value = data->multi->push_cb != NULL;
return 3;
}
static ssize_t populate_binsettings(uint8_t *binsettings,
struct Curl_easy *data)
{
nghttp2_settings_entry iv[H2_SETTINGS_IV_LEN];
size_t ivlen;
ivlen = populate_settings(iv, data, NULL);
/* this returns number of bytes it wrote or a negative number on error. */
return nghttp2_pack_settings_payload(binsettings, H2_BINSETTINGS_LEN,
iv, ivlen);
}
static CURLcode cf_h2_update_settings(struct cf_h2_ctx *ctx,
uint32_t initial_win_size)
{
nghttp2_settings_entry entry;
entry.settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
entry.value = initial_win_size;
if(nghttp2_submit_settings(ctx->h2, NGHTTP2_FLAG_NONE, &entry, 1))
return CURLE_SEND_ERROR;
ctx->initial_win_size = initial_win_size;
return CURLE_OK;
}
static CURLcode nw_out_flush(struct Curl_cfilter *cf,
struct Curl_easy *data);
@ -296,16 +324,18 @@ static void h2_stream_hash_free(unsigned int id, void *stream)
static int32_t cf_h2_get_desired_local_win(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
curl_off_t avail = Curl_rlimit_avail(&data->progress.dl.rlimit,
curlx_now());
(void)cf;
if(data->set.max_recv_speed && data->set.max_recv_speed < INT32_MAX) {
/* The transfer should only receive `max_recv_speed` bytes per second.
* We restrict the stream's local window size, so that the server cannot
* send us "too much" at a time.
* This gets less precise the higher the latency. */
return (int32_t)data->set.max_recv_speed;
if(avail < CURL_OFF_T_MAX) { /* limit in place */
if(avail <= 0)
return 0;
else if(avail < INT32_MAX)
return (int32_t)avail;
}
#ifdef DEBUGBUILD
else {
{
struct cf_h2_ctx *ctx = cf->ctx;
CURL_TRC_CF(data, cf, "stream_win_max=%d", ctx->stream_win_max);
return ctx->stream_win_max;
@ -580,7 +610,7 @@ static CURLcode cf_h2_ctx_open(struct Curl_cfilter *cf,
nghttp2_settings_entry iv[H2_SETTINGS_IV_LEN];
size_t ivlen;
ivlen = populate_settings(iv, data);
ivlen = populate_settings(iv, data, ctx);
rc = nghttp2_submit_settings(ctx->h2, NGHTTP2_FLAG_NONE,
iv, ivlen);
if(rc) {
@ -2007,6 +2037,9 @@ static CURLcode stream_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
(void)len;
*pnread = 0;
if(!stream->xfer_result)
stream->xfer_result = cf_h2_update_local_win(cf, data, stream);
if(stream->xfer_result) {
CURL_TRC_CF(data, cf, "[%d] xfer write failed", stream->id);
result = stream->xfer_result;
@ -2239,6 +2272,7 @@ static CURLcode h2_submit(struct h2_stream_ctx **pstream,
nghttp2_priority_spec pri_spec;
size_t nwritten;
CURLcode result = CURLE_OK;
uint32_t initial_win_size;
*pnwritten = 0;
Curl_dynhds_init(&h2_headers, 0, DYN_HTTP_REQUEST);
@ -2276,6 +2310,15 @@ static CURLcode h2_submit(struct h2_stream_ctx **pstream,
if(!nghttp2_session_check_request_allowed(ctx->h2))
CURL_TRC_CF(data, cf, "send request NOT allowed (via nghttp2)");
/* Check the initial windows size of the transfer (rate-limits?) and
* send an updated settings on changes from previous value. */
initial_win_size = cf_h2_initial_win_size(data);
if(initial_win_size != ctx->initial_win_size) {
result = cf_h2_update_settings(ctx, initial_win_size);
if(result)
goto out;
}
switch(data->state.httpreq) {
case HTTPREQ_POST:
case HTTPREQ_POST_FORM:

View File

@ -1938,10 +1938,7 @@ static CURLcode imap_regular_transfer(struct Curl_easy *data,
data->req.size = -1;
/* Set the progress data */
Curl_pgrsSetUploadCounter(data, 0);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsSetUploadSize(data, -1);
Curl_pgrsSetDownloadSize(data, -1);
Curl_pgrsReset(data);
/* Carry out the perform */
result = imap_perform(data, &connected, dophase_done);

View File

@ -509,7 +509,7 @@ static CURLcode ldap_do(struct Curl_easy *data, bool *done)
goto quit;
}
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsReset(data);
rc = ldap_search_s(server, ludp->lud_dn,
(curl_ldap_num_t)ludp->lud_scope,
ludp->lud_filter, ludp->lud_attrs, 0, &ldapmsg);

View File

@ -43,7 +43,6 @@
#include "select.h"
#include "curlx/warnless.h"
#include "curlx/wait.h"
#include "speedcheck.h"
#include "conncache.h"
#include "multihandle.h"
#include "sigpipe.h"
@ -923,79 +922,145 @@ void Curl_attach_connection(struct Curl_easy *data,
conn->handler->attach(data, conn);
}
/* adjust pollset for rate limits/pauses */
static CURLcode multi_adjust_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
{
CURLcode result = CURLE_OK;
if(ps->n) {
struct curltime now = curlx_now();
bool send_blocked, recv_blocked;
recv_blocked = (Curl_rlimit_avail(&data->progress.dl.rlimit, now) <= 0);
send_blocked = (Curl_rlimit_avail(&data->progress.ul.rlimit, now) <= 0);
if(send_blocked || recv_blocked) {
int i;
for(i = 0; i <= SECONDARYSOCKET; ++i) {
curl_socket_t sock = data->conn->sock[i];
if(sock == CURL_SOCKET_BAD)
continue;
if(recv_blocked && Curl_pollset_want_recv(data, ps, sock)) {
result = Curl_pollset_remove_in(data, ps, sock);
if(result)
break;
}
if(send_blocked && Curl_pollset_want_send(data, ps, sock)) {
result = Curl_pollset_remove_out(data, ps, sock);
if(result)
break;
}
}
}
/* Not blocked and wanting to receive. If there is data pending
* in the connection filters, make transfer run again. */
if(!recv_blocked &&
((Curl_pollset_want_recv(data, ps, data->conn->sock[FIRSTSOCKET]) &&
Curl_conn_data_pending(data, FIRSTSOCKET)) ||
(Curl_pollset_want_recv(data, ps, data->conn->sock[SECONDARYSOCKET]) &&
Curl_conn_data_pending(data, SECONDARYSOCKET)))) {
CURL_TRC_M(data, "pollset[] has POLLIN, but there is still "
"buffered input -> mark as dirty");
Curl_multi_mark_dirty(data);
}
}
return result;
}
static CURLcode mstate_connecting_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
{
if(data->conn) {
curl_socket_t sockfd = Curl_conn_get_first_socket(data);
if(sockfd != CURL_SOCKET_BAD) {
/* Default is to wait to something from the server */
return Curl_pollset_change(data, ps, sockfd, CURL_POLL_IN, 0);
}
struct connectdata *conn = data->conn;
curl_socket_t sockfd;
CURLcode result = CURLE_OK;
if(Curl_xfer_recv_is_paused(data))
return CURLE_OK;
/* If a socket is set, receiving is default. If the socket
* has not been determined yet (eyeballing), always ask the
* connection filters for what to monitor. */
sockfd = Curl_conn_get_first_socket(data);
if(sockfd != CURL_SOCKET_BAD) {
result = Curl_pollset_change(data, ps, sockfd, CURL_POLL_IN, 0);
if(!result)
result = multi_adjust_pollset(data, ps);
}
return CURLE_OK;
if(!result)
result = Curl_conn_adjust_pollset(data, conn, ps);
return result;
}
static CURLcode mstate_protocol_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
{
struct connectdata *conn = data->conn;
if(conn) {
curl_socket_t sockfd;
if(conn->handler->proto_pollset)
return conn->handler->proto_pollset(data, ps);
sockfd = conn->sock[FIRSTSOCKET];
CURLcode result = CURLE_OK;
if(conn->handler->proto_pollset)
result = conn->handler->proto_pollset(data, ps);
else {
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
if(sockfd != CURL_SOCKET_BAD) {
/* Default is to wait to something from the server */
return Curl_pollset_change(data, ps, sockfd, CURL_POLL_IN, 0);
result = Curl_pollset_change(data, ps, sockfd, CURL_POLL_IN, 0);
}
}
return CURLE_OK;
if(!result)
result = multi_adjust_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, conn, ps);
return result;
}
static CURLcode mstate_do_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
{
struct connectdata *conn = data->conn;
if(conn) {
if(conn->handler->doing_pollset)
return conn->handler->doing_pollset(data, ps);
else if(CONN_SOCK_IDX_VALID(conn->send_idx)) {
/* Default is that we want to send something to the server */
return Curl_pollset_add_out(
data, ps, conn->sock[conn->send_idx]);
}
CURLcode result = CURLE_OK;
if(conn->handler->doing_pollset)
result = conn->handler->doing_pollset(data, ps);
else if(CONN_SOCK_IDX_VALID(conn->send_idx)) {
/* Default is that we want to send something to the server */
result = Curl_pollset_add_out(data, ps, conn->sock[conn->send_idx]);
}
return CURLE_OK;
if(!result)
result = multi_adjust_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, conn, ps);
return result;
}
static CURLcode mstate_domore_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
{
struct connectdata *conn = data->conn;
if(conn) {
if(conn->handler->domore_pollset)
return conn->handler->domore_pollset(data, ps);
else if(CONN_SOCK_IDX_VALID(conn->send_idx)) {
/* Default is that we want to send something to the server */
return Curl_pollset_add_out(
data, ps, conn->sock[conn->send_idx]);
}
CURLcode result = CURLE_OK;
if(conn->handler->domore_pollset)
result = conn->handler->domore_pollset(data, ps);
else if(CONN_SOCK_IDX_VALID(conn->send_idx)) {
/* Default is that we want to send something to the server */
result = Curl_pollset_add_out(data, ps, conn->sock[conn->send_idx]);
}
return CURLE_OK;
if(!result)
result = multi_adjust_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, conn, ps);
return result;
}
static CURLcode mstate_perform_pollset(struct Curl_easy *data,
struct easy_pollset *ps)
{
struct connectdata *conn = data->conn;
if(!conn)
return CURLE_OK;
else if(conn->handler->perform_pollset)
return conn->handler->perform_pollset(data, ps);
CURLcode result = CURLE_OK;
if(conn->handler->perform_pollset)
result = conn->handler->perform_pollset(data, ps);
else {
/* Default is to obey the data->req.keepon flags for send/recv */
CURLcode result = CURLE_OK;
if(CURL_WANT_RECV(data) && CONN_SOCK_IDX_VALID(conn->recv_idx)) {
result = Curl_pollset_add_in(
data, ps, conn->sock[conn->recv_idx]);
@ -1006,19 +1071,21 @@ static CURLcode mstate_perform_pollset(struct Curl_easy *data,
result = Curl_pollset_add_out(
data, ps, conn->sock[conn->send_idx]);
}
return result;
}
if(!result)
result = multi_adjust_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, conn, ps);
return result;
}
/* Initializes `poll_set` with the current socket poll actions needed
* for transfer `data`. */
CURLMcode Curl_multi_pollset(struct Curl_easy *data,
struct easy_pollset *ps,
const char *caller)
struct easy_pollset *ps)
{
CURLMcode mresult = CURLM_OK;
CURLcode result = CURLE_OK;
bool expect_sockets = TRUE;
/* If the transfer has no connection, this is fine. Happens when
called via curl_multi_remove_handle() => Curl_multi_ev_assess() =>
@ -1033,70 +1100,49 @@ CURLMcode Curl_multi_pollset(struct Curl_easy *data,
case MSTATE_SETUP:
case MSTATE_CONNECT:
/* nothing to poll for yet */
expect_sockets = FALSE;
break;
case MSTATE_RESOLVING:
result = Curl_resolv_pollset(data, ps);
/* connection filters are not involved in this phase. It is OK if we get no
* sockets to wait for. Resolving can wake up from other sources. */
expect_sockets = FALSE;
break;
case MSTATE_CONNECTING:
case MSTATE_TUNNELING:
if(!Curl_xfer_recv_is_paused(data)) {
result = mstate_connecting_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, data->conn, ps);
}
else
expect_sockets = FALSE;
result = mstate_connecting_pollset(data, ps);
break;
case MSTATE_PROTOCONNECT:
case MSTATE_PROTOCONNECTING:
result = mstate_protocol_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, data->conn, ps);
break;
case MSTATE_DO:
case MSTATE_DOING:
result = mstate_do_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, data->conn, ps);
break;
case MSTATE_DOING_MORE:
result = mstate_domore_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, data->conn, ps);
break;
case MSTATE_DID: /* same as PERFORMING in regard to polling */
case MSTATE_PERFORMING:
result = mstate_perform_pollset(data, ps);
if(!result)
result = Curl_conn_adjust_pollset(data, data->conn, ps);
break;
case MSTATE_RATELIMITING:
/* we need to let time pass, ignore socket(s) */
expect_sockets = FALSE;
break;
case MSTATE_DONE:
case MSTATE_COMPLETED:
case MSTATE_MSGSENT:
/* nothing more to poll for */
expect_sockets = FALSE;
break;
default:
failf(data, "multi_getsock: unexpected multi state %d", data->mstate);
DEBUGASSERT(0);
expect_sockets = FALSE;
break;
}
@ -1110,39 +1156,27 @@ CURLMcode Curl_multi_pollset(struct Curl_easy *data,
goto out;
}
/* Unblocked and waiting to receive with buffered input.
* Make transfer run again at next opportunity. */
if(!Curl_xfer_is_blocked(data) && !Curl_xfer_is_too_fast(data) &&
((Curl_pollset_want_read(data, ps, data->conn->sock[FIRSTSOCKET]) &&
Curl_conn_data_pending(data, FIRSTSOCKET)) ||
(Curl_pollset_want_read(data, ps, data->conn->sock[SECONDARYSOCKET]) &&
Curl_conn_data_pending(data, SECONDARYSOCKET)))) {
CURL_TRC_M(data, "%s pollset[] has POLLIN, but there is still "
"buffered input to consume -> mark as dirty", caller);
Curl_multi_mark_dirty(data);
}
#ifndef CURL_DISABLE_VERBOSE_STRINGS
if(CURL_TRC_M_is_verbose(data)) {
size_t timeout_count = Curl_llist_count(&data->state.timeoutlist);
switch(ps->n) {
case 0:
CURL_TRC_M(data, "%s pollset[], timeouts=%zu, paused %d/%d (r/w)",
caller, timeout_count,
CURL_TRC_M(data, "pollset[], timeouts=%zu, paused %d/%d (r/w)",
timeout_count,
Curl_xfer_send_is_paused(data),
Curl_xfer_recv_is_paused(data));
break;
case 1:
CURL_TRC_M(data, "%s pollset[fd=%" FMT_SOCKET_T " %s%s], timeouts=%zu",
caller, ps->sockets[0],
CURL_TRC_M(data, "pollset[fd=%" FMT_SOCKET_T " %s%s], timeouts=%zu",
ps->sockets[0],
(ps->actions[0] & CURL_POLL_IN) ? "IN" : "",
(ps->actions[0] & CURL_POLL_OUT) ? "OUT" : "",
timeout_count);
break;
case 2:
CURL_TRC_M(data, "%s pollset[fd=%" FMT_SOCKET_T " %s%s, "
CURL_TRC_M(data, "pollset[fd=%" FMT_SOCKET_T " %s%s, "
"fd=%" FMT_SOCKET_T " %s%s], timeouts=%zu",
caller, ps->sockets[0],
ps->sockets[0],
(ps->actions[0] & CURL_POLL_IN) ? "IN" : "",
(ps->actions[0] & CURL_POLL_OUT) ? "OUT" : "",
ps->sockets[1],
@ -1151,27 +1185,14 @@ CURLMcode Curl_multi_pollset(struct Curl_easy *data,
timeout_count);
break;
default:
CURL_TRC_M(data, "%s pollset[fds=%u], timeouts=%zu",
caller, ps->n, timeout_count);
CURL_TRC_M(data, "pollset[fds=%u], timeouts=%zu",
ps->n, timeout_count);
break;
}
CURL_TRC_EASY_TIMERS(data);
}
#endif
if(expect_sockets && !ps->n && data->multi &&
!Curl_uint_bset_contains(&data->multi->dirty, data->mid) &&
!Curl_llist_count(&data->state.timeoutlist) &&
!Curl_cwriter_is_paused(data) && !Curl_creader_is_paused(data) &&
Curl_conn_is_ip_connected(data, FIRSTSOCKET)) {
/* We expected sockets for POLL monitoring, but none are set.
* We are not dirty (and run anyway).
* We are not waiting on any timer.
* None of the READ/WRITE directions are paused.
* We are connected to the server on IP level, at least. */
infof(data, "WARNING: no socket in pollset or timer, transfer may stall!");
DEBUGASSERT(0);
}
out:
return mresult;
}
@ -1205,7 +1226,7 @@ CURLMcode curl_multi_fdset(CURLM *m,
continue;
}
Curl_multi_pollset(data, &ps, "curl_multi_fdset");
Curl_multi_pollset(data, &ps);
for(i = 0; i < ps.n; i++) {
if(!FDSET_SOCK(ps.sockets[i]))
/* pretend it does not exist */
@ -1268,7 +1289,7 @@ CURLMcode curl_multi_waitfds(CURLM *m,
Curl_uint_bset_remove(&multi->dirty, mid);
continue;
}
Curl_multi_pollset(data, &ps, "curl_multi_waitfds");
Curl_multi_pollset(data, &ps);
need += Curl_waitfds_add_ps(&cwfds, &ps);
}
while(Curl_uint_bset_next(&multi->process, mid, &mid));
@ -1354,7 +1375,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
Curl_uint_bset_remove(&multi->dirty, mid);
continue;
}
Curl_multi_pollset(data, &ps, "multi_wait");
Curl_multi_pollset(data, &ps);
if(Curl_pollfds_add_ps(&cpfds, &ps)) {
result = CURLM_OUT_OF_MEMORY;
goto out;
@ -1907,35 +1928,28 @@ static CURLcode multi_follow(struct Curl_easy *data,
}
static CURLcode mspeed_check(struct Curl_easy *data,
struct curltime *nowp)
struct curltime now)
{
timediff_t recv_wait_ms = 0;
timediff_t send_wait_ms = 0;
/* check if over send speed */
if(data->set.max_send_speed)
send_wait_ms = Curl_pgrsLimitWaitTime(&data->progress.ul,
data->set.max_send_speed,
*nowp);
/* check if over recv speed */
if(data->set.max_recv_speed)
recv_wait_ms = Curl_pgrsLimitWaitTime(&data->progress.dl,
data->set.max_recv_speed,
*nowp);
/* check if our send/recv limits require idle waits */
send_wait_ms = Curl_rlimit_wait_ms(&data->progress.ul.rlimit, now);
recv_wait_ms = Curl_rlimit_wait_ms(&data->progress.dl.rlimit, now);
if(send_wait_ms || recv_wait_ms) {
if(data->mstate != MSTATE_RATELIMITING) {
Curl_ratelimit(data, *nowp);
multistate(data, MSTATE_RATELIMITING);
}
Curl_expire(data, CURLMAX(send_wait_ms, recv_wait_ms), EXPIRE_TOOFAST);
Curl_multi_clear_dirty(data);
CURL_TRC_M(data, "[RLIMIT] waiting %" FMT_TIMEDIFF_T "ms",
CURLMAX(send_wait_ms, recv_wait_ms));
return CURLE_AGAIN;
}
else if(data->mstate != MSTATE_PERFORMING) {
CURL_TRC_M(data, "[RLIMIT] wait over, continue");
multistate(data, MSTATE_PERFORMING);
Curl_ratelimit(data, *nowp);
}
return CURLE_OK;
}
@ -1951,7 +1965,7 @@ static CURLMcode state_performing(struct Curl_easy *data,
CURLcode result = *resultp = CURLE_OK;
*stream_errorp = FALSE;
if(mspeed_check(data, nowp) == CURLE_AGAIN)
if(mspeed_check(data, *nowp) == CURLE_AGAIN)
return CURLM_OK;
/* read/write data if it is ready to do so */
@ -2073,7 +2087,8 @@ static CURLMcode state_performing(struct Curl_easy *data,
}
}
else { /* not errored, not done */
mspeed_check(data, nowp);
*nowp = curlx_now();
mspeed_check(data, *nowp);
}
free(newurl);
*resultp = result;
@ -2228,10 +2243,7 @@ static CURLMcode state_ratelimiting(struct Curl_easy *data,
CURLMcode rc = CURLM_OK;
DEBUGASSERT(data->conn);
/* if both rates are within spec, resume transfer */
if(Curl_pgrsUpdate(data))
result = CURLE_ABORTED_BY_CALLBACK;
else
result = Curl_speedcheck(data, *nowp);
result = Curl_pgrsCheck(data);
if(result) {
if(!(data->conn->handler->flags & PROTOPT_DUAL) &&
@ -2242,7 +2254,7 @@ static CURLMcode state_ratelimiting(struct Curl_easy *data,
multi_done(data, result, TRUE);
}
else {
if(!mspeed_check(data, nowp))
if(!mspeed_check(data, *nowp))
rc = CURLM_CALL_MULTI_PERFORM;
}
*resultp = result;
@ -2387,6 +2399,8 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
(HTTP/2), or the full connection for older protocols */
bool stream_error = FALSE;
rc = CURLM_OK;
/* update at start for continuous increase when looping */
*nowp = curlx_now();
if(multi_ischanged(multi, TRUE)) {
CURL_TRC_M(data, "multi changed, check CONNECT_PEND queue");
@ -2704,16 +2718,18 @@ statemachine_end:
rc = CURLM_CALL_MULTI_PERFORM;
}
/* if there is still a connection to use, call the progress function */
else if(data->conn && Curl_pgrsUpdate(data)) {
/* aborted due to progress callback return code must close the
connection */
result = CURLE_ABORTED_BY_CALLBACK;
streamclose(data->conn, "Aborted by callback");
else if(data->conn) {
result = Curl_pgrsUpdate(data);
if(result) {
/* aborted due to progress callback return code must close the
connection */
streamclose(data->conn, "Aborted by callback");
/* if not yet in DONE state, go there, otherwise COMPLETED */
multistate(data, (data->mstate < MSTATE_DONE) ?
MSTATE_DONE : MSTATE_COMPLETED);
rc = CURLM_CALL_MULTI_PERFORM;
/* if not yet in DONE state, go there, otherwise COMPLETED */
multistate(data, (data->mstate < MSTATE_DONE) ?
MSTATE_DONE : MSTATE_COMPLETED);
rc = CURLM_CALL_MULTI_PERFORM;
}
}
}

View File

@ -508,7 +508,7 @@ static CURLMcode mev_assess(struct Curl_multi *multi,
}
}
else
Curl_multi_pollset(data, &ps, "ev assess");
Curl_multi_pollset(data, &ps);
last_ps = mev_get_last_pollset(data, conn);
if(!last_ps && ps.n) {

View File

@ -73,8 +73,7 @@ CURLMcode Curl_multi_add_perform(struct Curl_multi *multi,
unsigned int Curl_multi_max_concurrent_streams(struct Curl_multi *multi);
CURLMcode Curl_multi_pollset(struct Curl_easy *data,
struct easy_pollset *ps,
const char *caller);
struct easy_pollset *ps);
/**
* Borrow the transfer buffer from the multi, suitable

View File

@ -33,7 +33,6 @@
#include "sendf.h"
#include "select.h"
#include "progress.h"
#include "speedcheck.h"
#include "pingpong.h"
#include "multiif.h"
#include "vtls/vtls.h"
@ -122,11 +121,7 @@ CURLcode Curl_pp_statemach(struct Curl_easy *data,
if(block) {
/* if we did not wait, we do not have to spend time on this now */
if(Curl_pgrsUpdate(data))
result = CURLE_ABORTED_BY_CALLBACK;
else
result = Curl_speedcheck(data, curlx_now());
result = Curl_pgrsCheck(data);
if(result)
return result;
}

View File

@ -1503,10 +1503,7 @@ static CURLcode pop3_regular_transfer(struct Curl_easy *data,
data->req.size = -1;
/* Set the progress data */
Curl_pgrsSetUploadCounter(data, 0);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsSetUploadSize(data, -1);
Curl_pgrsSetDownloadSize(data, -1);
Curl_pgrsReset(data);
/* Carry out the perform */
result = pop3_perform(data, &connected, dophase_done);

View File

@ -28,6 +28,7 @@
#include "sendf.h"
#include "multiif.h"
#include "progress.h"
#include "transfer.h"
#include "curlx/timeval.h"
/* check rate limits within this many recent milliseconds, at minimum. */
@ -92,6 +93,55 @@ static char *max6data(curl_off_t bytes, char *max6)
}
#endif
static void pgrs_speedinit(struct Curl_easy *data)
{
memset(&data->state.keeps_speed, 0, sizeof(struct curltime));
}
/*
* @unittest: 1606
*/
UNITTEST CURLcode pgrs_speedcheck(struct Curl_easy *data,
struct curltime *pnow)
{
if(!data->set.low_speed_time || !data->set.low_speed_limit ||
Curl_xfer_recv_is_paused(data) || Curl_xfer_send_is_paused(data))
/* A paused transfer is not qualified for speed checks */
return CURLE_OK;
if((data->progress.current_speed >= 0) && data->set.low_speed_time) {
if(data->progress.current_speed < data->set.low_speed_limit) {
if(!data->state.keeps_speed.tv_sec)
/* under the limit at this moment */
data->state.keeps_speed = *pnow;
else {
/* how long has it been under the limit */
timediff_t howlong = curlx_timediff_ms(*pnow, data->state.keeps_speed);
if(howlong >= data->set.low_speed_time * 1000) {
/* too long */
failf(data,
"Operation too slow. "
"Less than %ld bytes/sec transferred the last %ld seconds",
data->set.low_speed_limit,
data->set.low_speed_time);
return CURLE_OPERATION_TIMEDOUT;
}
}
}
else
/* faster right now */
data->state.keeps_speed.tv_sec = 0;
}
if(data->set.low_speed_limit)
/* if low speed limit is enabled, set the expire timer to make this
connection's speed get checked again in a second */
Curl_expire(data, 1000, EXPIRE_SPEEDCHECK);
return CURLE_OK;
}
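The low-speed check above boils down to: remember when the speed first dropped below the limit, and abort once it has stayed there for `low_speed_time` seconds. A minimal standalone sketch of that idea (names and the caller-kept timestamp are illustrative assumptions, not the curl implementation):

```c
#include <assert.h>
#include <stdint.h>

/* Returns 1 (abort) once `speed` has stayed below `limit` for
 * `low_speed_time_s` consecutive seconds; otherwise 0. `*below_since_ms`
 * is a caller-kept timestamp of when the speed first dipped below the
 * limit (0 = not currently below), mirroring `keeps_speed` above. */
static int too_slow(int64_t speed, int64_t limit, int64_t low_speed_time_s,
                    int64_t now_ms, int64_t *below_since_ms)
{
  if(speed >= limit) {
    *below_since_ms = 0;  /* back above the limit, reset the clock */
    return 0;
  }
  if(!*below_since_ms) {
    *below_since_ms = now_ms;  /* just dipped below, start the clock */
    return 0;
  }
  /* abort only after staying below the limit long enough */
  return (now_ms - *below_since_ms) >= low_speed_time_s * 1000;
}
```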
/*
New proposed interface, 9th of February 2000:
@ -119,10 +169,19 @@ int Curl_pgrsDone(struct Curl_easy *data)
* hidden */
curl_mfprintf(data->set.err, "\n");
data->progress.speeder_c = 0; /* reset the progress meter display */
return 0;
}
void Curl_pgrsReset(struct Curl_easy *data)
{
Curl_pgrsSetUploadCounter(data, 0);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsSetUploadSize(data, -1);
Curl_pgrsSetDownloadSize(data, -1);
data->progress.speeder_c = 0; /* reset speed records */
pgrs_speedinit(data);
}
/* reset the known transfer sizes */
void Curl_pgrsResetTransferSizes(struct Curl_easy *data)
{
@ -130,6 +189,14 @@ void Curl_pgrsResetTransferSizes(struct Curl_easy *data)
Curl_pgrsSetUploadSize(data, -1);
}
void Curl_pgrsRecvPause(struct Curl_easy *data, bool enable)
{
if(!enable) {
data->progress.speeder_c = 0; /* reset speed records */
pgrs_speedinit(data); /* reset low speed measurements */
}
}
/*
*
* Curl_pgrsTimeWas(). Store the timestamp time at the given label.
@ -228,72 +295,11 @@ void Curl_pgrsStartNow(struct Curl_easy *data)
p->speeder_c = 0; /* reset the progress meter display */
p->start = curlx_now();
p->is_t_startransfer_set = FALSE;
p->ul.limit.start = p->start;
p->dl.limit.start = p->start;
p->ul.limit.start_size = 0;
p->dl.limit.start_size = 0;
p->dl.cur_size = 0;
p->ul.cur_size = 0;
/* the sizes are unknown at start */
p->dl_size_known = FALSE;
p->ul_size_known = FALSE;
Curl_ratelimit(data, p->start);
}
/*
* This is used to handle speed limits, calculating how many milliseconds to
* wait until we are back under the speed limit, if needed.
*
* The way it works is by having a "starting point" (time & amount of data
* transferred by then) used in the speed computation, to be used instead of
* the start of the transfer. This starting point is regularly moved as
* transfer goes on, to keep getting accurate values (instead of average over
* the entire transfer).
*
* This function takes the current amount of data transferred, the amount at
* the starting point, the limit (in bytes/s), the time of the starting point
* and the current time.
*
* Returns 0 if no waiting is needed or when no waiting is needed but the
* starting point should be reset (to current); or the number of milliseconds
* to wait to get back under the speed limit.
*/
timediff_t Curl_pgrsLimitWaitTime(struct pgrs_dir *d,
curl_off_t bytes_per_sec,
struct curltime now)
{
curl_off_t bytes = d->cur_size - d->limit.start_size;
timediff_t should_ms;
timediff_t took_ms;
/* no limit or we did not get to any bytes yet */
if(!bytes_per_sec || !bytes)
return 0;
/* The time it took us to have `bytes` */
took_ms = curlx_timediff_ceil_ms(now, d->limit.start);
/* The time it *should* have taken us to have `bytes`
* when obeying the bytes_per_sec speed_limit. */
if(bytes < CURL_OFF_T_MAX/1000) {
/* (1000 * bytes / (bytes / sec)) = 1000 * sec = ms */
should_ms = (timediff_t) (1000 * bytes / bytes_per_sec);
}
else {
/* large `bytes`, first calc the seconds it should have taken.
* if that is small enough, convert to milliseconds. */
should_ms = (timediff_t) (bytes / bytes_per_sec);
if(should_ms < TIMEDIFF_T_MAX/1000)
should_ms *= 1000;
else
should_ms = TIMEDIFF_T_MAX;
}
if(took_ms < should_ms) {
/* when gotten to `bytes` too fast, wait the difference */
return should_ms - took_ms;
}
return 0;
}
/*
@ -304,28 +310,6 @@ void Curl_pgrsSetDownloadCounter(struct Curl_easy *data, curl_off_t size)
data->progress.dl.cur_size = size;
}
/*
* Update the timestamp and sizestamp to use for rate limit calculations.
*/
void Curl_ratelimit(struct Curl_easy *data, struct curltime now)
{
/* do not set a new stamp unless the time since last update is long enough */
if(data->set.max_recv_speed) {
if(curlx_timediff_ms(now, data->progress.dl.limit.start) >=
MIN_RATE_LIMIT_PERIOD) {
data->progress.dl.limit.start = now;
data->progress.dl.limit.start_size = data->progress.dl.cur_size;
}
}
if(data->set.max_send_speed) {
if(curlx_timediff_ms(now, data->progress.ul.limit.start) >=
MIN_RATE_LIMIT_PERIOD) {
data->progress.ul.limit.start = now;
data->progress.ul.limit.start_size = data->progress.ul.cur_size;
}
}
}
/*
* Set the number of uploaded bytes so far.
*/
@ -378,75 +362,82 @@ static curl_off_t trspeed(curl_off_t size, /* number of bytes */
}
/* returns TRUE if it is time to show the progress meter */
static bool progress_calc(struct Curl_easy *data, struct curltime now)
static bool progress_calc(struct Curl_easy *data, struct curltime *pnow)
{
bool timetoshow = FALSE;
struct Progress * const p = &data->progress;
int i_next, i_oldest, i_latest;
timediff_t duration_ms;
curl_off_t amount;
/* The time spent so far (from the start) in microseconds */
p->timespent = curlx_timediff_us(now, p->start);
p->timespent = curlx_timediff_us(*pnow, p->start);
p->dl.speed = trspeed(p->dl.cur_size, p->timespent);
p->ul.speed = trspeed(p->ul.cur_size, p->timespent);
/* Calculations done at most once a second, unless end is reached */
if(p->lastshow != now.tv_sec) {
int countindex; /* amount of seconds stored in the speeder array */
int nowindex = p->speeder_c% CURR_TIME;
p->lastshow = now.tv_sec;
timetoshow = TRUE;
/* Let's do the "current speed" thing, with the dl + ul speeds
combined. Store the speed at entry 'nowindex'. */
p->speeder[ nowindex ] = p->dl.cur_size + p->ul.cur_size;
/* remember the exact time for this moment */
p->speeder_time [ nowindex ] = now;
/* advance our speeder_c counter, which is increased every time we get
here and we expect it to never wrap as 2^32 is a lot of seconds! */
if(!p->speeder_c) { /* no previous record exists */
p->speed_amount[0] = p->dl.cur_size + p->ul.cur_size;
p->speed_time[0] = *pnow;
p->speeder_c++;
/* use the overall average at the start */
p->current_speed = p->ul.speed + p->dl.speed;
p->lastshow = pnow->tv_sec;
return TRUE;
}
/* We have at least one record now. Where to put the next and
* where is the latest one? */
i_next = p->speeder_c % CURL_SPEED_RECORDS;
i_latest = (i_next > 0) ? (i_next - 1) : (CURL_SPEED_RECORDS - 1);
/* figure out how many index entries of data we have stored in our speeder
array. With N_ENTRIES filled in, we have about N_ENTRIES-1 seconds of
transfer. Imagine, after one second we have filled in two entries,
after two seconds we have filled in three entries etc. */
countindex = ((p->speeder_c >= CURR_TIME) ? CURR_TIME : p->speeder_c) - 1;
/* first of all, we do not do this if there is no counted seconds yet */
if(countindex) {
int checkindex;
timediff_t span_ms;
curl_off_t amount;
/* Get the index position to compare with the 'nowindex' position.
Get the oldest entry possible. While we have less than CURR_TIME
entries, the first entry will remain the oldest. */
checkindex = (p->speeder_c >= CURR_TIME) ? p->speeder_c%CURR_TIME : 0;
/* Figure out the exact time for the time span */
span_ms = curlx_timediff_ms(now, p->speeder_time[checkindex]);
if(span_ms == 0)
span_ms = 1; /* at least one millisecond MUST have passed */
/* Calculate the average speed the last 'span_ms' milliseconds */
amount = p->speeder[nowindex]- p->speeder[checkindex];
if(amount > (0xffffffff/1000))
/* the 'amount' value is bigger than would fit in 32 bits if
multiplied with 1000, so we use the double math for this */
p->current_speed = (curl_off_t)
((double)amount/((double)span_ms/1000.0));
else
/* the 'amount' value is small enough to fit within 32 bits even
when multiplied with 1000 */
p->current_speed = amount * 1000/span_ms;
/* Make a new record only when enough time has passed.
* Calling too frequently would otherwise ruin the history. */
if(curlx_timediff_ms(*pnow, p->speed_time[i_latest]) >= 1000) {
p->speeder_c++;
i_latest = i_next;
p->speed_amount[i_latest] = p->dl.cur_size + p->ul.cur_size;
p->speed_time[i_latest] = *pnow;
}
else if(data->req.done) {
/* When the transfer is done and we do not have a current speed yet,
* update the last record. Otherwise, keep the speed we have: when
* rate limiting, the last chunk of data would inflate the reported
* speed since it no longer covers a full second. */
if(!p->current_speed) {
p->speed_amount[i_latest] = p->dl.cur_size + p->ul.cur_size;
p->speed_time[i_latest] = *pnow;
}
else
/* the first second we use the average */
p->current_speed = p->ul.speed + p->dl.speed;
}
else {
/* transfer ongoing, wait for more time to pass. */
return FALSE;
}
} /* Calculations end */
return timetoshow;
i_oldest = (p->speeder_c < CURL_SPEED_RECORDS) ? 0 :
((i_latest + 1) % CURL_SPEED_RECORDS);
/* How much we transferred between the oldest and latest records */
amount = p->speed_amount[i_latest]- p->speed_amount[i_oldest];
/* How long this took */
duration_ms = curlx_timediff_ms(p->speed_time[i_latest],
p->speed_time[i_oldest]);
if(duration_ms <= 0)
duration_ms = 1;
if(amount > (CURL_OFF_T_MAX/1000)) {
/* the 'amount' value is bigger than would fit in 64 bits if
multiplied with 1000, so we use the double math for this */
p->current_speed = (curl_off_t)
(((double)amount * 1000.0)/(double)duration_ms);
}
else {
/* the 'amount' value is small enough to fit in a curl_off_t even
when multiplied with 1000 */
p->current_speed = amount * 1000 / duration_ms;
}
if((p->lastshow == pnow->tv_sec) && !data->req.done)
return FALSE;
p->lastshow = pnow->tv_sec;
return TRUE;
}
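The "current speed" logic above keeps a ring of (time, total bytes) records and divides the delta between the latest and oldest record by the time between them. A hedged standalone sketch of just that ring arithmetic (`RECORDS` and the function name are illustrative, not the curl internals):

```c
#include <assert.h>
#include <stdint.h>

#define RECORDS 6  /* illustrative stand-in for CURL_SPEED_RECORDS */

/* Compute a "current speed" in bytes/sec from a ring of (ms, total-bytes)
 * samples: take the delta between the latest and the oldest record still
 * in the ring. `count` is how many records were ever written (>= 2). */
static int64_t ring_speed(const int64_t time_ms[RECORDS],
                          const int64_t amount[RECORDS],
                          int count)
{
  int i_latest = (count - 1) % RECORDS;
  /* while the ring is not full, index 0 stays the oldest record;
   * afterwards the oldest is the slot right after the latest */
  int i_oldest = (count <= RECORDS) ? 0 : (count % RECORDS);
  int64_t duration_ms = time_ms[i_latest] - time_ms[i_oldest];
  if(duration_ms <= 0)
    duration_ms = 1;  /* at least one millisecond must have passed */
  return (amount[i_latest] - amount[i_oldest]) * 1000 / duration_ms;
}
```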
#ifndef CURL_DISABLE_PROGRESS_METER
@ -568,7 +559,7 @@ static void progress_meter(struct Curl_easy *data)
* Curl_pgrsUpdate() returns 0 for success or the value returned by the
* progress callback!
*/
static int pgrsupdate(struct Curl_easy *data, bool showprogress)
static CURLcode pgrsupdate(struct Curl_easy *data, bool showprogress)
{
if(!data->progress.hide) {
if(data->set.fxferinfo) {
@ -582,9 +573,11 @@ static int pgrsupdate(struct Curl_easy *data, bool showprogress)
data->progress.ul.cur_size);
Curl_set_in_callback(data, FALSE);
if(result != CURL_PROGRESSFUNC_CONTINUE) {
if(result)
if(result) {
failf(data, "Callback aborted");
return result;
return CURLE_ABORTED_BY_CALLBACK;
}
return CURLE_OK;
}
}
else if(data->set.fprogress) {
@ -598,9 +591,11 @@ static int pgrsupdate(struct Curl_easy *data, bool showprogress)
(double)data->progress.ul.cur_size);
Curl_set_in_callback(data, FALSE);
if(result != CURL_PROGRESSFUNC_CONTINUE) {
if(result)
if(result) {
failf(data, "Callback aborted");
return result;
return CURLE_ABORTED_BY_CALLBACK;
}
return CURLE_OK;
}
}
@ -608,14 +603,30 @@ static int pgrsupdate(struct Curl_easy *data, bool showprogress)
progress_meter(data);
}
return 0;
return CURLE_OK;
}
int Curl_pgrsUpdate(struct Curl_easy *data)
static CURLcode pgrs_update(struct Curl_easy *data, struct curltime *pnow)
{
bool showprogress = progress_calc(data, pnow);
return pgrsupdate(data, showprogress);
}
CURLcode Curl_pgrsUpdate(struct Curl_easy *data)
{
struct curltime now = curlx_now(); /* what time is it */
bool showprogress = progress_calc(data, now);
return pgrsupdate(data, showprogress);
return pgrs_update(data, &now);
}
CURLcode Curl_pgrsCheck(struct Curl_easy *data)
{
struct curltime now = curlx_now();
CURLcode result;
result = pgrs_update(data, &now);
if(!result && !data->req.done)
result = pgrs_speedcheck(data, &now);
return result;
}
/*
@ -624,5 +635,5 @@ int Curl_pgrsUpdate(struct Curl_easy *data)
void Curl_pgrsUpdate_nometer(struct Curl_easy *data)
{
struct curltime now = curlx_now(); /* what time is it */
(void)progress_calc(data, now);
(void)progress_calc(data, &now);
}

View File

@ -26,6 +26,7 @@
#include "curlx/timeval.h"
struct Curl_easy;
typedef enum {
TIMER_NONE,
@ -50,15 +51,23 @@ void Curl_pgrsSetUploadSize(struct Curl_easy *data, curl_off_t size);
void Curl_pgrsSetDownloadCounter(struct Curl_easy *data, curl_off_t size);
void Curl_pgrsSetUploadCounter(struct Curl_easy *data, curl_off_t size);
void Curl_ratelimit(struct Curl_easy *data, struct curltime now);
int Curl_pgrsUpdate(struct Curl_easy *data);
void Curl_pgrsUpdate_nometer(struct Curl_easy *data);
/* perform progress update, invoking callbacks at intervals */
CURLcode Curl_pgrsUpdate(struct Curl_easy *data);
/* perform progress update, no callbacks invoked */
void Curl_pgrsUpdate_nometer(struct Curl_easy *data);
/* perform progress update with callbacks and speed checks */
CURLcode Curl_pgrsCheck(struct Curl_easy *data);
/* Inform progress/speedcheck about receive pausing */
void Curl_pgrsRecvPause(struct Curl_easy *data, bool enable);
/* Reset sizes and counters for up- and download. */
void Curl_pgrsReset(struct Curl_easy *data);
/* Reset sizes for up- and download. */
void Curl_pgrsResetTransferSizes(struct Curl_easy *data);
struct curltime Curl_pgrsTime(struct Curl_easy *data, timerid timer);
timediff_t Curl_pgrsLimitWaitTime(struct pgrs_dir *d,
curl_off_t speed_limit,
struct curltime now);
/**
* Update progress timer with the elapsed time from its start to `timestamp`.
* This allows updating timers later and is used by happy eyeballing, where
@ -69,4 +78,9 @@ void Curl_pgrsTimeWas(struct Curl_easy *data, timerid timer,
void Curl_pgrsEarlyData(struct Curl_easy *data, curl_off_t sent);
#ifdef UNITTESTS
UNITTEST CURLcode pgrs_speedcheck(struct Curl_easy *data,
struct curltime *pnow);
#endif
#endif /* HEADER_CURL_PROGRESS_H */

lib/ratelimit.c Normal file
View File

@ -0,0 +1,200 @@
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "curl_setup.h"
#include "curlx/timeval.h"
#include "ratelimit.h"
#define CURL_US_PER_SEC 1000000
#define CURL_RLIMIT_MIN_CHUNK (16 * 1024)
#define CURL_RLIMIT_MAX_STEPS 2 /* 500ms interval */
void Curl_rlimit_init(struct Curl_rlimit *r,
curl_off_t rate_per_s,
curl_off_t burst_per_s,
struct curltime ts)
{
curl_off_t rate_steps;
DEBUGASSERT(rate_per_s >= 0);
DEBUGASSERT(burst_per_s >= rate_per_s || !burst_per_s);
r->step_us = CURL_US_PER_SEC;
r->rate_per_step = rate_per_s;
r->burst_per_step = burst_per_s;
/* On rates that are multiples of CURL_RLIMIT_MIN_CHUNK, we reduce
* the interval `step_us` from 1 second to smaller steps with at
* most CURL_RLIMIT_MAX_STEPS.
* Smaller means more CPU, but also more precision. */
rate_steps = rate_per_s / CURL_RLIMIT_MIN_CHUNK;
rate_steps = CURLMIN(rate_steps, CURL_RLIMIT_MAX_STEPS);
if(rate_steps >= 2) {
r->step_us /= rate_steps;
r->rate_per_step /= rate_steps;
r->burst_per_step /= rate_steps;
}
r->tokens = r->rate_per_step;
r->spare_us = 0;
r->ts = ts;
r->blocked = FALSE;
}
void Curl_rlimit_start(struct Curl_rlimit *r, struct curltime ts)
{
r->tokens = r->rate_per_step;
r->spare_us = 0;
r->ts = ts;
}
bool Curl_rlimit_active(struct Curl_rlimit *r)
{
return (r->rate_per_step > 0) || r->blocked;
}
bool Curl_rlimit_is_blocked(struct Curl_rlimit *r)
{
return r->blocked;
}
static void ratelimit_update(struct Curl_rlimit *r,
struct curltime ts)
{
timediff_t elapsed_us, elapsed_steps;
curl_off_t token_gain;
DEBUGASSERT(r->rate_per_step);
if((r->ts.tv_sec == ts.tv_sec) && (r->ts.tv_usec == ts.tv_usec))
return;
elapsed_us = curlx_timediff_us(ts, r->ts);
if(elapsed_us < 0) { /* not going back in time */
curl_mfprintf(stderr, "rlimit: neg elapsed time %" FMT_TIMEDIFF_T "us\n",
elapsed_us);
DEBUGASSERT(0);
return;
}
elapsed_us += r->spare_us;
if(elapsed_us < r->step_us)
return;
/* we do the update */
r->ts = ts;
elapsed_steps = elapsed_us / r->step_us;
r->spare_us = elapsed_us % r->step_us;
/* How many tokens did we gain since the last update? */
if(r->rate_per_step > (CURL_OFF_T_MAX / elapsed_steps))
token_gain = CURL_OFF_T_MAX;
else {
token_gain = r->rate_per_step * elapsed_steps;
}
/* Cap the tokens at the burst rate per step (if set), so we
* do not suddenly have a huge number of tokens after inactivity. */
r->tokens += token_gain;
if(r->burst_per_step && (r->tokens > r->burst_per_step)) {
r->tokens = r->burst_per_step;
}
}
curl_off_t Curl_rlimit_avail(struct Curl_rlimit *r,
struct curltime ts)
{
if(r->blocked)
return 0;
else if(r->rate_per_step) {
ratelimit_update(r, ts);
return r->tokens;
}
else
return CURL_OFF_T_MAX;
}
void Curl_rlimit_drain(struct Curl_rlimit *r,
size_t tokens,
struct curltime ts)
{
if(r->blocked || !r->rate_per_step)
return;
ratelimit_update(r, ts);
#if SIZEOF_CURL_OFF_T <= SIZEOF_SIZE_T
if(tokens > CURL_OFF_T_MAX) {
r->tokens = CURL_OFF_T_MIN;
return;
}
else
#endif
{
curl_off_t val = (curl_off_t)tokens;
if((CURL_OFF_T_MIN + val) < r->tokens)
r->tokens -= val;
else
r->tokens = CURL_OFF_T_MIN;
}
}
timediff_t Curl_rlimit_wait_ms(struct Curl_rlimit *r,
struct curltime ts)
{
timediff_t wait_us, elapsed_us;
if(r->blocked || !r->rate_per_step)
return 0;
ratelimit_update(r, ts);
if(r->tokens > 0)
return 0;
/* How long until the tokens become positive again?
* Deduct `spare_us` and check against the time already elapsed. */
wait_us = (1 + (-r->tokens / r->rate_per_step)) * r->step_us;
wait_us -= r->spare_us;
elapsed_us = curlx_timediff_us(ts, r->ts);
if(elapsed_us >= wait_us)
return 0;
wait_us -= elapsed_us;
return (wait_us + 999) / 1000; /* in milliseconds */
}
void Curl_rlimit_block(struct Curl_rlimit *r,
bool activate,
struct curltime ts)
{
if(!activate == !r->blocked)
return;
r->ts = ts;
r->blocked = activate;
if(!r->blocked) {
/* Start rate limiting fresh. The amount of time this was blocked
* does not generate extra tokens. */
Curl_rlimit_start(r, ts);
}
else {
r->tokens = 0;
}
}

lib/ratelimit.h Normal file
View File

@ -0,0 +1,92 @@
#ifndef HEADER_Curl_rlimit_H
#define HEADER_Curl_rlimit_H
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "curlx/timeval.h"
/* This is a rate limiter that provides "tokens" to be consumed
* per second with a "burst" rate limitation. Example:
* A rate limit of 1 megabyte per second with a burst rate of 1.5MB.
* - initially 1 million tokens are available.
* - these are drained in the first second.
* - checking available tokens before the 2nd second will return 0.
* - at/after the 2nd second, 1 million tokens are available again.
* - if nothing happens for a second, the 1 million tokens would grow
* to 2 million; however, the burst limit caps them at 1.5 million.
* Thus:
* - setting "burst" to CURL_OFF_T_MAX would average tokens over the
* complete lifetime. E.g. for a download, at the *end* of it, the
* average rate from start to finish would be the rate limit.
* - setting "burst" to the same value as "rate" would make a
* download always try to stay *at/below* the rate and slow times will
* not generate extra tokens.
* A rate limit can be blocked, causing the available tokens to become
* always 0 until unblocked. After unblocking, the rate limiting starts
* again with no history of the past.
* Finally, a rate limiter with rate 0 will always have CURL_OFF_T_MAX
* tokens available, unless blocked.
*/
struct Curl_rlimit {
curl_off_t rate_per_step; /* tokens generated per `step_us` interval */
curl_off_t burst_per_step; /* upper cap on tokens per `step_us` interval */
timediff_t step_us; /* microseconds between token increases */
curl_off_t tokens; /* tokens currently available to consume */
timediff_t spare_us; /* leftover microseconds not yet turned into tokens */
struct curltime ts; /* time of the last update */
BIT(blocked); /* blocking sets available tokens to 0 */
};
void Curl_rlimit_init(struct Curl_rlimit *r,
curl_off_t rate_per_s,
curl_off_t burst_per_s,
struct curltime ts);
/* Start ratelimiting with the given timestamp. Resets available tokens. */
void Curl_rlimit_start(struct Curl_rlimit *r, struct curltime ts);
/* How many milliseconds to wait until tokens are available again. */
timediff_t Curl_rlimit_wait_ms(struct Curl_rlimit *r,
struct curltime ts);
/* Return whether rate limiting of tokens is active */
bool Curl_rlimit_active(struct Curl_rlimit *r);
bool Curl_rlimit_is_blocked(struct Curl_rlimit *r);
/* Return how many tokens are available to spend, may be negative */
curl_off_t Curl_rlimit_avail(struct Curl_rlimit *r,
struct curltime ts);
/* Drain tokens from the ratelimit. */
void Curl_rlimit_drain(struct Curl_rlimit *r,
size_t tokens,
struct curltime ts);
/* Block/unblock ratelimiting. A blocked ratelimit has 0 tokens available. */
void Curl_rlimit_block(struct Curl_rlimit *r,
bool activate,
struct curltime ts);
#endif /* HEADER_Curl_rlimit_H */
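The token/burst behavior described in the header comment can be sketched as a standalone simplification. This uses whole-second steps and illustrative names; the real implementation works in sub-second steps with `curl_off_t` and overflow guards:

```c
#include <assert.h>
#include <stdint.h>

/* Minimal token bucket with a burst cap, mirroring the 1 MB/s rate /
 * 1.5 MB burst example from the header comment. Not the curl API. */
typedef struct {
  int64_t rate_per_s;  /* tokens gained per second */
  int64_t burst_per_s; /* cap on accumulated tokens, 0 = no cap */
  int64_t tokens;      /* tokens currently available */
  int64_t last_s;      /* timestamp (seconds) of the last update */
} bucket;

static void bucket_init(bucket *b, int64_t rate, int64_t burst, int64_t now_s)
{
  b->rate_per_s = rate;
  b->burst_per_s = burst;
  b->tokens = rate;    /* start with one second's worth of tokens */
  b->last_s = now_s;
}

static int64_t bucket_avail(bucket *b, int64_t now_s)
{
  int64_t elapsed = now_s - b->last_s;
  if(elapsed > 0) {
    b->tokens += elapsed * b->rate_per_s;
    if(b->burst_per_s && b->tokens > b->burst_per_s)
      b->tokens = b->burst_per_s;  /* burst limit caps idle gains */
    b->last_s = now_s;
  }
  return b->tokens > 0 ? b->tokens : 0;
}

static void bucket_drain(bucket *b, int64_t n)
{
  b->tokens -= n;  /* may go negative, forcing a wait */
}
```

Setting `burst` equal to `rate` keeps the transfer at or below the rate even after idle periods, while a huge `burst` averages the rate over the whole transfer, as the header comment explains.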

View File

@ -258,7 +258,7 @@ static CURLcode req_set_upload_done(struct Curl_easy *data)
{
DEBUGASSERT(!data->req.upload_done);
data->req.upload_done = TRUE;
data->req.keepon &= ~(KEEP_SEND|KEEP_SEND_TIMED); /* we are done sending */
data->req.keepon &= ~KEEP_SEND; /* we are done sending */
Curl_pgrsTime(data, TIMER_POSTRANSFER);
Curl_creader_done(data, data->req.upload_aborted);
@ -420,9 +420,9 @@ bool Curl_req_want_send(struct Curl_easy *data)
* - or request has buffered data to send
* - or transfer connection has pending data to send */
return !data->req.done &&
(((data->req.keepon & KEEP_SENDBITS) == KEEP_SEND) ||
!Curl_req_sendbuf_empty(data) ||
Curl_xfer_needs_flush(data));
((data->req.keepon & KEEP_SEND) ||
!Curl_req_sendbuf_empty(data) ||
Curl_xfer_needs_flush(data));
}
bool Curl_req_done_sending(struct Curl_easy *data)
@ -458,8 +458,7 @@ CURLcode Curl_req_abort_sending(struct Curl_easy *data)
if(!data->req.upload_done) {
Curl_bufq_reset(&data->req.sendbuf);
data->req.upload_aborted = TRUE;
/* no longer KEEP_SEND and KEEP_SEND_PAUSE */
data->req.keepon &= ~KEEP_SENDBITS;
data->req.keepon &= ~KEEP_SEND;
return req_set_upload_done(data);
}
return CURLE_OK;
@ -470,6 +469,6 @@ CURLcode Curl_req_stop_send_recv(struct Curl_easy *data)
/* stop receiving and ALL sending as well.
* We might still be paused on receive client writes though. */
data->req.keepon &= ~(KEEP_RECV|KEEP_SENDBITS);
data->req.keepon &= ~(KEEP_RECV|KEEP_SEND);
return Curl_req_abort_sending(data);
}

View File

@ -130,6 +130,7 @@ struct SingleRequest {
BIT(sendbuf_init); /* sendbuf is initialized */
BIT(shutdown); /* request end will shutdown connection */
BIT(shutdown_err_ignore); /* errors in shutdown will not fail request */
BIT(reader_started); /* client reads have started */
};
/**

View File

@ -142,7 +142,7 @@ const struct Curl_handler Curl_handler_rtsp = {
ZERO_NULL, /* proto_pollset */
rtsp_do_pollset, /* doing_pollset */
ZERO_NULL, /* domore_pollset */
ZERO_NULL, /* perform_pollset */
Curl_http_perform_pollset, /* perform_pollset */
ZERO_NULL, /* disconnect */
rtsp_rtp_write_resp, /* write_resp */
rtsp_rtp_write_resp_hd, /* write_resp_hd */
@ -668,8 +668,7 @@ static CURLcode rtsp_do(struct Curl_easy *data, bool *done)
/* if a request-body has been sent off, we make sure this progress is
noted properly */
Curl_pgrsSetUploadCounter(data, data->req.writebytecount);
if(Curl_pgrsUpdate(data))
result = CURLE_ABORTED_BY_CALLBACK;
result = Curl_pgrsUpdate(data);
}
out:
curlx_dyn_free(&req_buffer);

View File

@ -711,7 +711,7 @@ void Curl_pollset_check(struct Curl_easy *data,
*pwant_read = *pwant_write = FALSE;
}
bool Curl_pollset_want_read(struct Curl_easy *data,
bool Curl_pollset_want_recv(struct Curl_easy *data,
struct easy_pollset *ps,
curl_socket_t sock)
{
@ -723,3 +723,16 @@ bool Curl_pollset_want_read(struct Curl_easy *data,
}
return FALSE;
}
bool Curl_pollset_want_send(struct Curl_easy *data,
struct easy_pollset *ps,
curl_socket_t sock)
{
unsigned int i;
(void)data;
for(i = 0; i < ps->n; ++i) {
if((ps->sockets[i] == sock) && (ps->actions[i] & CURL_POLL_OUT))
return TRUE;
}
return FALSE;
}

View File

@ -163,8 +163,12 @@ CURLcode Curl_pollset_set(struct Curl_easy *data,
#define Curl_pollset_add_in(data, ps, sock) \
Curl_pollset_change((data), (ps), (sock), CURL_POLL_IN, 0)
#define Curl_pollset_remove_in(data, ps, sock) \
Curl_pollset_change((data), (ps), (sock), 0, CURL_POLL_IN)
#define Curl_pollset_add_out(data, ps, sock) \
Curl_pollset_change((data), (ps), (sock), CURL_POLL_OUT, 0)
#define Curl_pollset_remove_out(data, ps, sock) \
Curl_pollset_change((data), (ps), (sock), 0, CURL_POLL_OUT)
#define Curl_pollset_add_inout(data, ps, sock) \
Curl_pollset_change((data), (ps), (sock), \
CURL_POLL_IN|CURL_POLL_OUT, 0)
@ -188,10 +192,12 @@ void Curl_pollset_check(struct Curl_easy *data,
struct easy_pollset *ps, curl_socket_t sock,
bool *pwant_read, bool *pwant_write);
/**
* Return TRUE if the pollset contains socket with CURL_POLL_IN.
*/
bool Curl_pollset_want_read(struct Curl_easy *data,
/* TRUE if the pollset contains a socket with CURL_POLL_IN. */
bool Curl_pollset_want_recv(struct Curl_easy *data,
struct easy_pollset *ps,
curl_socket_t sock);
/* TRUE if the pollset contains a socket with CURL_POLL_OUT. */
bool Curl_pollset_want_send(struct Curl_easy *data,
struct easy_pollset *ps,
curl_socket_t sock);

View File

@ -108,6 +108,7 @@ static void cl_reset_writer(struct Curl_easy *data)
static void cl_reset_reader(struct Curl_easy *data)
{
struct Curl_creader *reader = data->req.reader_stack;
data->req.reader_started = FALSE;
while(reader) {
data->req.reader_stack = reader->next;
reader->crt->do_close(data, reader);
@ -231,6 +232,7 @@ static CURLcode cw_download_write(struct Curl_easy *data,
if(!is_connect && !ctx->started_response) {
Curl_pgrsTime(data, TIMER_STARTTRANSFER);
Curl_rlimit_start(&data->progress.dl.rlimit, curlx_now());
ctx->started_response = TRUE;
}
@ -301,7 +303,9 @@ static CURLcode cw_download_write(struct Curl_easy *data,
if(result)
return result;
}
/* Update stats, write and report progress */
Curl_rlimit_drain(&data->progress.dl.rlimit, nwrite, curlx_now());
data->req.bytecount += nwrite;
Curl_pgrsSetDownloadCounter(data, data->req.bytecount);
@ -1198,9 +1202,28 @@ CURLcode Curl_client_read(struct Curl_easy *data, char *buf, size_t blen,
return result;
DEBUGASSERT(data->req.reader_stack);
}
if(!data->req.reader_started) {
Curl_rlimit_start(&data->progress.ul.rlimit, curlx_now());
data->req.reader_started = TRUE;
}
if(Curl_rlimit_active(&data->progress.ul.rlimit)) {
curl_off_t ul_avail =
Curl_rlimit_avail(&data->progress.ul.rlimit, curlx_now());
if(ul_avail <= 0) {
result = CURLE_OK;
*eos = FALSE;
goto out;
}
if(ul_avail < (curl_off_t)blen)
blen = (size_t)ul_avail;
}
result = Curl_creader_read(data, data->req.reader_stack, buf, blen,
nread, eos);
if(!result)
Curl_rlimit_drain(&data->progress.ul.rlimit, *nread, curlx_now());
out:
CURL_TRC_READ(data, "client_read(len=%zu) -> %d, nread=%zu, eos=%d",
blen, result, *nread, *eos);
return result;


@ -2842,6 +2842,7 @@ static CURLcode setopt_offt(struct Curl_easy *data, CURLoption option,
if(offt < 0)
return CURLE_BAD_FUNCTION_ARGUMENT;
s->max_send_speed = offt;
Curl_rlimit_init(&data->progress.ul.rlimit, offt, offt, curlx_now());
break;
case CURLOPT_MAX_RECV_SPEED_LARGE:
/*
@ -2851,6 +2852,7 @@ static CURLcode setopt_offt(struct Curl_easy *data, CURLoption option,
if(offt < 0)
return CURLE_BAD_FUNCTION_ARGUMENT;
s->max_recv_speed = offt;
Curl_rlimit_init(&data->progress.dl.rlimit, offt, offt, curlx_now());
break;
case CURLOPT_RESUME_FROM_LARGE:
/*

View File

@ -1697,10 +1697,7 @@ static CURLcode smtp_regular_transfer(struct Curl_easy *data,
data->req.size = -1;
/* Set the progress data */
Curl_pgrsSetUploadCounter(data, 0);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsSetUploadSize(data, -1);
Curl_pgrsSetDownloadSize(data, -1);
Curl_pgrsReset(data);
/* Carry out the perform */
result = smtp_perform(data, smtpc, smtp, &connected, dophase_done);


@ -1,80 +0,0 @@
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "curl_setup.h"
#include <curl/curl.h>
#include "urldata.h"
#include "sendf.h"
#include "transfer.h"
#include "multiif.h"
#include "speedcheck.h"
void Curl_speedinit(struct Curl_easy *data)
{
memset(&data->state.keeps_speed, 0, sizeof(struct curltime));
}
/*
* @unittest: 1606
*/
CURLcode Curl_speedcheck(struct Curl_easy *data,
struct curltime now)
{
if(Curl_xfer_recv_is_paused(data) || Curl_xfer_send_is_paused(data))
/* A paused transfer is not qualified for speed checks */
return CURLE_OK;
if((data->progress.current_speed >= 0) && data->set.low_speed_time) {
if(data->progress.current_speed < data->set.low_speed_limit) {
if(!data->state.keeps_speed.tv_sec)
/* under the limit at this moment */
data->state.keeps_speed = now;
else {
/* how long has it been under the limit */
timediff_t howlong = curlx_timediff_ms(now, data->state.keeps_speed);
if(howlong >= data->set.low_speed_time * 1000) {
/* too long */
failf(data,
"Operation too slow. "
"Less than %ld bytes/sec transferred the last %ld seconds",
data->set.low_speed_limit,
data->set.low_speed_time);
return CURLE_OPERATION_TIMEDOUT;
}
}
}
else
/* faster right now */
data->state.keeps_speed.tv_sec = 0;
}
if(data->set.low_speed_limit)
/* if low speed limit is enabled, set the expire timer to make this
connection's speed get checked again in a second */
Curl_expire(data, 1000, EXPIRE_SPEEDCHECK);
return CURLE_OK;
}


@ -1,35 +0,0 @@
#ifndef HEADER_CURL_SPEEDCHECK_H
#define HEADER_CURL_SPEEDCHECK_H
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "curl_setup.h"
#include "curlx/timeval.h"
struct Curl_easy;
void Curl_speedinit(struct Curl_easy *data);
CURLcode Curl_speedcheck(struct Curl_easy *data,
struct curltime now);
#endif /* HEADER_CURL_SPEEDCHECK_H */


@ -1659,9 +1659,10 @@ static CURLcode telnet_do(struct Curl_easy *data, bool *done)
}
}
if(Curl_pgrsUpdate(data)) {
result = CURLE_ABORTED_BY_CALLBACK;
break;
if(!result) {
result = Curl_pgrsUpdate(data);
if(result)
keepon = FALSE;
}
}
#endif


@ -59,7 +59,6 @@
#include "multiif.h"
#include "url.h"
#include "strcase.h"
#include "speedcheck.h"
#include "select.h"
#include "escape.h"
#include "curlx/strerr.h"
@ -1175,9 +1174,10 @@ static CURLcode tftp_receive_packet(struct Curl_easy *data,
}
/* Update the progress meter */
if(Curl_pgrsUpdate(data)) {
result = Curl_pgrsUpdate(data);
if(result) {
tftp_state_machine(state, TFTP_EVENT_ERROR);
return CURLE_ABORTED_BY_CALLBACK;
return result;
}
}
return result;
@ -1297,10 +1297,7 @@ static CURLcode tftp_doing(struct Curl_easy *data, bool *dophase_done)
/* The multi code does not have this logic for the DOING state so we
provide it for TFTP since it may do the entire transfer in this
state. */
if(Curl_pgrsUpdate(data))
result = CURLE_ABORTED_BY_CALLBACK;
else
result = Curl_speedcheck(data, curlx_now());
result = Curl_pgrsCheck(data);
}
return result;
}


@ -65,7 +65,6 @@
#include "cw-out.h"
#include "transfer.h"
#include "sendf.h"
#include "speedcheck.h"
#include "progress.h"
#include "http.h"
#include "url.h"
@ -241,10 +240,9 @@ static CURLcode sendrecv_dl(struct Curl_easy *data,
char *buf, *xfer_buf;
size_t blen, xfer_blen;
int maxloops = 10;
curl_off_t total_received = 0;
bool is_multiplex = FALSE;
bool rcvd_eagain = FALSE;
bool is_eos = FALSE;
bool is_eos = FALSE, rate_limited = FALSE;
result = Curl_multi_xfer_buf_borrow(data, &xfer_buf, &xfer_blen);
if(result)
@ -265,15 +263,21 @@ static CURLcode sendrecv_dl(struct Curl_easy *data,
buf = xfer_buf;
bytestoread = xfer_blen;
if(bytestoread && data->set.max_recv_speed > 0) {
/* In case of speed limit on receiving: if this loop already got
* a quarter of the quota, break out. We want to stutter a bit
* to keep in the limit, but too small receives will just cost
* cpu unnecessarily. */
if(total_received && (total_received >= (data->set.max_recv_speed / 4)))
if(bytestoread && Curl_rlimit_active(&data->progress.dl.rlimit)) {
curl_off_t dl_avail = Curl_rlimit_avail(&data->progress.dl.rlimit,
curlx_now());
/* DEBUGF(infof(data, "dl_rlimit, available=%" FMT_OFF_T, dl_avail));
*/
/* In case of rate limited downloads: if this loop already got
 * data and the rate quota is used up, break out. We want to
 * stutter a bit to keep within the limit, but too small receives
 * just cost cpu unnecessarily. */
if(dl_avail <= 0) {
rate_limited = TRUE;
break;
if(data->set.max_recv_speed < (curl_off_t)bytestoread)
bytestoread = (size_t)data->set.max_recv_speed;
}
if(dl_avail < (curl_off_t)bytestoread)
bytestoread = (size_t)dl_avail;
}
rcvd_eagain = FALSE;
@ -315,7 +319,6 @@ static CURLcode sendrecv_dl(struct Curl_easy *data,
if(k->eos_written) /* already did write this to client, leave */
break;
}
total_received += blen;
result = Curl_xfer_write_resp(data, buf, blen, is_eos);
if(result || data->req.done)
@ -327,13 +330,13 @@ static CURLcode sendrecv_dl(struct Curl_easy *data,
if((!is_multiplex && data->req.download_done) || is_eos) {
data->req.keepon &= ~KEEP_RECV;
}
/* if we are PAUSEd or stopped receiving, leave the loop */
if((k->keepon & KEEP_RECV_PAUSE) || !(k->keepon & KEEP_RECV))
/* if we stopped receiving, leave the loop */
if(!(k->keepon & KEEP_RECV))
break;
} while(maxloops--);
if(!is_eos && !Curl_xfer_is_blocked(data) &&
if(!is_eos && !rate_limited && CURL_WANT_RECV(data) &&
(!rcvd_eagain || data_pending(data, rcvd_eagain))) {
/* Did not read until EAGAIN/EOS or there is still data pending
* in buffers. Mark as read-again via simulated SELECT results. */
@ -396,16 +399,13 @@ CURLcode Curl_sendrecv(struct Curl_easy *data, struct curltime *nowp)
}
/* If we still have writing to do, we check if we have a writable socket. */
if(Curl_req_want_send(data) || (data->req.keepon & KEEP_SEND_TIMED)) {
if(Curl_req_want_send(data)) {
result = sendrecv_ul(data);
if(result)
goto out;
}
if(Curl_pgrsUpdate(data))
result = CURLE_ABORTED_BY_CALLBACK;
else
result = Curl_speedcheck(data, *nowp);
result = Curl_pgrsCheck(data);
if(result)
goto out;
@ -440,16 +440,14 @@ CURLcode Curl_sendrecv(struct Curl_easy *data, struct curltime *nowp)
result = CURLE_PARTIAL_FILE;
goto out;
}
if(Curl_pgrsUpdate(data)) {
result = CURLE_ABORTED_BY_CALLBACK;
goto out;
}
}
/* If there is nothing more to send/recv, the request is done */
if((k->keepon & (KEEP_RECVBITS|KEEP_SENDBITS)) == 0)
if((k->keepon & (KEEP_RECV|KEEP_SEND)) == 0)
data->req.done = TRUE;
result = Curl_pgrsUpdate(data);
out:
if(result)
DEBUGF(infof(data, "Curl_sendrecv() -> %d", result));
@ -913,51 +911,30 @@ bool Curl_xfer_is_blocked(struct Curl_easy *data)
bool Curl_xfer_send_is_paused(struct Curl_easy *data)
{
return (data->req.keepon & KEEP_SEND_PAUSE);
return Curl_rlimit_is_blocked(&data->progress.ul.rlimit);
}
bool Curl_xfer_recv_is_paused(struct Curl_easy *data)
{
return (data->req.keepon & KEEP_RECV_PAUSE);
return Curl_rlimit_is_blocked(&data->progress.dl.rlimit);
}
CURLcode Curl_xfer_pause_send(struct Curl_easy *data, bool enable)
{
CURLcode result = CURLE_OK;
if(enable) {
data->req.keepon |= KEEP_SEND_PAUSE;
}
else {
data->req.keepon &= ~KEEP_SEND_PAUSE;
if(Curl_creader_is_paused(data))
result = Curl_creader_unpause(data);
}
Curl_rlimit_block(&data->progress.ul.rlimit, enable, curlx_now());
if(!enable && Curl_creader_is_paused(data))
result = Curl_creader_unpause(data);
return result;
}
CURLcode Curl_xfer_pause_recv(struct Curl_easy *data, bool enable)
{
CURLcode result = CURLE_OK;
if(enable) {
data->req.keepon |= KEEP_RECV_PAUSE;
}
else {
data->req.keepon &= ~KEEP_RECV_PAUSE;
if(Curl_cwriter_is_paused(data))
result = Curl_cwriter_unpause(data);
}
Curl_rlimit_block(&data->progress.dl.rlimit, enable, curlx_now());
if(!enable && Curl_cwriter_is_paused(data))
result = Curl_cwriter_unpause(data);
Curl_conn_ev_data_pause(data, enable);
Curl_pgrsRecvPause(data, enable);
return result;
}
bool Curl_xfer_is_too_fast(struct Curl_easy *data)
{
struct Curl_llist_node *e = Curl_llist_head(&data->state.timeoutlist);
while(e) {
struct time_node *n = Curl_node_elem(e);
e = Curl_node_next(e);
if(n->eid == EXPIRE_TOOFAST)
return TRUE;
}
return FALSE;
}


@ -143,7 +143,4 @@ bool Curl_xfer_recv_is_paused(struct Curl_easy *data);
CURLcode Curl_xfer_pause_send(struct Curl_easy *data, bool enable);
CURLcode Curl_xfer_pause_recv(struct Curl_easy *data, bool enable);
/* Query if transfer has expire timeout TOOFAST set. */
bool Curl_xfer_is_too_fast(struct Curl_easy *data);
#endif /* HEADER_CURL_TRANSFER_H */


@ -88,7 +88,6 @@
#include "select.h"
#include "multiif.h"
#include "easyif.h"
#include "speedcheck.h"
#include "curlx/warnless.h"
#include "getinfo.h"
#include "pop3.h"
@ -3884,7 +3883,6 @@ CURLcode Curl_connect(struct Curl_easy *data,
CURLcode Curl_init_do(struct Curl_easy *data, struct connectdata *conn)
{
/* if this is a pushed stream, we need this: */
CURLcode result;
if(conn) {
@ -3904,9 +3902,7 @@ CURLcode Curl_init_do(struct Curl_easy *data, struct connectdata *conn)
result = Curl_req_start(&data->req, data);
if(!result) {
Curl_speedinit(data);
Curl_pgrsSetUploadCounter(data, 0);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsReset(data);
}
return result;
}


@ -156,6 +156,7 @@ typedef unsigned int curl_prot_t;
#include "curlx/dynbuf.h"
#include "dynhds.h"
#include "request.h"
#include "ratelimit.h"
#include "netrc.h"
/* On error return, the value of `pnwritten` has no meaning */
@ -426,30 +427,11 @@ struct hostname {
#define KEEP_NONE 0
#define KEEP_RECV (1<<0) /* there is or may be data to read */
#define KEEP_SEND (1<<1) /* there is or may be data to write */
#define KEEP_RECV_HOLD (1<<2) /* when set, no reading should be done but there
might still be data to read */
#define KEEP_SEND_HOLD (1<<3) /* when set, no writing should be done but there
might still be data to write */
#define KEEP_RECV_PAUSE (1<<4) /* reading is paused */
#define KEEP_SEND_PAUSE (1<<5) /* writing is paused */
/* KEEP_SEND_TIMED is set when the transfer should attempt sending
* at timer (or other) events. A transfer waiting on a timer will
* remove KEEP_SEND to suppress POLLOUTs of the connection.
* Adding KEEP_SEND_TIMED will then attempt to send whenever the transfer
* enters the "readwrite" loop, e.g. when a timer fires.
* This is used in HTTP for 'Expect: 100-continue' waiting. */
#define KEEP_SEND_TIMED (1<<6)
#define KEEP_RECVBITS (KEEP_RECV | KEEP_RECV_HOLD | KEEP_RECV_PAUSE)
#define KEEP_SENDBITS (KEEP_SEND | KEEP_SEND_HOLD | KEEP_SEND_PAUSE)
/* transfer wants to send is not PAUSE or HOLD */
#define CURL_WANT_SEND(data) \
(((data)->req.keepon & KEEP_SENDBITS) == KEEP_SEND)
/* transfer receive is not on PAUSE or HOLD */
#define CURL_WANT_RECV(data) \
(((data)->req.keepon & KEEP_RECVBITS) == KEEP_RECV)
/* transfer wants to send */
#define CURL_WANT_SEND(data) ((data)->req.keepon & KEEP_SEND)
/* transfer wants to receive */
#define CURL_WANT_RECV(data) ((data)->req.keepon & KEEP_RECV)
#define FIRSTSOCKET 0
#define SECONDARYSOCKET 1
@ -805,16 +787,11 @@ struct PureInfo {
BIT(used_proxy); /* the transfer used a proxy */
};
struct pgrs_measure {
struct curltime start; /* when measure started */
curl_off_t start_size; /* the 'cur_size' the measure started at */
};
struct pgrs_dir {
curl_off_t total_size; /* total expected bytes */
curl_off_t cur_size; /* transferred bytes so far */
curl_off_t speed; /* bytes per second transferred */
struct pgrs_measure limit;
struct Curl_rlimit rlimit; /* speed limiting / pausing */
};
struct Progress {
@ -843,10 +820,10 @@ struct Progress {
struct curltime t_startqueue;
struct curltime t_acceptdata;
#define CURR_TIME (5 + 1) /* 6 entries for 5 seconds */
#define CURL_SPEED_RECORDS (5 + 1) /* 6 entries for 5 seconds */
curl_off_t speeder[ CURR_TIME ];
struct curltime speeder_time[ CURR_TIME ];
curl_off_t speed_amount[ CURL_SPEED_RECORDS ];
struct curltime speed_time[ CURL_SPEED_RECORDS ];
unsigned char speeder_c;
BIT(hide);
BIT(ul_size_known);


@ -80,10 +80,9 @@
#define QUIC_HANDSHAKE_TIMEOUT (10*NGTCP2_SECONDS)
/* A stream window is the maximum amount we need to buffer for
* each active transfer. We use HTTP/3 flow control and only ACK
* when we take things out of the buffer.
* each active transfer.
* Chunk size is large enough to take a full DATA frame */
#define H3_STREAM_WINDOW_SIZE (128 * 1024)
#define H3_STREAM_WINDOW_SIZE (64 * 1024)
#define H3_STREAM_CHUNK_SIZE (16 * 1024)
#if H3_STREAM_CHUNK_SIZE < NGTCP2_MAX_UDP_PAYLOAD_SIZE
#error H3_STREAM_CHUNK_SIZE smaller than NGTCP2_MAX_UDP_PAYLOAD_SIZE
@ -242,6 +241,7 @@ struct h3_stream_ctx {
size_t sendbuf_len_in_flight; /* sendbuf amount "in flight" */
curl_uint64_t error3; /* HTTP/3 stream error code */
curl_off_t upload_left; /* number of request bytes left to upload */
uint64_t download_unacked; /* bytes not acknowledged yet */
int status_code; /* HTTP status code */
CURLcode xfer_result; /* result from xfer_resp_write(_hd) */
BIT(resp_hds_complete); /* we have a complete, final response */
@ -472,7 +472,7 @@ static void quic_settings(struct cf_ngtcp2_ctx *ctx,
s->handshake_timeout = (data->set.connecttimeout > 0) ?
data->set.connecttimeout * NGTCP2_MILLISECONDS : QUIC_HANDSHAKE_TIMEOUT;
s->max_window = 100 * ctx->max_stream_window;
s->max_stream_window = 10 * ctx->max_stream_window;
s->max_stream_window = ctx->max_stream_window;
s->no_pmtud = FALSE;
#ifdef NGTCP2_SETTINGS_V3
/* try ten times the ngtcp2 defaults here for problems with Caddy */
@ -1057,6 +1057,35 @@ static void h3_xfer_write_resp(struct Curl_cfilter *cf,
}
}
static void cf_ngtcp2_ack_stream(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct h3_stream_ctx *stream)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct curltime now = curlx_now();
curl_off_t avail;
uint64_t ack_len = 0;
/* How many byte to ack on the stream? */
/* how much does rate limiting allow us to acknowledge? */
avail = Curl_rlimit_avail(&data->progress.dl.rlimit, now);
if(avail == CURL_OFF_T_MAX) { /* no rate limit, ack all */
ack_len = stream->download_unacked;
}
else if(avail > 0) {
ack_len = CURLMIN(stream->download_unacked, (uint64_t)avail);
}
if(ack_len) {
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] ACK %" PRIu64
"/%" PRIu64 " bytes of DATA", stream->id,
ack_len, stream->download_unacked);
ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream->id, ack_len);
stream->download_unacked -= ack_len;
}
}
static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
const uint8_t *buf, size_t blen,
void *user_data, void *stream_user_data)
@ -1073,13 +1102,15 @@ static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
return NGHTTP3_ERR_CALLBACK_FAILURE;
h3_xfer_write_resp(cf, data, stream, (const char *)buf, blen, FALSE);
if(blen) {
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] ACK %zu bytes of DATA",
stream->id, blen);
ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream->id, blen);
ngtcp2_conn_extend_max_offset(ctx->qconn, blen);
}
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] DATA len=%zu", stream->id, blen);
ngtcp2_conn_extend_max_offset(ctx->qconn, blen);
if(UINT64_MAX - blen < stream->download_unacked)
stream->download_unacked = UINT64_MAX; /* unlikely */
else
stream->download_unacked += blen;
cf_ngtcp2_ack_stream(cf, data, stream);
return 0;
}
@ -1374,6 +1405,8 @@ static CURLcode cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
goto out;
}
cf_ngtcp2_ack_stream(cf, data, stream);
if(cf_progress_ingress(cf, data, &pktx)) {
result = CURLE_RECV_ERROR;
goto out;


@ -54,7 +54,6 @@
#include "../http.h" /* for HTTP proxy tunnel stuff */
#include "ssh.h"
#include "../url.h"
#include "../speedcheck.h"
#include "../vtls/vtls.h"
#include "../cfilters.h"
#include "../connect.h"
@ -2481,17 +2480,13 @@ static CURLcode myssh_block_statemach(struct Curl_easy *data,
while((sshc->state != SSH_STOP) && !result) {
bool block;
timediff_t left_ms = 1000;
struct curltime now = curlx_now();
result = myssh_statemach_act(data, sshc, sshp, &block);
if(result)
break;
if(!disconnect) {
if(Curl_pgrsUpdate(data))
return CURLE_ABORTED_BY_CALLBACK;
result = Curl_speedcheck(data, now);
result = Curl_pgrsCheck(data);
if(result)
break;
@ -2746,10 +2741,7 @@ static CURLcode myssh_do_it(struct Curl_easy *data, bool *done)
sshc->secondCreateDirs = 0; /* reset the create directory attempt state
variable */
Curl_pgrsSetUploadCounter(data, 0);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsSetUploadSize(data, -1);
Curl_pgrsSetDownloadSize(data, -1);
Curl_pgrsReset(data);
if(conn->handler->protocol & CURLPROTO_SCP)
result = scp_perform(data, &connected, done);


@ -53,7 +53,6 @@
#include "../http.h" /* for HTTP proxy tunnel stuff */
#include "ssh.h"
#include "../url.h"
#include "../speedcheck.h"
#include "../vtls/vtls.h"
#include "../cfilters.h"
#include "../connect.h"
@ -3135,10 +3134,7 @@ static CURLcode ssh_block_statemach(struct Curl_easy *data,
break;
if(!disconnect) {
if(Curl_pgrsUpdate(data))
return CURLE_ABORTED_BY_CALLBACK;
result = Curl_speedcheck(data, now);
result = Curl_pgrsCheck(data);
if(result)
break;
@ -3534,10 +3530,7 @@ static CURLcode ssh_do(struct Curl_easy *data, bool *done)
sshc->secondCreateDirs = 0; /* reset the create directory attempt state
variable */
Curl_pgrsSetUploadCounter(data, 0);
Curl_pgrsSetDownloadCounter(data, 0);
Curl_pgrsSetUploadSize(data, -1);
Curl_pgrsSetDownloadSize(data, -1);
Curl_pgrsReset(data);
if(conn->handler->protocol & CURLPROTO_SCP)
result = scp_perform(data, &connected, done);


@ -1811,10 +1811,9 @@ schannel_recv_renegotiate(struct Curl_cfilter *cf, struct Curl_easy *data,
int what;
timediff_t timeout_ms, remaining;
if(Curl_pgrsUpdate(data)) {
result = CURLE_ABORTED_BY_CALLBACK;
result = Curl_pgrsUpdate(data);
if(result)
break;
}
elapsed = curlx_timediff_ms(curlx_now(), rs->start_time);
if(elapsed >= MAX_RENEG_BLOCK_TIME) {


@ -1927,9 +1927,9 @@ const struct Curl_handler Curl_handler_ws = {
ZERO_NULL, /* connecting */
ZERO_NULL, /* doing */
ZERO_NULL, /* proto_pollset */
Curl_http_do_pollset, /* doing_pollset */
Curl_http_doing_pollset, /* doing_pollset */
ZERO_NULL, /* domore_pollset */
ZERO_NULL, /* perform_pollset */
Curl_http_perform_pollset, /* perform_pollset */
ZERO_NULL, /* disconnect */
Curl_http_write_resp, /* write_resp */
Curl_http_write_resp_hd, /* write_resp_hd */
@ -1954,9 +1954,9 @@ const struct Curl_handler Curl_handler_wss = {
NULL, /* connecting */
ZERO_NULL, /* doing */
NULL, /* proto_pollset */
Curl_http_do_pollset, /* doing_pollset */
Curl_http_doing_pollset, /* doing_pollset */
ZERO_NULL, /* domore_pollset */
ZERO_NULL, /* perform_pollset */
Curl_http_perform_pollset, /* perform_pollset */
ZERO_NULL, /* disconnect */
Curl_http_write_resp, /* write_resp */
Curl_http_write_resp_hd, /* write_resp_hd */


@ -280,7 +280,7 @@ test3032 test3033 test3034 test3035 \
test3100 test3101 test3102 test3103 test3104 test3105 \
\
test3200 test3201 test3202 test3203 test3204 test3205 test3206 test3207 test3208 \
test3209 test3210 test3211 test3212 test3213 test3214 test3215 \
test3209 test3210 test3211 test3212 test3213 test3214 test3215 test3216 \
test4000 test4001
EXTRA_DIST = $(TESTCASES) DISABLED data-xml1

tests/data/test3216 (new file, 19 lines)

@ -0,0 +1,19 @@
<testcase>
<info>
<keywords>
unittest
ratelimit
</keywords>
</info>
#
# Client-side
<client>
<features>
unittest
</features>
<name>
ratelimit unit tests
</name>
</client>
</testcase>


@ -27,11 +27,9 @@
import difflib
import filecmp
import logging
import math
import os
import re
import sys
from datetime import timedelta
import pytest
from testenv import Env, CurlClient, LocalClient
@ -424,15 +422,17 @@ class TestDownload:
count = 1
url = f'https://{env.authority_for(env.domain1, proto)}/data-1m'
curl = CurlClient(env=env)
speed_limit = 384 * 1024
min_duration = math.floor((1024 * 1024)/speed_limit)
speed_limit = 256 * 1024
r = curl.http_download(urls=[url], alpn_proto=proto, extra_args=[
'--limit-rate', f'{speed_limit}'
])
r.check_response(count=count, http_status=200)
assert r.duration > timedelta(seconds=min_duration), \
f'rate limited transfer should take more than {min_duration}s, '\
f'not {r.duration}'
dl_speed = r.stats[0]['speed_download']
# speed limit is only exact on long durations. Ideally this transfer
# would take 4 seconds, but it may end just after 3 because then
# we have downloaded the rest and will not wait for the rate
# limit to increase again.
assert dl_speed <= ((1024*1024)/3), f'{r.stats[0]}'
# make extreme parallel h2 upgrades, check invalid conn reuse
# before protocol switch has happened


@ -557,7 +557,7 @@ class TestUpload:
r.check_response(count=count, http_status=200)
assert r.responses[0]['header']['received-length'] == f'{up_len}', f'{r.responses[0]}'
up_speed = r.stats[0]['speed_upload']
assert (speed_limit * 0.5) <= up_speed <= (speed_limit * 1.5), f'{r.stats[0]}'
assert up_speed <= (speed_limit * 1.1), f'{r.stats[0]}'
# speed limited on echo handler
@pytest.mark.parametrize("proto", Env.http_protos())
@ -573,7 +573,7 @@ class TestUpload:
])
r.check_response(count=count, http_status=200)
up_speed = r.stats[0]['speed_upload']
assert (speed_limit * 0.5) <= up_speed <= (speed_limit * 1.5), f'{r.stats[0]}'
assert up_speed <= (speed_limit * 1.1), f'{r.stats[0]}'
# upload larger data, triggering "Expect: 100-continue" code paths
@pytest.mark.parametrize("proto", ['http/1.1'])

View File

@ -42,4 +42,4 @@ TESTS_C = \
unit1979.c unit1980.c \
unit2600.c unit2601.c unit2602.c unit2603.c unit2604.c unit2605.c \
unit3200.c unit3205.c \
unit3211.c unit3212.c unit3213.c unit3214.c
unit3211.c unit3212.c unit3213.c unit3214.c unit3216.c


@ -23,7 +23,7 @@
***************************************************************************/
#include "unitcheck.h"
#include "speedcheck.h"
#include "progress.h"
#include "urldata.h"
static CURLcode t1606_setup(struct Curl_easy **easy)
@ -58,12 +58,12 @@ static int runawhile(struct Curl_easy *easy,
curl_easy_setopt(easy, CURLOPT_LOW_SPEED_LIMIT, speed_limit);
curl_easy_setopt(easy, CURLOPT_LOW_SPEED_TIME, time_limit);
Curl_speedinit(easy);
Curl_pgrsReset(easy);
do {
/* fake the current transfer speed */
easy->progress.current_speed = speed;
res = Curl_speedcheck(easy, now);
res = pgrs_speedcheck(easy, &now);
if(res)
break;
/* step the time */

tests/unit/unit3216.c (new file, 103 lines)

@ -0,0 +1,103 @@
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "unitcheck.h"
#include "ratelimit.h"
static CURLcode test_unit3216(const char *arg)
{
UNITTEST_BEGIN_SIMPLE
struct Curl_rlimit r;
struct curltime ts;
/* A ratelimit that is unlimited */
ts = curlx_now();
Curl_rlimit_init(&r, 0, 0, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == CURL_OFF_T_MAX, "inf");
Curl_rlimit_drain(&r, 1000000, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == CURL_OFF_T_MAX, "drain keep inf");
fail_unless(Curl_rlimit_wait_ms(&r, ts) == 0, "inf never waits");
Curl_rlimit_block(&r, TRUE, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 0, "inf blocked to 0");
Curl_rlimit_drain(&r, 1000000, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 0, "blocked inf");
Curl_rlimit_block(&r, FALSE, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == CURL_OFF_T_MAX,
"unblocked unlimited");
/* A ratelimit that gives 10 tokens per second */
ts = curlx_now();
Curl_rlimit_init(&r, 10, 0, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 10, "initial 10");
Curl_rlimit_drain(&r, 5, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 5, "drain to 5");
Curl_rlimit_drain(&r, 3, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 2, "drain to 2");
ts.tv_usec += 1000; /* 1ms */
Curl_rlimit_drain(&r, 3, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == -1, "drain to -1");
fail_unless(Curl_rlimit_wait_ms(&r, ts) == 999, "wait 999ms");
ts.tv_usec += 1000; /* 1ms */
fail_unless(Curl_rlimit_wait_ms(&r, ts) == 998, "wait 998ms");
ts.tv_sec += 1;
fail_unless(Curl_rlimit_avail(&r, ts) == 9, "10 inc per sec");
ts.tv_sec += 1;
fail_unless(Curl_rlimit_avail(&r, ts) == 19, "10 inc per sec(2)");
Curl_rlimit_block(&r, TRUE, curlx_now());
fail_unless(Curl_rlimit_avail(&r, curlx_now()) == 0, "10 blocked to 0");
Curl_rlimit_block(&r, FALSE, curlx_now());
fail_unless(Curl_rlimit_avail(&r, curlx_now()) == 10, "unblocked 10");
/* A ratelimit that gives 10 tokens per second, max burst 15/s */
ts = curlx_now();
Curl_rlimit_init(&r, 10, 15, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 10, "initial 10");
Curl_rlimit_drain(&r, 5, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 5, "drain to 5");
Curl_rlimit_drain(&r, 3, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 2, "drain to 2");
Curl_rlimit_drain(&r, 3, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == -1, "drain to -1");
ts.tv_sec += 1;
fail_unless(Curl_rlimit_avail(&r, ts) == 9, "10 inc per sec");
ts.tv_sec += 1;
fail_unless(Curl_rlimit_avail(&r, ts) == 15, "10/15 burst limit");
ts.tv_sec += 1;
fail_unless(Curl_rlimit_avail(&r, ts) == 15, "10/15 burst limit(2)");
Curl_rlimit_drain(&r, 15, ts);
fail_unless(Curl_rlimit_avail(&r, ts) == 0, "drain to 0");
fail_unless(Curl_rlimit_wait_ms(&r, ts) == 1000, "wait 1 sec");
ts.tv_usec += 500000; /* half a sec, cheating on second carry */
fail_unless(Curl_rlimit_avail(&r, ts) == 0, "0 after 0.5 sec");
fail_unless(Curl_rlimit_wait_ms(&r, ts) == 500, "wait 0.5 sec");
ts.tv_sec += 1;
fail_unless(Curl_rlimit_avail(&r, ts) == 10, "10 after 1.5 sec");
fail_unless(Curl_rlimit_wait_ms(&r, ts) == 0, "wait 0");
ts.tv_usec += 500000; /* half a sec, cheating on second carry */
fail_unless(Curl_rlimit_avail(&r, ts) == 15, "15 after 2 sec");
UNITTEST_END_SIMPLE
}