aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteve Bennett <steveb@workware.net.au>2023-02-06 15:55:56 +1000
committerSteve Bennett <steveb@workware.net.au>2023-02-13 10:44:10 +1000
commitaa18a0d938ca171fcf96616cb7ff011034eb5902 (patch)
tree77861adb8abf52a18b23420b9f813995b71ed850
parenta5ea6b096e9e9f9913ad860a847e3757580dd9e4 (diff)
downloadjimtcl-aa18a0d938ca171fcf96616cb7ff011034eb5902.zip
jimtcl-aa18a0d938ca171fcf96616cb7ff011034eb5902.tar.gz
jimtcl-aa18a0d938ca171fcf96616cb7ff011034eb5902.tar.bz2
redis: Add -async support
Supports communication with redis as part of an event loop Signed-off-by: Steve Bennett <steveb@workware.net.au>
-rw-r--r--README.redis30
-rw-r--r--examples/redis-async.tcl62
-rw-r--r--jim-redis.c44
3 files changed, 131 insertions, 5 deletions
diff --git a/README.redis b/README.redis
index 59fc860..1b66eda 100644
--- a/README.redis
+++ b/README.redis
@@ -108,6 +108,8 @@ If no message is received, the read command will wait forever.
The message is returned as: message <channel> <text>
+The 'read' subcommand is also used in non-blocking mode. See the section below for more details.
+
The readable subcommand
~~~~~~~~~~~~~~~~~~~~~~~
@@ -133,3 +135,31 @@ The 'close' command is supported to close the connection.
This command is equivalent to deleting the command with:
rename $r ""
+
+Async/Non-blocking support
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+It is possible to connect to redis in non-blocking mode by using the '-async' flag. e.g.
+
+ set r [redis -async [socket stream localhost:6379]]
+
+Now commands will return immediately with an empty result and 'read' in a 'readable' should
+be used to retrieve the result. As a simple example:
+
+ $r readable {
+ set result [$r read]
+ if {$result ne ""} {
+ puts $result
+ incr next
+ }
+ }
+ $r SET x 5
+ vwait next
+ $r INCR x
+ vwait next
+
+Note that if a large result is returned, 'read' may return an empty string, in
+which case further calls to 'readable' are required to return the result.
+
+In general the underlying socket should be put into non-blocking mode ($sock ndelay 1)
+and a while loop should be used to read reponses until and empty result is returned.
diff --git a/examples/redis-async.tcl b/examples/redis-async.tcl
new file mode 100644
index 0000000..b9ec27a
--- /dev/null
+++ b/examples/redis-async.tcl
@@ -0,0 +1,62 @@
+#!/usr/bin/env jimsh
+
+# Testing redis client access in non-blocking mode
+
+# Requires the redis extension
+package require redis
+
+# A redis server should be running either on localhost 6379
+# or on the given address (e.g. host:port)
+try {
+ lassign $argv addr
+ if {$addr eq ""} {
+ set addr localhost:6379
+ }
+ set s [socket stream $addr]
+ # socket must be in non-blocking mode
+ $s ndelay 1
+ set r [redis -async $s]
+} on error msg {
+ puts [errorInfo $msg]
+ exit 1
+}
+
+# List of outstanding redis commands
+set cmds {}
+
+$r readable {
+ while {1} {
+ set result [$r -type read]
+ if {$result eq ""} {
+ break
+ }
+ set cmds [lassign $cmds cmd]
+ # Show command and response
+ puts "$cmd => $result"
+ }
+}
+
+# queue a command and remember it
+proc redis_command {r args} {
+ global cmds
+ lappend cmds $args
+ $r {*}$args
+}
+
+redis_command $r SET zz 0
+
+proc periodic {r} {
+ global counter done
+
+ if {[incr counter] > 10} {
+ incr done
+ } else {
+ redis_command $r INCR zz
+ after 100 periodic $r
+ }
+}
+
+set counter 0
+periodic $r
+
+vwait done
diff --git a/jim-redis.c b/jim-redis.c
index 2534347..d7e5770 100644
--- a/jim-redis.c
+++ b/jim-redis.c
@@ -72,6 +72,21 @@ static Jim_Obj *jim_redis_get_result(Jim_Interp *interp, redisReply *reply, int
return obj;
}
+static int jim_redis_write_callback(Jim_Interp *interp, void *clientData, int mask)
+{
+ redisContext *c = clientData;
+
+ int done;
+ if (redisBufferWrite(c, &done) != REDIS_OK) {
+ return JIM_ERR;
+ }
+ if (done) {
+ /* Write has completed, so remove the callback */
+ Jim_DeleteFileHandler(interp, c->fd, mask);
+ }
+ return JIM_OK;
+}
+
/**
* $r readable ?script?
* - set or clear a readable script
@@ -111,7 +126,12 @@ static int jim_redis_subcmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
return Jim_DeleteCommand(interp, argv[0]);
}
if (Jim_CompareStringImmediate(interp, argv[1], "read")) {
- if (redisGetReply(c, (void **)&reply) != REDIS_OK) {
+ int rc;
+ if (!(c->flags & REDIS_BLOCK)) {
+ redisBufferRead(c);
+ }
+ rc = redisGetReply(c, (void **)&reply);
+ if (rc != REDIS_OK) {
reply = NULL;
}
}
@@ -126,6 +146,13 @@ static int jim_redis_subcmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
reply = redisCommandArgv(c, nargs, args, arglens);
Jim_Free(args);
Jim_Free(arglens);
+ if (!(c->flags & REDIS_BLOCK)) {
+ int done;
+ if (redisBufferWrite(c, &done) == REDIS_OK && !done) {
+ /* Couldn't write the entire command, so set up a writable callback to complete the job */
+ Jim_CreateFileHandler(interp, c->fd, JIM_EVENT_WRITABLE, jim_redis_write_callback, c, NULL);
+ }
+ }
}
/* sometimes commands return NULL */
if (reply) {
@@ -164,14 +191,18 @@ static int jim_redis_cmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
Jim_Obj *objv[2];
long fd;
int ret;
+ int async = 0;
- if (argc != 2) {
- Jim_WrongNumArgs(interp, 1, argv, "socket-stream");
+ if (argc > 2 && Jim_CompareStringImmediate(interp, argv[1], "-async")) {
+ async = 1;
+ }
+ if (argc - async != 2) {
+ Jim_WrongNumArgs(interp, 1, argv, "?-async? socket-stream");
return JIM_ERR;
}
/* Invoke getfd to get the file descriptor */
- objv[0] = argv[1];
+ objv[0] = argv[1 + async];
objv[1] = Jim_NewStringObj(interp, "getfd", -1);
ret = Jim_EvalObjVector(interp, 2, objv);
if (ret == JIM_OK) {
@@ -186,10 +217,13 @@ static int jim_redis_cmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
fd = dup(fd);
/* Can't fail */
c = redisConnectFd(fd);
+ if (async) {
+ c->flags &= ~REDIS_BLOCK;
+ }
/* Enable TCP_KEEPALIVE - this is the default for later redis versions */
redisEnableKeepAlive(c);
/* Now delete the original stream */
- Jim_DeleteCommand(interp, argv[1]);
+ Jim_DeleteCommand(interp, argv[1 + async]);
snprintf(buf, sizeof(buf), "redis.handle%ld", Jim_GetId(interp));
Jim_CreateCommand(interp, buf, jim_redis_subcmd, c, jim_redis_del_proc);