22 Star 36 Fork 15

蔡东赟 / beanstalkd-win

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
testserv.c 27.66 KB
一键复制 编辑 原始数据 按行查看 历史
Keith Rarick 提交于 2013-06-06 00:47 . update ct; test throughput in MB/s
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <errno.h>
#include "ct/ct.h"
#include "dat.h"
// State shared by the helpers and tests in this file: the forked
// server's pid, the port it listens on, the client socket, and the
// WAL file size used by the binlog tests.
static int srvpid, port, fd, size;
// How long readline waits in select() for each byte, in nanoseconds.
static int64 timeout = 5000000000LL; // 5s
// Consulted in order by wrapfalloc, one entry per call: a zero entry
// (or running past the end) makes that call return ENOSPC.
static byte fallocpat[3];
/*
 * wrapfalloc is a fault-injecting stand-in for rawfalloc.
 *
 * Each call logs its arguments and then looks up the next entry of
 * fallocpat. A nonzero entry forwards the request to rawfalloc; a zero
 * entry, or any call after the pattern is exhausted, returns ENOSPC
 * without touching the file.
 */
static int
wrapfalloc(int fd, int size)
{
    static int calls = 0; /* how many pattern entries have been used */
    int allow;

    printf("\nwrapfalloc: fd=%d size=%d\n", fd, size);

    /* Past the end of the pattern: always refuse. */
    if (calls >= sizeof(fallocpat)) {
        return ENOSPC;
    }

    /* Consume exactly one pattern entry per in-range call. */
    allow = fallocpat[calls];
    calls++;
    if (!allow) {
        return ENOSPC;
    }
    return rawfalloc(fd, size);
}
/*
 * muststart forks and execs the external program a0 with arguments
 * a1..a4 (looked up on PATH by execlp), recording the child's pid in
 * srvpid.
 *
 * The parent prints the command line and pid, sleeps 0.1s to give the
 * child time to bind its port, and returns. The child never returns:
 * if the exec fails it reports the error and exits immediately, so it
 * cannot fall back into the caller and run the rest of the test as a
 * second process with srvpid == 0.
 */
static void
muststart(char *a0, char *a1, char *a2, char *a3, char *a4)
{
    srvpid = fork();
    if (srvpid < 0) {
        twarn("fork");
        exit(1);
    }

    if (srvpid > 0) {
        printf("%s %s %s %s %s\n", a0, a1, a2, a3, a4);
        printf("start server pid=%d\n", srvpid);
        usleep(100000); // .1s; time for the child to bind to its port
        return;
    }

    /* now in child */
    /* The variadic terminator must be a null *pointer*, hence the cast. */
    execlp(a0, a0, a1, a2, a3, a4, (char *)NULL);

    /* execlp only returns on failure. */
    twarn("execlp");
    /* _exit: do not flush stdio buffers inherited from the parent. */
    _exit(1);
}
/*
 * mustdiallocal opens a TCP connection to 127.0.0.1 on the given port
 * and returns the connected socket. Any failure is reported with twarn
 * and terminates the process with status 1.
 */
static int
mustdiallocal(int port)
{
    struct sockaddr_in sa;
    int sock;

    /* Build the loopback address first, before creating the socket. */
    memset(&sa, 0, sizeof sa);
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);
    if (inet_aton("127.0.0.1", &sa.sin_addr) == 0) {
        /* inet_aton does not set errno; supply one for twarn. */
        errno = EINVAL;
        twarn("inet_aton");
        exit(1);
    }

    sock = socket(PF_INET, SOCK_STREAM, 0);
    if (sock == -1) {
        twarn("socket");
        exit(1);
    }

    if (connect(sock, (struct sockaddr*)&sa, sizeof sa) == -1) {
        twarn("connect");
        exit(1);
    }
    return sock;
}
// SERVER stores the calling function's name in progname (via __func__),
// then forks a server with mustforksrv and evaluates to its port.
#define SERVER() (progname=__func__, mustforksrv())
/*
 * mustforksrv creates the server's listening socket on 127.0.0.1 (port
 * "0" is requested and the actual port is read back with getsockname),
 * then forks.
 *
 * Parent: records the child in srvpid, prints the port and pid, and
 * returns the port number.
 *
 * Child: initializes the protocol layer, optionally locks and replays
 * the WAL when srv.wal.use is set (exiting with 10 if the directory
 * lock fails, 11 if replay fails), and then serves forever.
 *
 * Any setup failure before the fork terminates the process with
 * status 1.
 */
static int
mustforksrv()
{
    int r, port, ok;
    socklen_t len; /* getsockname takes a socklen_t *, not an int * */
    struct sockaddr_in addr;

    srv.sock.fd = make_server_socket("127.0.0.1", "0");
    if (srv.sock.fd == -1) {
        puts("mustforksrv failed");
        exit(1);
    }

    len = sizeof(addr);
    r = getsockname(srv.sock.fd, (struct sockaddr*)&addr, &len);
    /* A reported length larger than addr means the address was truncated. */
    if (r == -1 || len > sizeof(addr)) {
        puts("mustforksrv failed");
        exit(1);
    }

    port = ntohs(addr.sin_port);
    srvpid = fork();
    if (srvpid < 0) {
        twarn("fork");
        exit(1);
    }

    if (srvpid > 0) {
        printf("start server port=%d pid=%d\n", port, srvpid);
        return port;
    }

    /* now in child */

    prot_init();

    if (srv.wal.use) {
        struct job list = {};
        // We want to make sure that only one beanstalkd tries
        // to use the wal directory at a time. So acquire a lock
        // now and never release it.
        if (!waldirlock(&srv.wal)) {
            twarnx("failed to lock wal dir %s", srv.wal.dir);
            exit(10);
        }

        /* Empty circular list that walinit/prot_replay fill in. */
        list.prev = list.next = &list;
        walinit(&srv.wal, &list);
        ok = prot_replay(&srv, &list);
        if (!ok) {
            twarnx("failed to replay log");
            exit(11);
        }
    }

    srvserve(&srv); /* does not return */
    exit(1); /* satisfy the compiler */
}
/*
 * readline reads one "\r\n"-terminated line from fd, one byte at a
 * time, echoing it to stdout prefixed with "<fd ".
 *
 * Returns a pointer to a static buffer holding the line, including its
 * trailing "\r\n"; the contents are overwritten by the next call.
 *
 * Each byte is awaited with select() for at most `timeout` nanoseconds.
 * The process exits on any failure:
 *   8  no data within the timeout
 *   1  select() or read() error
 *   3  unexpected select() result
 *   5  peer closed the connection before the line was complete
 *   4  line does not fit in the buffer
 */
static char *
readline(int fd)
{
    int r, i = 0;
    char c = 0, p = 0;
    static char buf[1024];
    fd_set rfd;
    struct timeval tv;

    printf("<%d ", fd);
    fflush(stdout);
    for (;;) {
        /* select may modify both rfd and tv, so rebuild them each time. */
        FD_ZERO(&rfd);
        FD_SET(fd, &rfd);
        tv.tv_sec = timeout / 1000000000;
        tv.tv_usec = (timeout/1000) % 1000000;
        r = select(fd+1, &rfd, NULL, NULL, &tv);
        switch (r) {
        case 1:
            break;
        case 0:
            fputs("timeout", stderr);
            exit(8);
        case -1:
            perror("select");
            exit(1);
        default:
            fputs("unknown error", stderr);
            exit(3);
        }

        r = read(fd, &c, 1);
        if (r == -1) {
            perror("read");
            exit(1);
        }
        if (r == 0) {
            /*
             * EOF leaves c untouched; without this check the stale
             * byte would be appended until the buffer overflowed.
             */
            fputs("unexpected EOF", stderr);
            exit(5);
        }
        /* Keep one byte free for the terminating NUL. */
        if ((size_t)i >= sizeof(buf)-1) {
            fputs("response too big", stderr);
            exit(4);
        }
        putc(c, stdout);
        fflush(stdout);
        buf[i++] = c;
        if (p == '\r' && c == '\n') {
            break;
        }
        p = c;
    }
    buf[i] = '\0';
    return buf;
}
// ckresp reads the next line from fd with readline and asserts that it
// is exactly equal to exp (which must include the trailing "\r\n").
static void
ckresp(int fd, char *exp)
{
char *line;
line = readline(fd);
assertf(strcmp(exp, line) == 0, "\"%s\" != \"%s\"", exp, line);
}
// ckrespsub reads the next line from fd with readline and asserts that
// sub occurs somewhere inside it.
static void
ckrespsub(int fd, char *sub)
{
char *line;
line = readline(fd);
assertf(strstr(line, sub), "\"%s\" not in \"%s\"", sub, line);
}
/*
 * writefull writes exactly n bytes starting at s to fd, calling
 * write() again after a short write until everything has been sent.
 * A write error is reported with perror and ends the process with
 * status 1. With n == 0 nothing is written.
 */
static void
writefull(int fd, char *s, int n)
{
    while (n != 0) {
        int wrote = write(fd, s, n);

        if (wrote == -1) {
            perror("write");
            exit(1);
        }
        /* Advance past whatever part was accepted this round. */
        n -= wrote;
        s += wrote;
    }
}
/*
 * mustsend writes the whole NUL-terminated string s to fd (exiting on
 * failure, via writefull) and then echoes it to stdout as ">fd s".
 */
static void
mustsend(int fd, char *s)
{
    int len = strlen(s);

    writefull(fd, s, len);

    /* Log only after the write has fully succeeded. */
    printf(">%d %s", fd, s);
    fflush(stdout);
}
/*
 * filesize returns the size in bytes of the file at path, as reported
 * by stat(). If stat() fails the error is reported with twarn and the
 * process exits with status 1.
 */
static int
filesize(char *path)
{
    struct stat st;

    if (stat(path, &st) == -1) {
        twarn("stat");
        exit(1);
    }
    return st.st_size;
}
/*
 * exist reports whether stat() succeeds on path: 1 if it does, 0 if it
 * fails for any reason.
 */
static int
exist(char *path)
{
    struct stat st;

    return stat(path, &st) != -1;
}
// Puts a 1-byte job, pauses tube "default" with argument 1, then
// reserves. Asserts that at least 1e9 ns (per nanoseconds()) pass
// between just before the pause command and receipt of the job body.
void
cttestpause()
{
int64 s;
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "put 0 0 0 1\r\n");
mustsend(fd, "x\r\n");
ckresp(fd, "INSERTED 1\r\n");
s = nanoseconds();
mustsend(fd, "pause-tube default 1\r\n");
ckresp(fd, "PAUSED\r\n");
mustsend(fd, "reserve\r\n");
ckresp(fd, "RESERVED 1 1\r\n");
ckresp(fd, "x\r\n");
assert(nanoseconds() - s >= 1000000000); // 1s
}
// Checks that "use x_y" (a tube name containing an underscore) is
// answered with "USING x_y".
void
cttestunderscore()
{
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "use x_y\r\n");
ckresp(fd, "USING x_y\r\n");
}
// Sends two "use" commands in a single write and expects both replies,
// in order.
void
cttest2cmdpacket()
{
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "use a\r\nuse b\r\n");
ckresp(fd, "USING a\r\n");
ckresp(fd, "USING b\r\n");
}
// With job_data_size_limit set to 10, sends a put announcing 11 bytes,
// then the text "delete 9999\r\n", then a valid 1-byte put. Exactly two
// replies are expected: JOB_TOO_BIG and INSERTED 1.
// NOTE(review): no reply to the "delete 9999" line is expected here,
// presumably because the server discards it as the oversized job's
// body -- confirm against the server implementation.
void
cttesttoobig()
{
job_data_size_limit = 10;
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "put 0 0 0 11\r\n");
mustsend(fd, "delete 9999\r\n");
mustsend(fd, "put 0 0 0 1\r\n");
mustsend(fd, "x\r\n");
ckresp(fd, "JOB_TOO_BIG\r\n");
ckresp(fd, "INSERTED 1\r\n");
}
// Puts an empty job and deletes it right away, without reserving it
// first; expects DELETED.
void
cttestdeleteready()
{
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "put 0 0 0 0\r\n");
mustsend(fd, "\r\n");
ckresp(fd, "INSERTED 1\r\n");
mustsend(fd, "delete 1\r\n");
ckresp(fd, "DELETED\r\n");
}
// Puts job 1 into tube "abc" with first put argument 999999 and job 2
// into tube "def" with 99, watches both tubes, and expects a single
// reserve to hand back job 2.
void
cttestmultitube()
{
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "use abc\r\n");
ckresp(fd, "USING abc\r\n");
mustsend(fd, "put 999999 0 0 0\r\n");
mustsend(fd, "\r\n");
ckresp(fd, "INSERTED 1\r\n");
mustsend(fd, "use def\r\n");
ckresp(fd, "USING def\r\n");
mustsend(fd, "put 99 0 0 0\r\n");
mustsend(fd, "\r\n");
ckresp(fd, "INSERTED 2\r\n");
mustsend(fd, "watch abc\r\n");
ckresp(fd, "WATCHING 2\r\n");
mustsend(fd, "watch def\r\n");
ckresp(fd, "WATCHING 3\r\n");
mustsend(fd, "reserve\r\n");
ckresp(fd, "RESERVED 2 0\r\n");
}
// Sends a put whose second argument is -1 and expects BAD_FORMAT.
void
cttestnonegativedelay()
{
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "put 512 -1 100 0\r\n");
ckresp(fd, "BAD_FORMAT\r\n");
}
// Puts a job (third put argument 5) that is never reserved, then checks
// that stats-job reports "time-left: 0" for it.
void
cttestomittimeleft()
{
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "put 0 0 5 1\r\n");
mustsend(fd, "a\r\n");
ckresp(fd, "INSERTED 1\r\n");
mustsend(fd, "stats-job 1\r\n");
ckrespsub(fd, "OK ");
ckrespsub(fd, "\ntime-left: 0\n");
}
// Puts an empty job with "put 0 1 1 0" and expects it to be accepted
// as job 1.
void
cttestsmalldelay()
{
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "put 0 1 1 0\r\n");
mustsend(fd, "\r\n");
ckresp(fd, "INSERTED 1\r\n");
}
/*
 * ctteststatstube uses tube "tubea", puts and deletes one job there,
 * and then checks individual stats-tube fields for both "tubea" and
 * "default".
 *
 * Every field is checked with its own stats-tube request: the reply is
 * consumed as an "OK " line followed by one readline() call that must
 * contain the expected "\nkey: value\n" text.
 * NOTE(review): this presumably works because the stats body uses bare
 * "\n" separators, so readline returns the whole body at once -- confirm.
 */
void
ctteststatstube()
{
    /* Expected fields for the tube that was used, in request order. */
    static char *const wanta[] = {
        "\nname: tubea\n",
        "\ncurrent-jobs-urgent: 0\n",
        "\ncurrent-jobs-ready: 0\n",
        "\ncurrent-jobs-reserved: 0\n",
        "\ncurrent-jobs-delayed: 0\n",
        "\ncurrent-jobs-buried: 0\n",
        "\ntotal-jobs: 1\n",
        "\ncurrent-using: 1\n",
        "\ncurrent-watching: 0\n",
        "\ncurrent-waiting: 0\n",
        "\ncmd-delete: 1\n",
        "\ncmd-pause-tube: 0\n",
        "\npause: 0\n",
        "\npause-time-left: 0\n",
    };
    /* Expected fields for the untouched default tube. */
    static char *const wantdefault[] = {
        "\nname: default\n",
        "\ncurrent-jobs-urgent: 0\n",
        "\ncurrent-jobs-ready: 0\n",
        "\ncurrent-jobs-reserved: 0\n",
        "\ncurrent-jobs-delayed: 0\n",
        "\ncurrent-jobs-buried: 0\n",
        "\ntotal-jobs: 0\n",
        "\ncurrent-using: 0\n",
        "\ncurrent-watching: 1\n",
        "\ncurrent-waiting: 0\n",
        "\ncmd-delete: 0\n",
        "\ncmd-pause-tube: 0\n",
        "\npause: 0\n",
        "\npause-time-left: 0\n",
    };
    int na = sizeof(wanta) / sizeof(wanta[0]);
    int ndefault = sizeof(wantdefault) / sizeof(wantdefault[0]);
    int k;

    port = SERVER();
    fd = mustdiallocal(port);

    mustsend(fd, "use tubea\r\n");
    ckresp(fd, "USING tubea\r\n");
    mustsend(fd, "put 0 0 0 1\r\n");
    mustsend(fd, "x\r\n");
    ckresp(fd, "INSERTED 1\r\n");
    mustsend(fd, "delete 1\r\n");
    ckresp(fd, "DELETED\r\n");

    for (k = 0; k < na; k++) {
        mustsend(fd, "stats-tube tubea\r\n");
        ckrespsub(fd, "OK ");
        ckrespsub(fd, wanta[k]);
    }

    for (k = 0; k < ndefault; k++) {
        mustsend(fd, "stats-tube default\r\n");
        ckrespsub(fd, "OK ");
        ckrespsub(fd, wantdefault[k]);
    }
}
/*
 * cttestttrlarge puts seven 1-byte jobs whose third put argument (the
 * value later reported as "ttr") ranges from 120 up to 21600, then
 * checks with stats-job that each job reports exactly the value it was
 * given.
 */
void
cttestttrlarge()
{
    /* One row per job: put command, its reply, stats command, expected ttr. */
    static const struct {
        char *put;
        char *inserted;
        char *stats;
        char *want;
    } jobs[] = {
        {"put 0 0 120 1\r\n", "INSERTED 1\r\n", "stats-job 1\r\n", "\nttr: 120\n"},
        {"put 0 0 4294 1\r\n", "INSERTED 2\r\n", "stats-job 2\r\n", "\nttr: 4294\n"},
        {"put 0 0 4295 1\r\n", "INSERTED 3\r\n", "stats-job 3\r\n", "\nttr: 4295\n"},
        {"put 0 0 4296 1\r\n", "INSERTED 4\r\n", "stats-job 4\r\n", "\nttr: 4296\n"},
        {"put 0 0 4297 1\r\n", "INSERTED 5\r\n", "stats-job 5\r\n", "\nttr: 4297\n"},
        {"put 0 0 5000 1\r\n", "INSERTED 6\r\n", "stats-job 6\r\n", "\nttr: 5000\n"},
        {"put 0 0 21600 1\r\n", "INSERTED 7\r\n", "stats-job 7\r\n", "\nttr: 21600\n"},
    };
    int njobs = sizeof(jobs) / sizeof(jobs[0]);
    int k;

    port = SERVER();
    fd = mustdiallocal(port);

    /* First insert every job... */
    for (k = 0; k < njobs; k++) {
        mustsend(fd, jobs[k].put);
        mustsend(fd, "a\r\n");
        ckresp(fd, jobs[k].inserted);
    }

    /* ...then verify each one's reported ttr. */
    for (k = 0; k < njobs; k++) {
        mustsend(fd, jobs[k].stats);
        ckrespsub(fd, "OK ");
        ckrespsub(fd, jobs[k].want);
    }
}
// Puts a job whose third put argument is 0 and expects stats-job to
// report "ttr: 1" for it.
void
cttestttrsmall()
{
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "put 0 0 0 1\r\n");
mustsend(fd, "a\r\n");
ckresp(fd, "INSERTED 1\r\n");
mustsend(fd, "stats-job 1\r\n");
ckrespsub(fd, "OK ");
ckrespsub(fd, "\nttr: 1\n");
}
// Puts an empty job with "put 0 0 1 0" and expects it to be accepted
// as job 1.
void
cttestzerodelay()
{
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "put 0 0 1 0\r\n");
mustsend(fd, "\r\n");
ckresp(fd, "INSERTED 1\r\n");
}
// Opens two connections that both watch tube "foo". The first issues
// "reserve-with-timeout 1" before the second starts watching. The
// readline timeout is then shortened to 1.1s, so the test fails unless
// TIMED_OUT arrives on the first connection within that window.
void
cttestreservewithtimeout2conn()
{
int fd0, fd1;
job_data_size_limit = 10;
port = SERVER();
fd0 = mustdiallocal(port);
fd1 = mustdiallocal(port);
mustsend(fd0, "watch foo\r\n");
ckresp(fd0, "WATCHING 2\r\n");
mustsend(fd0, "reserve-with-timeout 1\r\n");
mustsend(fd1, "watch foo\r\n");
ckresp(fd1, "WATCHING 2\r\n");
timeout = 1100000000; // 1.1s
ckresp(fd0, "TIMED_OUT\r\n");
}
// Puts a job, pauses tube "default" with argument 86400, and issues a
// reserve on a second connection. The first connection then sends
// "pause-tube default 0", after which the second connection must
// receive the job.
void
cttestunpausetube()
{
int fd0, fd1;
port = SERVER();
fd0 = mustdiallocal(port);
fd1 = mustdiallocal(port);
mustsend(fd0, "put 0 0 0 0\r\n");
mustsend(fd0, "\r\n");
ckresp(fd0, "INSERTED 1\r\n");
mustsend(fd0, "pause-tube default 86400\r\n");
ckresp(fd0, "PAUSED\r\n");
mustsend(fd1, "reserve\r\n");
mustsend(fd0, "pause-tube default 0\r\n");
ckresp(fd0, "PAUSED\r\n");
// ckresp will time out if this takes too long, so the
// test will not pass.
ckresp(fd1, "RESERVED 1 0\r\n");
ckresp(fd1, "\r\n");
}
// With the WAL enabled in the ct scratch directory, starts a server,
// kills it with signal 9 before any command is sent, and waits for it.
// Then starts a second server on the same directory and checks that a
// put is accepted as job 1.
void
cttestbinlogemptyexit()
{
srv.wal.dir = ctdir();
srv.wal.use = 1;
job_data_size_limit = 10;
port = SERVER();
kill(srvpid, 9);
waitpid(srvpid, NULL, 0);
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "put 0 0 0 0\r\n");
mustsend(fd, "\r\n");
ckresp(fd, "INSERTED 1\r\n");
}
// With the WAL enabled, puts an empty job, reserves it, and buries it;
// expects BURIED.
void
cttestbinlogbury()
{
srv.wal.dir = ctdir();
srv.wal.use = 1;
job_data_size_limit = 10;
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "put 0 0 100 0\r\n");
mustsend(fd, "\r\n");
ckresp(fd, "INSERTED 1\r\n");
mustsend(fd, "reserve\r\n");
ckresp(fd, "RESERVED 1 0\r\n");
ckresp(fd, "\r\n");
mustsend(fd, "bury 1 0\r\n");
ckresp(fd, "BURIED\r\n");
}
// With the WAL enabled, puts a job, kills the server with signal 9 and
// waits for it, then starts a new server on the same directory. The
// new server must answer "delete 1" with DELETED, i.e. it still knows
// the job.
void
cttestbinlogbasic()
{
srv.wal.dir = ctdir();
srv.wal.use = 1;
job_data_size_limit = 10;
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "put 0 0 100 0\r\n");
mustsend(fd, "\r\n");
ckresp(fd, "INSERTED 1\r\n");
kill(srvpid, 9);
waitpid(srvpid, NULL, 0);
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "delete 1\r\n");
ckresp(fd, "DELETED\r\n");
}
// With srv.wal.filesize set to 1024, keeps putting 50-byte jobs until
// the file "binlog.2" appears in the WAL directory, then asserts that
// both binlog.1 and binlog.2 are exactly `size` bytes long.
void
cttestbinlogsizelimit()
{
int i = 0;
char *b2;
int gotsize;
size = 1024;
srv.wal.dir = ctdir();
srv.wal.use = 1;
srv.wal.filesize = size;
srv.wal.syncrate = 0;
srv.wal.wantsync = 1;
port = SERVER();
fd = mustdiallocal(port);
b2 = fmtalloc("%s/binlog.2", ctdir());
// i counts the jobs put so far; each reply must carry the next id.
while (!exist(b2)) {
mustsend(fd, "put 0 0 100 50\r\n");
mustsend(fd, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
ckresp(fd, fmtalloc("INSERTED %d\r\n", ++i));
}
gotsize = filesize(fmtalloc("%s/binlog.1", ctdir()));
assertf(gotsize == size, "binlog.1 %d != %d", gotsize, size);
gotsize = filesize(b2);
assertf(gotsize == size, "binlog.2 %d != %d", gotsize, size);
}
// With srv.wal.filesize set to 601, puts 96 jobs with 22-byte bodies
// and then deletes all of them in id order, expecting every command to
// succeed.
void
cttestbinlogallocation()
{
int i = 0;
size = 601;
srv.wal.dir = ctdir();
srv.wal.use = 1;
srv.wal.filesize = size;
srv.wal.syncrate = 0;
srv.wal.wantsync = 1;
port = SERVER();
fd = mustdiallocal(port);
for (i = 1; i <= 96; i++) {
mustsend(fd, "put 0 0 120 22\r\n");
mustsend(fd, "job payload xxxxxxxxxx\r\n");
ckresp(fd, fmtalloc("INSERTED %d\r\n", i));
}
for (i = 1; i <= 96; i++) {
mustsend(fd, fmtalloc("delete %d\r\n", i));
ckresp(fd, "DELETED\r\n");
}
}
// With the WAL enabled, puts two jobs into tube "test", reserves and
// releases job 1, then reserves and deletes job 2. After the server is
// killed with signal 9 and restarted on the same directory, job 1 must
// still be reservable and deletable, and "delete 2" must return
// NOT_FOUND.
void
cttestbinlogread()
{
srv.wal.dir = ctdir();
srv.wal.use = 1;
srv.wal.syncrate = 0;
srv.wal.wantsync = 1;
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "use test\r\n");
ckresp(fd, "USING test\r\n");
mustsend(fd, "put 0 0 120 4\r\n");
mustsend(fd, "test\r\n");
ckresp(fd, "INSERTED 1\r\n");
mustsend(fd, "put 0 0 120 4\r\n");
mustsend(fd, "tes1\r\n");
ckresp(fd, "INSERTED 2\r\n");
mustsend(fd, "watch test\r\n");
ckresp(fd, "WATCHING 2\r\n");
mustsend(fd, "reserve\r\n");
ckresp(fd, "RESERVED 1 4\r\n");
ckresp(fd, "test\r\n");
mustsend(fd, "release 1 1 1\r\n");
ckresp(fd, "RELEASED\r\n");
mustsend(fd, "reserve\r\n");
ckresp(fd, "RESERVED 2 4\r\n");
ckresp(fd, "tes1\r\n");
mustsend(fd, "delete 2\r\n");
ckresp(fd, "DELETED\r\n");
// Restart the server; its state now comes from the WAL directory.
kill(srvpid, 9);
waitpid(srvpid, NULL, 0);
port = SERVER();
fd = mustdiallocal(port);
mustsend(fd, "watch test\r\n");
ckresp(fd, "WATCHING 2\r\n");
mustsend(fd, "reserve\r\n");
ckresp(fd, "RESERVED 1 4\r\n");
ckresp(fd, "test\r\n");
mustsend(fd, "delete 1\r\n");
ckresp(fd, "DELETED\r\n");
mustsend(fd, "delete 2\r\n");
ckresp(fd, "NOT_FOUND\r\n");
}
/*
 * cttestbinlogdiskfull runs the server with falloc replaced by
 * wrapfalloc and the pattern {1, 0, 1}, so the first and third
 * wrapfalloc calls are forwarded to rawfalloc and the second reports
 * ENOSPC.
 *
 * Nine 50-byte puts are sent against a WAL file size of 1000. The
 * fifth must be refused with OUT_OF_MEMORY while the rest are inserted
 * (the refused put still uses up id 5, so ids skip from 4 to 6).
 * Finally every inserted job is deleted.
 */
void
cttestbinlogdiskfull()
{
    /* Expected reply to each put, in order. */
    static char *const putreply[] = {
        "INSERTED 1\r\n",
        "INSERTED 2\r\n",
        "INSERTED 3\r\n",
        "INSERTED 4\r\n",
        "OUT_OF_MEMORY\r\n",
        "INSERTED 6\r\n",
        "INSERTED 7\r\n",
        "INSERTED 8\r\n",
        "INSERTED 9\r\n",
    };
    /* Delete commands for the jobs that were actually inserted. */
    static char *const delcmd[] = {
        "delete 1\r\n",
        "delete 2\r\n",
        "delete 3\r\n",
        "delete 4\r\n",
        "delete 6\r\n",
        "delete 7\r\n",
        "delete 8\r\n",
        "delete 9\r\n",
    };
    int nput = sizeof(putreply) / sizeof(putreply[0]);
    int ndel = sizeof(delcmd) / sizeof(delcmd[0]);
    int k;

    size = 1000;
    falloc = &wrapfalloc;
    fallocpat[0] = 1;
    fallocpat[2] = 1;

    srv.wal.dir = ctdir();
    srv.wal.use = 1;
    srv.wal.filesize = size;
    srv.wal.syncrate = 0;
    srv.wal.wantsync = 1;

    port = SERVER();
    fd = mustdiallocal(port);

    for (k = 0; k < nput; k++) {
        mustsend(fd, "put 0 0 100 50\r\n");
        /* 50 bytes of 'x' followed by CRLF */
        mustsend(fd, "xxxxxxxxxx" "xxxxxxxxxx" "xxxxxxxxxx" "xxxxxxxxxx" "xxxxxxxxxx" "\r\n");
        ckresp(fd, putreply[k]);
    }

    for (k = 0; k < ndel; k++) {
        mustsend(fd, delcmd[k]);
        ckresp(fd, "DELETED\r\n");
    }
}
/*
 * cttestbinlogdiskfulldelete runs the server with falloc replaced by
 * wrapfalloc and the pattern {1, 1, 0}, so the first two wrapfalloc
 * calls are forwarded to rawfalloc and the third reports ENOSPC.
 *
 * Eight 50-byte puts must succeed and the ninth must be refused with
 * OUT_OF_MEMORY. The test then asserts that binlog.1 still exists and
 * that all eight jobs can be deleted.
 */
void
cttestbinlogdiskfulldelete()
{
    /* Expected reply to each put, in order. */
    static char *const putreply[] = {
        "INSERTED 1\r\n",
        "INSERTED 2\r\n",
        "INSERTED 3\r\n",
        "INSERTED 4\r\n",
        "INSERTED 5\r\n",
        "INSERTED 6\r\n",
        "INSERTED 7\r\n",
        "INSERTED 8\r\n",
        "OUT_OF_MEMORY\r\n",
    };
    static char *const delcmd[] = {
        "delete 1\r\n",
        "delete 2\r\n",
        "delete 3\r\n",
        "delete 4\r\n",
        "delete 5\r\n",
        "delete 6\r\n",
        "delete 7\r\n",
        "delete 8\r\n",
    };
    int nput = sizeof(putreply) / sizeof(putreply[0]);
    int ndel = sizeof(delcmd) / sizeof(delcmd[0]);
    int k;

    size = 1000;
    falloc = &wrapfalloc;
    fallocpat[0] = 1;
    fallocpat[1] = 1;

    srv.wal.dir = ctdir();
    srv.wal.use = 1;
    srv.wal.filesize = size;
    srv.wal.syncrate = 0;
    srv.wal.wantsync = 1;

    port = SERVER();
    fd = mustdiallocal(port);

    for (k = 0; k < nput; k++) {
        mustsend(fd, "put 0 0 100 50\r\n");
        /* 50 bytes of 'x' followed by CRLF */
        mustsend(fd, "xxxxxxxxxx" "xxxxxxxxxx" "xxxxxxxxxx" "xxxxxxxxxx" "xxxxxxxxxx" "\r\n");
        ckresp(fd, putreply[k]);
    }

    /* The first log file must survive the failed allocation. */
    assert(exist(fmtalloc("%s/binlog.1", ctdir())));

    for (k = 0; k < ndel; k++) {
        mustsend(fd, delcmd[k]);
        ckresp(fd, "DELETED\r\n");
    }
}
/*
 * cttestbinlogv5 checks that a binlog written by beanstalkd 1.4.6 can
 * be replayed by the server under test.
 *
 * If no "beanstalkd-1.4.6" executable is found by `which`, the test
 * prints a notice and exits with status 0. Otherwise it starts that
 * binary with -b <ct scratch dir> on a port derived from rand(), puts
 * two jobs into tube "test", and verifies their stats-job fields. It
 * then kills the old server, starts the server under test on the same
 * WAL directory, and verifies the very same fields again.
 *
 * Every field is checked with its own stats-job request, consumed as an
 * "OK " line followed by one line that must contain the expected text.
 */
void
cttestbinlogv5()
{
    /* Each row: request to send and text its reply body must contain. */
    static const struct {
        char *cmd;
        char *want;
    } checks[] = {
        {"stats-job 1\r\n", "\nid: 1\n"},
        {"stats-job 1\r\n", "\ntube: test\n"},
        {"stats-job 1\r\n", "\nstate: delayed\n"},
        {"stats-job 1\r\n", "\npri: 1\n"},
        {"stats-job 1\r\n", "\ndelay: 2\n"},
        {"stats-job 1\r\n", "\nttr: 3\n"},
        {"stats-job 1\r\n", "\nreserves: 0\n"},
        {"stats-job 1\r\n", "\ntimeouts: 0\n"},
        {"stats-job 1\r\n", "\nreleases: 0\n"},
        {"stats-job 1\r\n", "\nburies: 0\n"},
        {"stats-job 1\r\n", "\nkicks: 0\n"},
        {"stats-job 2\r\n", "\nid: 2\n"},
        {"stats-job 2\r\n", "\ntube: test\n"},
        {"stats-job 2\r\n", "\nstate: delayed\n"},
        {"stats-job 2\r\n", "\npri: 4\n"},
        {"stats-job 2\r\n", "\ndelay: 3\n"},
        {"stats-job 2\r\n", "\nttr: 2\n"},
        {"stats-job 2\r\n", "\nreserves: 0\n"},
        {"stats-job 2\r\n", "\ntimeouts: 0\n"},
        {"stats-job 2\r\n", "\nreleases: 0\n"},
        {"stats-job 2\r\n", "\nburies: 0\n"},
        {"stats-job 2\r\n", "\nkicks: 0\n"},
    };
    int nchecks = sizeof(checks) / sizeof(checks[0]);
    char portstr[10];
    int k;

    if (system("which beanstalkd-1.4.6") != 0) {
        puts("beanstalkd 1.4.6 not found, skipping");
        exit(0);
    }

    progname=__func__;
    /* Masking out bit 10 and adding 1024 keeps the port in 1024..65535. */
    port = (rand()&0xfbff) + 1024;
    sprintf(portstr, "%d", port);
    muststart("beanstalkd-1.4.6", "-b", ctdir(), "-p", portstr);
    fd = mustdiallocal(port);

    mustsend(fd, "use test\r\n");
    ckresp(fd, "USING test\r\n");
    mustsend(fd, "put 1 2 3 4\r\n");
    mustsend(fd, "test\r\n");
    ckresp(fd, "INSERTED 1\r\n");
    mustsend(fd, "put 4 3 2 1\r\n");
    mustsend(fd, "x\r\n");
    ckresp(fd, "INSERTED 2\r\n");

    /* First pass: what the old server itself reports. */
    for (k = 0; k < nchecks; k++) {
        mustsend(fd, checks[k].cmd);
        ckrespsub(fd, "OK ");
        ckrespsub(fd, checks[k].want);
    }

    kill(srvpid, 9);
    waitpid(srvpid, NULL, 0);

    srv.wal.dir = ctdir();
    srv.wal.use = 1;
    srv.wal.syncrate = 0;
    srv.wal.wantsync = 1;

    port = SERVER();
    fd = mustdiallocal(port);

    /* Second pass: the new server must report the same values. */
    for (k = 0; k < nchecks; k++) {
        mustsend(fd, checks[k].cmd);
        ckrespsub(fd, "OK ");
        ckrespsub(fd, checks[k].want);
    }
}
static void
benchputdeletesize(int n, int size)
{
port = SERVER();
fd = mustdiallocal(port);
char buf[50], put[50];
char body[size+1];
memset(body, 'a', size);
body[size] = 0;
ctsetbytes(size);
sprintf(put, "put 0 0 0 %d\r\n", size);
int i;
for (i = 0; i < n; i++) {
mustsend(fd, put);
mustsend(fd, body);
mustsend(fd, "\r\n");
ckrespsub(fd, "INSERTED ");
sprintf(buf, "delete %d\r\n", i+1);
mustsend(fd, buf);
ckresp(fd, "DELETED\r\n");
}
}
// Benchmark: n put/delete rounds with an 8-byte job body.
void
ctbenchputdelete8byte(int n)
{
benchputdeletesize(n, 8);
}
// Benchmark: n put/delete rounds with a 1024-byte job body.
void
ctbenchputdelete1k(int n)
{
benchputdeletesize(n, 1024);
}
// Benchmark: n put/delete rounds with an 8192-byte job body.
void
ctbenchputdelete8k(int n)
{
benchputdeletesize(n, 8192);
}
C
1
https://gitee.com/lomox/beanstalkd-win.git
git@gitee.com:lomox/beanstalkd-win.git
lomox
beanstalkd-win
beanstalkd-win
master

搜索帮助