diff --git a/CHANGES b/CHANGES index 522da49..e6c513a 100644 --- a/CHANGES +++ b/CHANGES @@ -9,3 +9,5 @@ wrk next * Record latency for every request instead of random samples. * Latency and requests in done() are now callable, not indexable. * Only record timeouts when a response is actually received. + * Remove calibration phase and record rate at fixed interval. + * Improve correction of coordinated omission. diff --git a/src/main.h b/src/main.h index 12ad584..0cd7322 100644 --- a/src/main.h +++ b/src/main.h @@ -31,7 +31,6 @@ static void *thread_main(void *); static int connect_socket(thread *, connection *); static int reconnect_socket(thread *, connection *); -static int calibrate(aeEventLoop *, long long, void *); static int record_rate(aeEventLoop *, long long, void *); static void socket_connected(aeEventLoop *, int, void *, int); diff --git a/src/stats.c b/src/stats.c index db37275..92409ec 100644 --- a/src/stats.c +++ b/src/stats.c @@ -30,6 +30,18 @@ int stats_record(stats *stats, uint64_t n) { return 1; } +void stats_correct(stats *stats, int64_t expected) { + for (uint64_t n = expected * 2; n <= stats->max; n++) { + uint64_t count = stats->data[n]; + int64_t m = (int64_t) n - expected; + while (count && m > expected) { + stats->data[m] += count; + stats->count += count; + m -= expected; + } + } +} + long double stats_mean(stats *stats) { if (stats->count == 0) return 0.0; diff --git a/src/stats.h b/src/stats.h index 024579a..92dfc42 100644 --- a/src/stats.h +++ b/src/stats.h @@ -27,6 +27,7 @@ stats *stats_alloc(uint64_t); void stats_free(stats *); int stats_record(stats *, uint64_t); +void stats_correct(stats *, int64_t); long double stats_mean(stats *); long double stats_stdev(stats *stats, long double); diff --git a/src/wrk.c b/src/wrk.c index 9dfa3c7..cd99fa9 100644 --- a/src/wrk.c +++ b/src/wrk.c @@ -101,7 +101,6 @@ int main(int argc, char **argv) { thread *t = &threads[i]; t->loop = aeCreateEventLoop(10 + cfg.connections * 3); t->connections = cfg.connections / cfg.threads; - t->latency = stats_alloc(cfg.timeout * 1000); t->L = script_create(cfg.script, url, headers); script_init(L, t, argc - optind, &argv[optind]); @@ -161,6 +160,11 @@ int main(int argc, char **argv) { long double req_per_s = complete / runtime_s; long double bytes_per_s = bytes / runtime_s; + if (complete / cfg.connections > 0) { + int64_t interval = runtime_us / (complete / cfg.connections); + stats_correct(statistics.latency, interval); + } + print_stats_header(); print_stats("Latency", statistics.latency, format_time_us); print_stats("Req/Sec", statistics.requests, format_metric); @@ -212,7 +216,7 @@ void *thread_main(void *arg) { } aeEventLoop *loop = thread->loop; - aeCreateTimeEvent(loop, CALIBRATE_DELAY_MS, calibrate, thread, NULL); + aeCreateTimeEvent(loop, RECORD_INTERVAL_MS, record_rate, thread, NULL); thread->start = time_us(); aeMain(loop); @@ -220,11 +224,6 @@ void *thread_main(void *arg) { aeDeleteEventLoop(loop); zfree(thread->cs); - uint64_t max = thread->latency->max; - for (uint64_t i = 0; i < thread->missed; i++) { - stats_record(statistics.latency, max); - } - return NULL; } @@ -265,44 +264,22 @@ static int reconnect_socket(thread *thread, connection *c) { return connect_socket(thread, c); } -static int calibrate(aeEventLoop *loop, long long id, void *data) { - thread *thread = data; - - long double latency = stats_percentile(thread->latency, 90.0) / 1000.0L; - long double interval = MAX(latency * 2, 10); - long double rate = (interval / latency) * thread->connections; - - if (latency == 0 && !stop) return CALIBRATE_DELAY_MS; - - stats_free(thread->latency); - - thread->interval = interval; - thread->rate = ceil(rate / 10); - thread->start = time_us(); - thread->requests = 0; - thread->latency = statistics.latency; - - aeCreateTimeEvent(loop, thread->interval, record_rate, thread, NULL); - - return AE_NOMORE; -} - static int record_rate(aeEventLoop *loop, long long id, void *data) { thread *thread = data; - uint64_t elapsed_ms = (time_us() - thread->start) / 1000; - uint64_t requests = (thread->requests / (double) elapsed_ms) * 1000; - uint64_t missed = thread->rate - MIN(thread->rate, thread->latency->count); + if (thread->requests > 0) { + uint64_t elapsed_ms = (time_us() - thread->start) / 1000; + uint64_t requests = (thread->requests / (double) elapsed_ms) * 1000; - stats_record(statistics.requests, requests); + stats_record(statistics.requests, requests); - thread->missed += missed; - thread->requests = 0; - thread->start = time_us(); + thread->requests = 0; + thread->start = time_us(); + } if (stop) aeStop(loop); - return thread->interval; + return RECORD_INTERVAL_MS; } static int header_field(http_parser *parser, const char *at, size_t len) { @@ -351,7 +328,7 @@ static int response_complete(http_parser *parser) { } if (--c->pending == 0) { - if (!stats_record(thread->latency, now - c->start)) { + if (!stats_record(statistics.latency, now - c->start)) { thread->errors.timeout++; } aeCreateFileEvent(thread->loop, c->fd, AE_WRITABLE, socket_writeable, c); diff --git a/src/wrk.h b/src/wrk.h index 98121cd..7d1bba3 100644 --- a/src/wrk.h +++ b/src/wrk.h @@ -21,21 +21,17 @@ #define MAX_THREAD_RATE_S 10000000 #define SOCKET_TIMEOUT_MS 2000 -#define CALIBRATE_DELAY_MS 500 +#define RECORD_INTERVAL_MS 100 typedef struct { pthread_t thread; aeEventLoop *loop; struct addrinfo *addr; uint64_t connections; - int interval; uint64_t complete; uint64_t requests; uint64_t bytes; uint64_t start; - uint64_t rate; - uint64_t missed; - stats *latency; lua_State *L; errors errors; struct connection *cs;