1
0
mirror of https://github.com/wg/wrk synced 2025-01-08 23:32:54 +08:00

rewrite stats recording and analysis

This commit is contained in:
Will 2013-05-29 09:37:18 +09:00
parent 1b7161cf0e
commit 6fd3ee1080
4 changed files with 100 additions and 33 deletions

View File

@ -8,17 +8,27 @@
#include "zmalloc.h"
stats *stats_alloc(uint64_t samples) {
stats *stats = zcalloc(sizeof(stats) + sizeof(uint64_t) * samples);
stats->samples = samples;
return stats;
stats *s = zcalloc(sizeof(stats) + sizeof(uint64_t) * samples);
s->samples = samples;
s->min = UINT64_MAX;
return s;
}
void stats_free(stats *stats) {
zfree(stats);
}
void stats_reset(stats *stats) {
stats->limit = 0;
stats->index = 0;
stats->min = UINT64_MAX;
stats->max = 0;
}
void stats_record(stats *stats, uint64_t x) {
stats->data[stats->index++] = x;
if (x < stats->min) stats->min = x;
if (x > stats->max) stats->max = x;
if (stats->limit < stats->samples) stats->limit++;
if (stats->index == stats->samples) stats->index = 0;
}
@ -29,12 +39,9 @@ static int stats_compare(const void *a, const void *b) {
return *x - *y;
}
long double stats_summarize(stats *stats, int64_t *min, uint64_t *max) {
long double stats_summarize(stats *stats) {
qsort(stats->data, stats->limit, sizeof(uint64_t), &stats_compare);
if (min) *min = stats->data[0];
if (max) *max = stats->data[stats->limit - 1];
if (stats->limit == 0) return 0.0;
uint64_t sum = 0;
@ -70,3 +77,19 @@ uint64_t stats_percentile(stats *stats, long double p) {
uint64_t rank = round((p / 100.0) * stats->limit + 0.5);
return stats->data[rank - 1];
}
void stats_sample(stats *dst, tinymt64_t *state, uint64_t count, stats *src) {
for (uint64_t i = 0; i < count; i++) {
uint64_t n = rand64(state, src->limit);
stats_record(dst, src->data[n]);
}
}
uint64_t rand64(tinymt64_t *state, uint64_t n) {
uint64_t x, max = ~UINT64_C(0);
max -= max % n;
do {
x = tinymt64_generate_uint64(state);
} while (x >= max);
return x % n;
}

View File

@ -1,20 +1,33 @@
#ifndef STATS_H
#define STATS_H
#include <stdbool.h>
#include "tinymt64.h"
#define MAX(X, Y) ((X) > (Y) ? (X) : (Y))
#define MIN(X, Y) ((X) < (Y) ? (X) : (Y))
typedef struct {
uint64_t samples;
uint64_t index;
uint64_t limit;
uint64_t min;
uint64_t max;
uint64_t data[];
} stats;
stats *stats_alloc(uint64_t);
void stats_free(stats *);
void stats_reset(stats *);
void stats_record(stats *, uint64_t);
long double stats_summarize(stats *, int64_t *, uint64_t *);
long double stats_summarize(stats *);
long double stats_stdev(stats *stats, long double);
long double stats_within_stdev(stats *, long double, long double, uint64_t);
uint64_t stats_percentile(stats *, long double);
#endif /* STATS_H */
void stats_sample(stats *, tinymt64_t *, uint64_t, stats *);
uint64_t rand64(tinymt64_t *, uint64_t);
#endif /* STATS_H */

View File

@ -221,16 +221,16 @@ void *thread_main(void *arg) {
thread->cs = zmalloc(thread->connections * sizeof(connection));
thread->loop = loop;
tinymt64_init(&thread->rand, time_us());
thread->latency = stats_alloc(100000);
connection *c = thread->cs;
for (uint64_t i = 0; i < thread->connections; i++, c++) {
c->thread = thread;
c->latency = 0;
c->thread = thread;
connect_socket(thread, c);
}
aeCreateTimeEvent(loop, SAMPLE_INTERVAL_MS, sample_rate, thread, NULL);
aeCreateTimeEvent(loop, CALIBRATE_DELAY_MS, calibrate, thread, NULL);
aeCreateTimeEvent(loop, TIMEOUT_INTERVAL_MS, check_timeouts, thread, NULL);
thread->start = time_us();
@ -239,6 +239,15 @@ void *thread_main(void *arg) {
aeDeleteEventLoop(loop);
zfree(thread->cs);
uint64_t max = thread->latency->max;
stats_free(thread->latency);
pthread_mutex_lock(&statistics.mutex);
for (uint64_t i = 0; i < thread->missed; i++) {
stats_record(statistics.latency, max);
}
pthread_mutex_unlock(&statistics.mutex);
return NULL;
}
@ -285,20 +294,45 @@ static int reconnect_socket(thread *thread, connection *c) {
return connect_socket(thread, c);
}
static int calibrate(aeEventLoop *loop, long long id, void *data) {
thread *thread = data;
uint64_t elapsed_ms = (time_us() - thread->start) / 1000;
uint64_t req_per_ms = thread->requests / elapsed_ms;
if (!req_per_ms) return CALIBRATE_DELAY_MS / 2;
thread->rate = (req_per_ms * SAMPLE_INTERVAL_MS) / 10;
thread->start = time_us();
thread->requests = 0;
stats_reset(thread->latency);
aeCreateTimeEvent(loop, SAMPLE_INTERVAL_MS, sample_rate, thread, NULL);
return AE_NOMORE;
}
static int sample_rate(aeEventLoop *loop, long long id, void *data) {
thread *thread = data;
uint64_t n = rand64(&thread->rand, thread->connections);
uint64_t elapsed_ms = (time_us() - thread->start) / 1000;
connection *c = thread->cs + n;
uint64_t requests = (thread->complete / elapsed_ms) * 1000;
uint64_t requests = (thread->requests / elapsed_ms) * 1000;
uint64_t missed = thread->rate - MIN(thread->rate, thread->latency->limit);
uint64_t count = thread->rate - missed;
pthread_mutex_lock(&statistics.mutex);
stats_record(statistics.latency, c->latency);
stats_sample(statistics.latency, &thread->rand, count, thread->latency);
stats_record(statistics.requests, requests);
pthread_mutex_unlock(&statistics.mutex);
return SAMPLE_INTERVAL_MS + rand64(&thread->rand, SAMPLE_INTERVAL_MS);
uint64_t max = thread->latency->max;
thread->missed += missed;
thread->requests = 0;
thread->start = time_us();
stats_reset(thread->latency);
thread->latency->max = max;
return SAMPLE_INTERVAL_MS;
}
static int request_complete(http_parser *parser) {
@ -307,7 +341,9 @@ static int request_complete(http_parser *parser) {
uint64_t now = time_us();
thread->complete++;
c->latency = now - c->start;
thread->requests++;
stats_record(thread->latency, now - c->start);
if (parser->status_code > 399) {
thread->errors.status++;
@ -387,15 +423,6 @@ static uint64_t time_us() {
return (t.tv_sec * 1000000) + t.tv_usec;
}
static uint64_t rand64(tinymt64_t *state, uint64_t n) {
uint64_t x, max = ~UINT64_C(0);
max -= max % n;
do {
x = tinymt64_generate_uint64(state);
} while (x >= max);
return x % n;
}
static char *extract_url_part(char *url, struct http_parser_url *parser_url, enum http_parser_url_fields field) {
char *part = NULL;
@ -534,8 +561,8 @@ static void print_units(long double n, char *(*fmt)(long double), int width) {
}
static void print_stats(char *name, stats *stats, char *(*fmt)(long double)) {
uint64_t max;
long double mean = stats_summarize(stats, NULL, &max);
uint64_t max = stats->max;
long double mean = stats_summarize(stats);
long double stdev = stats_stdev(stats, mean);
printf(" %-10s", name);

View File

@ -13,10 +13,11 @@
#define VERSION "2.1.0"
#define RECVBUF 8192
#define SAMPLES 100000
#define SAMPLES 100000000
#define SOCKET_TIMEOUT_MS 2000
#define SAMPLE_INTERVAL_MS 100
#define SAMPLE_INTERVAL_MS 10
#define CALIBRATE_DELAY_MS 500
#define TIMEOUT_INTERVAL_MS 2000
typedef struct {
@ -33,8 +34,12 @@ typedef struct {
uint64_t connections;
uint64_t stop_at;
uint64_t complete;
uint64_t requests;
uint64_t bytes;
uint64_t start;
uint64_t rate;
uint64_t missed;
stats *latency;
tinymt64_t rand;
errors errors;
struct connection *cs;
@ -45,7 +50,6 @@ typedef struct connection {
http_parser parser;
int fd;
uint64_t start;
uint64_t latency;
char buf[RECVBUF];
} connection;
@ -55,6 +59,7 @@ static void *thread_main(void *);
static int connect_socket(thread *, connection *);
static int reconnect_socket(thread *, connection *);
static int calibrate(aeEventLoop *, long long, void *);
static int sample_rate(aeEventLoop *, long long, void *);
static int check_timeouts(aeEventLoop *, long long, void *);
@ -63,7 +68,6 @@ static void socket_readable(aeEventLoop *, int, void *, int);
static int request_complete(http_parser *);
static uint64_t time_us();
static uint64_t rand64(tinymt64_t *, uint64_t);
static char *extract_url_part(char *, struct http_parser_url *, enum http_parser_url_fields);
static char *format_request(char *, char *, char *, char **);