aboutsummaryrefslogtreecommitdiff
path: root/jim-aio.c
diff options
context:
space:
mode:
Diffstat (limited to 'jim-aio.c')
-rw-r--r--jim-aio.c250
1 files changed, 177 insertions, 73 deletions
diff --git a/jim-aio.c b/jim-aio.c
index 4c59e1d..221d7f5 100644
--- a/jim-aio.c
+++ b/jim-aio.c
@@ -85,8 +85,8 @@
#include "jimiocompat.h"
#define AIO_CMD_LEN 32 /* e.g. aio.handleXXXXXX */
-#define AIO_BUF_LEN 256 /* read size for gets, read */
-#define AIO_WBUF_FULL_SIZE (64 * 1024) /* This could be configurable */
+#define AIO_DEFAULT_RBUF_LEN 256 /* read size for gets, read */
+#define AIO_DEFAULT_WBUF_LIMIT (64 * 1024) /* max size of writebuf before flushing */
#define AIO_KEEPOPEN 1 /* don't set O_CLOEXEC, don't close on command delete */
#define AIO_NODELETE 2 /* don't delete AF_UNIX path on close */
@@ -94,6 +94,8 @@
#define AIO_WBUF_NONE 8 /* default to buffering=none */
#define AIO_NONBLOCK 16 /* socket is non-blocking */
+#define AIO_ONEREAD 32 /* passed to aio_read_len() to return after a single read */
+
enum wbuftype {
WBUF_OPT_NONE, /* write immediately */
WBUF_OPT_LINE, /* write if NL is seen */
@@ -114,10 +116,6 @@ enum wbuftype {
#define UNIX_SOCKETS 0
#endif
-#ifndef MAXPATHLEN
-#define MAXPATHLEN JIM_PATH_LEN
-#endif
-
#if defined(HAVE_SOCKETS) && !defined(JIM_BOOTSTRAP)
/* Avoid type punned pointers */
union sockaddr_any {
@@ -192,11 +190,22 @@ typedef struct AioFile
const JimAioFopsType *fops;
Jim_Obj *readbuf; /* Contains any buffered read data. NULL if empty. refcount=0 */
Jim_Obj *writebuf; /* Contains any buffered write data. refcount=1 */
+ char *rbuf; /* Temporary read buffer (NULL if not yet allocated) */
+ size_t rbuf_len; /* Length of rbuf */
+ size_t wbuf_limit; /* Max size of writebuf before flushing */
} AioFile;
+static void aio_consume(Jim_Obj *objPtr, int n);
+
static int stdio_writer(struct AioFile *af, const char *buf, int len)
{
- return write(af->fd, buf, len);
+ int ret = write(af->fd, buf, len);
+ if (ret < 0 && errno == EPIPE) {
+ /* Also discard the write buffer since otherwise when
+ * we try to flush on shutdown we may get SIGPIPE */
+ aio_consume(af->writebuf, Jim_Length(af->writebuf));
+ }
+ return ret;
}
static int stdio_reader(struct AioFile *af, char *buf, int len, int nb)
@@ -710,7 +719,26 @@ static void aio_consume(Jim_Obj *objPtr, int n)
}
/* forward declaration */
-static int aio_autoflush(Jim_Interp *interp, void *clientData, int mask);
+static int aio_flush(Jim_Interp *interp, AioFile *af);
+
+#ifdef jim_ext_eventloop
+/**
+ * Called when the channel is writable.
+ * Write what we can and return -1 when the write buffer is empty to remove the handler.
+ */
+static int aio_autoflush(Jim_Interp *interp, void *clientData, int mask)
+{
+ AioFile *af = clientData;
+
+ aio_flush(interp, af);
+ if (Jim_Length(af->writebuf) == 0) {
+ /* Done, so remove the handler */
+ return -1;
+ }
+ return 0;
+}
+#endif
+
/**
* Flushes af->writebuf to the channel and removes that data
@@ -759,30 +787,18 @@ static int aio_flush(Jim_Interp *interp, AioFile *af)
}
/**
- * Called when the channel is writable.
- * Write what we can and return -1 when the write buffer is empty to remove the handler.
- */
-static int aio_autoflush(Jim_Interp *interp, void *clientData, int mask)
-{
- AioFile *af = clientData;
-
- aio_flush(interp, af);
- if (Jim_Length(af->writebuf) == 0) {
- /* Done, so remove the handler */
- return -1;
- }
- return 0;
-}
-
-/**
* Read until 'len' bytes are available in readbuf.
*
+ * If flags contains AIO_NONBLOCK, indicates a nonblocking read.
+ * If flags contains AIO_ONEREAD, return after a single read.
+ * (In this case JIM_ERR is also returned on timeout)
+ *
* If nonblocking or timeout, may return early.
* 'len' may be -1 to read until eof (or until no more data if nonblocking)
*
* Returns JIM_OK if data was read or JIM_ERR on error.
*/
-static int aio_read_len(Jim_Interp *interp, AioFile *af, int nb, char *buf, size_t buflen, int neededLen)
+static int aio_read_len(Jim_Interp *interp, AioFile *af, unsigned flags, int neededLen)
{
if (!af->readbuf) {
af->readbuf = Jim_NewStringObj(interp, NULL, 0);
@@ -800,20 +816,29 @@ static int aio_read_len(Jim_Interp *interp, AioFile *af, int nb, char *buf, size
int readlen;
if (neededLen == -1) {
- readlen = AIO_BUF_LEN;
+ readlen = af->rbuf_len;
}
else {
- readlen = (neededLen > AIO_BUF_LEN ? AIO_BUF_LEN : neededLen);
+ readlen = (neededLen > af->rbuf_len ? af->rbuf_len : neededLen);
}
- retval = af->fops->reader(af, buf, readlen, nb);
+ /* Allocate buffer if not already allocated */
+ if (!af->rbuf) {
+ af->rbuf = Jim_Alloc(af->rbuf_len);
+ }
+ retval = af->fops->reader(af, af->rbuf, readlen, flags & AIO_NONBLOCK);
if (retval > 0) {
- Jim_AppendString(interp, af->readbuf, buf, retval);
+ if (retval) {
+ Jim_AppendString(interp, af->readbuf, af->rbuf, retval);
+ }
if (neededLen != -1) {
neededLen -= retval;
}
+ if (flags & AIO_ONEREAD) {
+ return JIM_OK;
+ }
continue;
}
- if (JimCheckStreamError(interp, af)) {
+ if ((flags & AIO_ONEREAD) || JimCheckStreamError(interp, af)) {
return JIM_ERR;
}
break;
@@ -892,6 +917,7 @@ static void JimAioDelProc(Jim_Interp *interp, void *privData)
Jim_FreeNewObj(interp, af->readbuf);
}
+ Jim_Free(af->rbuf);
Jim_Free(af);
}
@@ -905,7 +931,6 @@ static int aio_cmd_read(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
int option;
int nb;
Jim_Obj *objPtr;
- char buf[AIO_BUF_LEN];
if (argc) {
if (*Jim_String(argv[0]) == '-') {
@@ -939,7 +964,7 @@ static int aio_cmd_read(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
/* reads are nonblocking if a timeout is given */
nb = aio_start_nonblocking(af);
- if (aio_read_len(interp, af, nb, buf, sizeof(buf), neededLen) != JIM_OK) {
+ if (aio_read_len(interp, af, nb ? AIO_NONBLOCK : 0, neededLen) != JIM_OK) {
aio_set_nonblocking(af, nb);
return JIM_ERR;
}
@@ -997,11 +1022,6 @@ static int aio_cmd_copy(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
AioFile *af = Jim_CmdPrivData(interp);
jim_wide count = 0;
jim_wide maxlen = JIM_WIDE_MAX;
- /* Small, static buffer for small files */
- char buf[AIO_BUF_LEN];
- /* Will be allocated if the file is large */
- char *bufp = buf;
- int buflen = sizeof(buf);
int ok = 1;
Jim_Obj *objv[4];
@@ -1031,10 +1051,10 @@ static int aio_cmd_copy(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
while (count < maxlen) {
jim_wide len = maxlen - count;
- if (len > buflen) {
- len = buflen;
+ if (len > af->rbuf_len) {
+ len = af->rbuf_len;
}
- if (aio_read_len(interp, af, 0, bufp, buflen, len) != JIM_OK) {
+ if (aio_read_len(interp, af, 0, len) != JIM_OK) {
ok = 0;
break;
}
@@ -1047,17 +1067,13 @@ static int aio_cmd_copy(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
if (aio_eof(af)) {
break;
}
- if (count >= 16384 && bufp == buf) {
+ if (count >= 16384 && af->rbuf_len < 65536) {
/* Heuristic check - for large copy speed-up */
- buflen = 65536;
- bufp = Jim_Alloc(buflen);
+ af->rbuf_len = 65536;
+ af->rbuf = Jim_Realloc(af->rbuf, af->rbuf_len);
}
}
- if (bufp != buf) {
- Jim_Free(bufp);
- }
-
Jim_DecrRefCount(interp, objv[1]);
Jim_DecrRefCount(interp, objv[2]);
@@ -1073,10 +1089,10 @@ static int aio_cmd_copy(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
static int aio_cmd_gets(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
{
AioFile *af = Jim_CmdPrivData(interp);
- char buf[AIO_BUF_LEN];
Jim_Obj *objPtr = NULL;
int len;
int nb;
+ unsigned flags = AIO_ONEREAD;
char *nl = NULL;
int offset = 0;
@@ -1084,33 +1100,33 @@ static int aio_cmd_gets(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
/* reads are non-blocking if a timeout has been given */
nb = aio_start_nonblocking(af);
-
- if (!af->readbuf) {
- af->readbuf = Jim_NewStringObj(interp, NULL, 0);
+ if (nb) {
+ flags |= AIO_NONBLOCK;
}
while (!aio_eof(af)) {
- const char *pt = Jim_GetString(af->readbuf, &len);
- nl = memchr(pt + offset, '\n', len - offset);
- if (nl) {
- /* got a line */
- objPtr = Jim_NewStringObj(interp, pt, nl - pt);
- /* And consume it plus the newline */
- aio_consume(af->readbuf, nl - pt + 1);
- break;
+ if (af->readbuf) {
+ const char *pt = Jim_GetString(af->readbuf, &len);
+ nl = memchr(pt + offset, '\n', len - offset);
+ if (nl) {
+ /* got a line */
+ objPtr = Jim_NewStringObj(interp, pt, nl - pt);
+ /* And consume it plus the newline */
+ aio_consume(af->readbuf, nl - pt + 1);
+ break;
+ }
+ offset = len;
}
- offset = len;
- len = af->fops->reader(af, buf, AIO_BUF_LEN, nb);
- if (len <= 0) {
+ /* Not got a line yet, so read more */
+ if (aio_read_len(interp, af, flags, -1) != JIM_OK) {
break;
}
- Jim_AppendString(interp, af->readbuf, buf, len);
}
aio_set_nonblocking(af, nb);
- if (!nl && aio_eof(af)) {
+ if (!nl && aio_eof(af) && af->readbuf) {
/* Just take what we have as the line */
objPtr = af->readbuf;
af->readbuf = NULL;
@@ -1162,6 +1178,15 @@ static int aio_cmd_puts(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
/* Keep it simple and always go via the writebuf instead of trying to optimise
* the case that we can write immediately
*/
+#ifdef JIM_MAINTAINER
+ if (Jim_IsShared(af->writebuf)) {
+ /* This should generally never happen since this object isn't accessible,
+ * but it is possible with 'debug objects' */
+ Jim_DecrRefCount(interp, af->writebuf);
+ af->writebuf = Jim_DuplicateObj(interp, af->writebuf);
+ Jim_IncrRefCount(af->writebuf);
+ }
+#endif
Jim_AppendObj(interp, af->writebuf, strObj);
if (nl) {
Jim_AppendString(interp, af->writebuf, "\n", 1);
@@ -1183,7 +1208,7 @@ static int aio_cmd_puts(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
break;
case WBUF_OPT_FULL:
- if (wlen >= AIO_WBUF_FULL_SIZE) {
+ if (wlen >= af->wbuf_limit) {
wnow = 1;
}
break;
@@ -1597,6 +1622,7 @@ static int aio_cmd_sync(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
static int aio_cmd_buffering(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
{
AioFile *af = Jim_CmdPrivData(interp);
+ Jim_Obj *resultObj;
static const char * const options[] = {
"none",
@@ -1605,17 +1631,79 @@ static int aio_cmd_buffering(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
NULL
};
- if (Jim_GetEnum(interp, argv[0], options, &af->wbuft, NULL, JIM_ERRMSG) != JIM_OK) {
- return JIM_ERR;
+ if (argc) {
+ if (Jim_GetEnum(interp, argv[0], options, &af->wbuft, NULL, JIM_ERRMSG) != JIM_OK) {
+ return JIM_ERR;
+ }
+
+ if (af->wbuft == WBUF_OPT_FULL && argc == 2) {
+ long l;
+ if (Jim_GetLong(interp, argv[1], &l) != JIM_OK || l <= 0) {
+ return JIM_ERR;
+ }
+ af->wbuf_limit = l;
+ }
+
+ if (af->wbuft == WBUF_OPT_NONE) {
+ if (aio_flush(interp, af) != JIM_OK) {
+ return JIM_ERR;
+ }
+ }
+ /* don't bother flushing when switching from full to line */
}
- if (af->wbuft == WBUF_OPT_NONE) {
- return aio_flush(interp, af);
+ resultObj = Jim_NewListObj(interp, NULL, 0);
+ Jim_ListAppendElement(interp, resultObj, Jim_NewStringObj(interp, options[af->wbuft], -1));
+ if (af->wbuft == WBUF_OPT_FULL) {
+ Jim_ListAppendElement(interp, resultObj, Jim_NewIntObj(interp, af->wbuf_limit));
+ }
+ Jim_SetResult(interp, resultObj);
+
+ return JIM_OK;
+}
+
+static int aio_cmd_translation(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
+{
+ enum {OPT_BINARY, OPT_TEXT};
+ static const char * const options[] = {
+ "binary",
+ "text",
+ NULL
+ };
+ int opt;
+
+ if (Jim_GetEnum(interp, argv[0], options, &opt, NULL, JIM_ERRMSG) != JIM_OK) {
+ return JIM_ERR;
+ }
+#if defined(Jim_SetMode)
+ else {
+ AioFile *af = Jim_CmdPrivData(interp);
+ Jim_SetMode(af->fd, opt == OPT_BINARY ? O_BINARY : O_TEXT);
+ }
+#endif
+ return JIM_OK;
+}
+
+static int aio_cmd_readsize(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
+{
+ AioFile *af = Jim_CmdPrivData(interp);
+
+ if (argc) {
+ long l;
+ if (Jim_GetLong(interp, argv[0], &l) != JIM_OK || l <= 0) {
+ return JIM_ERR;
+ }
+ af->rbuf_len = l;
+ if (af->rbuf) {
+ af->rbuf = Jim_Realloc(af->rbuf, af->rbuf_len);
+ }
}
- /* don't bother flushing when switching from full to line */
+ Jim_SetResultInt(interp, af->rbuf_len);
+
return JIM_OK;
}
+#ifdef jim_ext_eventloop
static int aio_cmd_timeout(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
{
#ifdef HAVE_SELECT
@@ -1633,7 +1721,6 @@ static int aio_cmd_timeout(Jim_Interp *interp, int argc, Jim_Obj *const *argv)
#endif
}
-#ifdef jim_ext_eventloop
static int aio_eventinfo(Jim_Interp *interp, AioFile * af, unsigned mask,
int argc, Jim_Obj * const *argv)
{
@@ -2107,11 +2194,25 @@ static const jim_subcmd_type aio_command_table[] = {
},
#endif
{ "buffering",
- "none|line|full",
+ "?none|line|full? ?size?",
aio_cmd_buffering,
+ 0,
+ 2,
+ /* Description: Sets or returns write buffering */
+ },
+ { "translation",
+ "binary|text",
+ aio_cmd_translation,
1,
1,
- /* Description: Sets buffering */
+ /* Description: Sets output translation mode */
+ },
+ { "readsize",
+ "?size?",
+ aio_cmd_readsize,
+ 0,
+ 1,
+ /* Description: Sets or returns read size */
},
#if defined(jim_ext_file) && defined(Jim_FileStat)
{ "stat",
@@ -2448,6 +2549,9 @@ static AioFile *JimMakeChannel(Jim_Interp *interp, int fd, Jim_Obj *filename,
/* Create an empty write buf */
af->writebuf = Jim_NewStringObj(interp, NULL, 0);
Jim_IncrRefCount(af->writebuf);
+ af->wbuf_limit = AIO_DEFAULT_WBUF_LIMIT;
+ af->rbuf_len = AIO_DEFAULT_RBUF_LEN;
+ /* Don't allocate rbuf or readbuf until we need it */
Jim_CreateCommand(interp, buf, JimAioSubCmdProc, af, JimAioDelProc);