diff options
-rw-r--r-- | README.redis | 30 | ||||
-rw-r--r-- | examples/redis-async.tcl | 62 | ||||
-rw-r--r-- | jim-redis.c | 44 |
3 files changed, 131 insertions, 5 deletions
diff --git a/README.redis b/README.redis index 59fc860..1b66eda 100644 --- a/README.redis +++ b/README.redis @@ -108,6 +108,8 @@ If no message is received, the read command will wait forever. The message is returned as: message <channel> <text> +The 'read' subcommand is also used in non-blocking mode. See the section below for more details. + The readable subcommand ~~~~~~~~~~~~~~~~~~~~~~~ @@ -133,3 +135,31 @@ The 'close' command is supported to close the connection. This command is equivalent to deleting the command with: rename $r "" + +Async/Non-blocking support +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +It is possible to connect to redis in non-blocking mode by using the '-async' flag. e.g. + + set r [redis -async [socket stream localhost:6379]] + +Now commands will return immediately with an empty result and 'read' in a 'readable' should +be used to retrieve the result. As a simple example: + + $r readable { + set result [$r read] + if {$result ne ""} { + puts $result + incr next + } + } + $r SET x 5 + vwait next + $r INCR x + vwait next + +Note that if a large result is returned, 'read' may return an empty string, in +which case further calls to 'readable' are required to return the result. + +In general the underlying socket should be put into non-blocking mode ($sock ndelay 1) +and a while loop should be used to read reponses until and empty result is returned. diff --git a/examples/redis-async.tcl b/examples/redis-async.tcl new file mode 100644 index 0000000..b9ec27a --- /dev/null +++ b/examples/redis-async.tcl @@ -0,0 +1,62 @@ +#!/usr/bin/env jimsh + +# Testing redis client access in non-blocking mode + +# Requires the redis extension +package require redis + +# A redis server should be running either on localhost 6379 +# or on the given address (e.g. host:port) +try { + lassign $argv addr + if {$addr eq ""} { + set addr localhost:6379 + } + set s [socket stream $addr] + # socket must be in non-blocking mode + $s ndelay 1 + set r [redis -async $s] +} on error msg { + puts [errorInfo $msg] + exit 1 +} + +# List of outstanding redis commands +set cmds {} + +$r readable { + while {1} { + set result [$r -type read] + if {$result eq ""} { + break + } + set cmds [lassign $cmds cmd] + # Show command and response + puts "$cmd => $result" + } +} + +# queue a command and remember it +proc redis_command {r args} { + global cmds + lappend cmds $args + $r {*}$args +} + +redis_command $r SET zz 0 + +proc periodic {r} { + global counter done + + if {[incr counter] > 10} { + incr done + } else { + redis_command $r INCR zz + after 100 periodic $r + } +} + +set counter 0 +periodic $r + +vwait done diff --git a/jim-redis.c b/jim-redis.c index 2534347..d7e5770 100644 --- a/jim-redis.c +++ b/jim-redis.c @@ -72,6 +72,21 @@ static Jim_Obj *jim_redis_get_result(Jim_Interp *interp, redisReply *reply, int return obj; } +static int jim_redis_write_callback(Jim_Interp *interp, void *clientData, int mask) +{ + redisContext *c = clientData; + + int done; + if (redisBufferWrite(c, &done) != REDIS_OK) { + return JIM_ERR; + } + if (done) { + /* Write has completed, so remove the callback */ + Jim_DeleteFileHandler(interp, c->fd, mask); + } + return JIM_OK; +} + /** * $r readable ?script? * - set or clear a readable script @@ -111,7 +126,12 @@ static int jim_redis_subcmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv) return Jim_DeleteCommand(interp, argv[0]); } if (Jim_CompareStringImmediate(interp, argv[1], "read")) { - if (redisGetReply(c, (void **)&reply) != REDIS_OK) { + int rc; + if (!(c->flags & REDIS_BLOCK)) { + redisBufferRead(c); + } + rc = redisGetReply(c, (void **)&reply); + if (rc != REDIS_OK) { reply = NULL; } } @@ -126,6 +146,13 @@ static int jim_redis_subcmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv) reply = redisCommandArgv(c, nargs, args, arglens); Jim_Free(args); Jim_Free(arglens); + if (!(c->flags & REDIS_BLOCK)) { + int done; + if (redisBufferWrite(c, &done) == REDIS_OK && !done) { + /* Couldn't write the entire command, so set up a writable callback to complete the job */ + Jim_CreateFileHandler(interp, c->fd, JIM_EVENT_WRITABLE, jim_redis_write_callback, c, NULL); + } + } } /* sometimes commands return NULL */ if (reply) { @@ -164,14 +191,18 @@ static int jim_redis_cmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv) Jim_Obj *objv[2]; long fd; int ret; + int async = 0; - if (argc != 2) { - Jim_WrongNumArgs(interp, 1, argv, "socket-stream"); + if (argc > 2 && Jim_CompareStringImmediate(interp, argv[1], "-async")) { + async = 1; + } + if (argc - async != 2) { + Jim_WrongNumArgs(interp, 1, argv, "?-async? socket-stream"); return JIM_ERR; } /* Invoke getfd to get the file descriptor */ - objv[0] = argv[1]; + objv[0] = argv[1 + async]; objv[1] = Jim_NewStringObj(interp, "getfd", -1); ret = Jim_EvalObjVector(interp, 2, objv); if (ret == JIM_OK) { @@ -186,10 +217,13 @@ static int jim_redis_cmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv) fd = dup(fd); /* Can't fail */ c = redisConnectFd(fd); + if (async) { + c->flags &= ~REDIS_BLOCK; + } /* Enable TCP_KEEPALIVE - this is the default for later redis versions */ redisEnableKeepAlive(c); /* Now delete the original stream */ - Jim_DeleteCommand(interp, argv[1]); + Jim_DeleteCommand(interp, argv[1 + async]); snprintf(buf, sizeof(buf), "redis.handle%ld", Jim_GetId(interp)); Jim_CreateCommand(interp, buf, jim_redis_subcmd, c, jim_redis_del_proc); |