aboutsummaryrefslogtreecommitdiff
path: root/jim-redis.c
diff options
context:
space:
mode:
authorSteve Bennett <steveb@workware.net.au>2023-02-06 15:55:56 +1000
committerSteve Bennett <steveb@workware.net.au>2023-02-13 10:44:10 +1000
commitaa18a0d938ca171fcf96616cb7ff011034eb5902 (patch)
tree77861adb8abf52a18b23420b9f813995b71ed850 /jim-redis.c
parenta5ea6b096e9e9f9913ad860a847e3757580dd9e4 (diff)
downloadjimtcl-aa18a0d938ca171fcf96616cb7ff011034eb5902.zip
jimtcl-aa18a0d938ca171fcf96616cb7ff011034eb5902.tar.gz
jimtcl-aa18a0d938ca171fcf96616cb7ff011034eb5902.tar.bz2
redis: Add -async support
Supports communication with redis as part of an event loop Signed-off-by: Steve Bennett <steveb@workware.net.au>
Diffstat (limited to 'jim-redis.c')
-rw-r--r--jim-redis.c44
1 files changed, 39 insertions, 5 deletions
diff --git a/jim-redis.c b/jim-redis.c
index 2534347..d7e5770 100644
--- a/jim-redis.c
+++ b/jim-redis.c
@@ -72,6 +72,21 @@ static Jim_Obj *jim_redis_get_result(Jim_Interp *interp, redisReply *reply, int
return obj;
}
+static int jim_redis_write_callback(Jim_Interp *interp, void *clientData, int mask)
+{
+ redisContext *c = clientData;
+
+ int done;
+ if (redisBufferWrite(c, &done) != REDIS_OK) {
+ return JIM_ERR;
+ }
+ if (done) {
+ /* Write has completed, so remove the callback */
+ Jim_DeleteFileHandler(interp, c->fd, mask);
+ }
+ return JIM_OK;
+}
+
/**
* $r readable ?script?
* - set or clear a readable script
@@ -111,7 +126,12 @@ static int jim_redis_subcmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
return Jim_DeleteCommand(interp, argv[0]);
}
if (Jim_CompareStringImmediate(interp, argv[1], "read")) {
- if (redisGetReply(c, (void **)&reply) != REDIS_OK) {
+ int rc;
+ if (!(c->flags & REDIS_BLOCK)) {
+ redisBufferRead(c);
+ }
+ rc = redisGetReply(c, (void **)&reply);
+ if (rc != REDIS_OK) {
reply = NULL;
}
}
@@ -126,6 +146,13 @@ static int jim_redis_subcmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
reply = redisCommandArgv(c, nargs, args, arglens);
Jim_Free(args);
Jim_Free(arglens);
+ if (!(c->flags & REDIS_BLOCK)) {
+ int done;
+ if (redisBufferWrite(c, &done) == REDIS_OK && !done) {
+ /* Couldn't write the entire command, so set up a writable callback to complete the job */
+ Jim_CreateFileHandler(interp, c->fd, JIM_EVENT_WRITABLE, jim_redis_write_callback, c, NULL);
+ }
+ }
}
/* sometimes commands return NULL */
if (reply) {
@@ -164,14 +191,18 @@ static int jim_redis_cmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
Jim_Obj *objv[2];
long fd;
int ret;
+ int async = 0;
- if (argc != 2) {
- Jim_WrongNumArgs(interp, 1, argv, "socket-stream");
+ if (argc > 2 && Jim_CompareStringImmediate(interp, argv[1], "-async")) {
+ async = 1;
+ }
+ if (argc - async != 2) {
+ Jim_WrongNumArgs(interp, 1, argv, "?-async? socket-stream");
return JIM_ERR;
}
/* Invoke getfd to get the file descriptor */
- objv[0] = argv[1];
+ objv[0] = argv[1 + async];
objv[1] = Jim_NewStringObj(interp, "getfd", -1);
ret = Jim_EvalObjVector(interp, 2, objv);
if (ret == JIM_OK) {
@@ -186,10 +217,13 @@ static int jim_redis_cmd(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
fd = dup(fd);
/* Can't fail */
c = redisConnectFd(fd);
+ if (async) {
+ c->flags &= ~REDIS_BLOCK;
+ }
/* Enable TCP_KEEPALIVE - this is the default for later redis versions */
redisEnableKeepAlive(c);
/* Now delete the original stream */
- Jim_DeleteCommand(interp, argv[1]);
+ Jim_DeleteCommand(interp, argv[1 + async]);
snprintf(buf, sizeof(buf), "redis.handle%ld", Jim_GetId(interp));
Jim_CreateCommand(interp, buf, jim_redis_subcmd, c, jim_redis_del_proc);