diff options
Diffstat (limited to 'jim-aio.c')
-rw-r--r-- | jim-aio.c | 250 |
1 files changed, 177 insertions, 73 deletions
@@ -85,8 +85,8 @@ #include "jimiocompat.h" #define AIO_CMD_LEN 32 /* e.g. aio.handleXXXXXX */ -#define AIO_BUF_LEN 256 /* read size for gets, read */ -#define AIO_WBUF_FULL_SIZE (64 * 1024) /* This could be configurable */ +#define AIO_DEFAULT_RBUF_LEN 256 /* read size for gets, read */ +#define AIO_DEFAULT_WBUF_LIMIT (64 * 1024) /* max size of writebuf before flushing */ #define AIO_KEEPOPEN 1 /* don't set O_CLOEXEC, don't close on command delete */ #define AIO_NODELETE 2 /* don't delete AF_UNIX path on close */ @@ -94,6 +94,8 @@ #define AIO_WBUF_NONE 8 /* default to buffering=none */ #define AIO_NONBLOCK 16 /* socket is non-blocking */ +#define AIO_ONEREAD 32 /* passed to aio_read_len() to return after a single read */ + enum wbuftype { WBUF_OPT_NONE, /* write immediately */ WBUF_OPT_LINE, /* write if NL is seen */ @@ -114,10 +116,6 @@ enum wbuftype { #define UNIX_SOCKETS 0 #endif -#ifndef MAXPATHLEN -#define MAXPATHLEN JIM_PATH_LEN -#endif - #if defined(HAVE_SOCKETS) && !defined(JIM_BOOTSTRAP) /* Avoid type punned pointers */ union sockaddr_any { @@ -192,11 +190,22 @@ typedef struct AioFile const JimAioFopsType *fops; Jim_Obj *readbuf; /* Contains any buffered read data. NULL if empty. refcount=0 */ Jim_Obj *writebuf; /* Contains any buffered write data. refcount=1 */ + char *rbuf; /* Temporary read buffer (NULL if not yet allocated) */ + size_t rbuf_len; /* Length of rbuf */ + size_t wbuf_limit; /* Max size of writebuf before flushing */ } AioFile; +static void aio_consume(Jim_Obj *objPtr, int n); + static int stdio_writer(struct AioFile *af, const char *buf, int len) { - return write(af->fd, buf, len); + int ret = write(af->fd, buf, len); + if (ret < 0 && errno == EPIPE) { + /* Also discard the write buffer since otherwise when + * we try to flush on shutdown we may get SIGPIPE */ + aio_consume(af->writebuf, Jim_Length(af->writebuf)); + } + return ret; } static int stdio_reader(struct AioFile *af, char *buf, int len, int nb) @@ -710,7 +719,26 @@ static void aio_consume(Jim_Obj *objPtr, int n) } /* forward declaration */ -static int aio_autoflush(Jim_Interp *interp, void *clientData, int mask); +static int aio_flush(Jim_Interp *interp, AioFile *af); + +#ifdef jim_ext_eventloop +/** + * Called when the channel is writable. + * Write what we can and return -1 when the write buffer is empty to remove the handler. + */ +static int aio_autoflush(Jim_Interp *interp, void *clientData, int mask) +{ + AioFile *af = clientData; + + aio_flush(interp, af); + if (Jim_Length(af->writebuf) == 0) { + /* Done, so remove the handler */ + return -1; + } + return 0; +} +#endif + /** * Flushes af->writebuf to the channel and removes that data @@ -759,30 +787,18 @@ static int aio_flush(Jim_Interp *interp, AioFile *af) } /** - * Called when the channel is writable. - * Write what we can and return -1 when the write buffer is empty to remove the handler. - */ -static int aio_autoflush(Jim_Interp *interp, void *clientData, int mask) -{ - AioFile *af = clientData; - - aio_flush(interp, af); - if (Jim_Length(af->writebuf) == 0) { - /* Done, so remove the handler */ - return -1; - } - return 0; -} - -/** * Read until 'len' bytes are available in readbuf. * + * If flags contains AIO_NONBLOCK, indicates a nonblocking read. + * If flags contains AIO_ONEREAD, return after a single read. + * (In this case JIM_ERR is also returned on timeout) + * * If nonblocking or timeout, may return early. * 'len' may be -1 to read until eof (or until no more data if nonblocking) * * Returns JIM_OK if data was read or JIM_ERR on error. */ -static int aio_read_len(Jim_Interp *interp, AioFile *af, int nb, char *buf, size_t buflen, int neededLen) +static int aio_read_len(Jim_Interp *interp, AioFile *af, unsigned flags, int neededLen) { if (!af->readbuf) { af->readbuf = Jim_NewStringObj(interp, NULL, 0); @@ -800,20 +816,29 @@ static int aio_read_len(Jim_Interp *interp, AioFile *af, int nb, char *buf, size int readlen; if (neededLen == -1) { - readlen = AIO_BUF_LEN; + readlen = af->rbuf_len; } else { - readlen = (neededLen > AIO_BUF_LEN ? AIO_BUF_LEN : neededLen); + readlen = (neededLen > af->rbuf_len ? af->rbuf_len : neededLen); } - retval = af->fops->reader(af, buf, readlen, nb); + /* Allocate buffer if not already allocated */ + if (!af->rbuf) { + af->rbuf = Jim_Alloc(af->rbuf_len); + } + retval = af->fops->reader(af, af->rbuf, readlen, flags & AIO_NONBLOCK); if (retval > 0) { - Jim_AppendString(interp, af->readbuf, buf, retval); + if (retval) { + Jim_AppendString(interp, af->readbuf, af->rbuf, retval); + } if (neededLen != -1) { neededLen -= retval; } + if (flags & AIO_ONEREAD) { + return JIM_OK; + } continue; } - if (JimCheckStreamError(interp, af)) { + if ((flags & AIO_ONEREAD) || JimCheckStreamError(interp, af)) { return JIM_ERR; } break; @@ -892,6 +917,7 @@ static void JimAioDelProc(Jim_Interp *interp, void *privData) Jim_FreeNewObj(interp, af->readbuf); } + Jim_Free(af->rbuf); Jim_Free(af); } @@ -905,7 +931,6 @@ static int aio_cmd_read(Jim_Interp *interp, int argc, Jim_Obj *const *argv) int option; int nb; Jim_Obj *objPtr; - char buf[AIO_BUF_LEN]; if (argc) { if (*Jim_String(argv[0]) == '-') { @@ -939,7 +964,7 @@ static int aio_cmd_read(Jim_Interp *interp, int argc, Jim_Obj *const *argv) /* reads are nonblocking if a timeout is given */ nb = aio_start_nonblocking(af); - if (aio_read_len(interp, af, nb, buf, sizeof(buf), neededLen) != JIM_OK) { + if (aio_read_len(interp, af, nb ? AIO_NONBLOCK : 0, neededLen) != JIM_OK) { aio_set_nonblocking(af, nb); return JIM_ERR; } @@ -997,11 +1022,6 @@ static int aio_cmd_copy(Jim_Interp *interp, int argc, Jim_Obj *const *argv) AioFile *af = Jim_CmdPrivData(interp); jim_wide count = 0; jim_wide maxlen = JIM_WIDE_MAX; - /* Small, static buffer for small files */ - char buf[AIO_BUF_LEN]; - /* Will be allocated if the file is large */ - char *bufp = buf; - int buflen = sizeof(buf); int ok = 1; Jim_Obj *objv[4]; @@ -1031,10 +1051,10 @@ static int aio_cmd_copy(Jim_Interp *interp, int argc, Jim_Obj *const *argv) while (count < maxlen) { jim_wide len = maxlen - count; - if (len > buflen) { - len = buflen; + if (len > af->rbuf_len) { + len = af->rbuf_len; } - if (aio_read_len(interp, af, 0, bufp, buflen, len) != JIM_OK) { + if (aio_read_len(interp, af, 0, len) != JIM_OK) { ok = 0; break; } @@ -1047,17 +1067,13 @@ static int aio_cmd_copy(Jim_Interp *interp, int argc, Jim_Obj *const *argv) if (aio_eof(af)) { break; } - if (count >= 16384 && bufp == buf) { + if (count >= 16384 && af->rbuf_len < 65536) { /* Heuristic check - for large copy speed-up */ - buflen = 65536; - bufp = Jim_Alloc(buflen); + af->rbuf_len = 65536; + af->rbuf = Jim_Realloc(af->rbuf, af->rbuf_len); } } - if (bufp != buf) { - Jim_Free(bufp); - } - Jim_DecrRefCount(interp, objv[1]); Jim_DecrRefCount(interp, objv[2]); @@ -1073,10 +1089,10 @@ static int aio_cmd_copy(Jim_Interp *interp, int argc, Jim_Obj *const *argv) static int aio_cmd_gets(Jim_Interp *interp, int argc, Jim_Obj *const *argv) { AioFile *af = Jim_CmdPrivData(interp); - char buf[AIO_BUF_LEN]; Jim_Obj *objPtr = NULL; int len; int nb; + unsigned flags = AIO_ONEREAD; char *nl = NULL; int offset = 0; @@ -1084,33 +1100,33 @@ static int aio_cmd_gets(Jim_Interp *interp, int argc, Jim_Obj *const *argv) /* reads are non-blocking if a timeout has been given */ nb = aio_start_nonblocking(af); - - if (!af->readbuf) { - af->readbuf = Jim_NewStringObj(interp, NULL, 0); + if (nb) { + flags |= AIO_NONBLOCK; } while (!aio_eof(af)) { - const char *pt = Jim_GetString(af->readbuf, &len); - nl = memchr(pt + offset, '\n', len - offset); - if (nl) { - /* got a line */ - objPtr = Jim_NewStringObj(interp, pt, nl - pt); - /* And consume it plus the newline */ - aio_consume(af->readbuf, nl - pt + 1); - break; + if (af->readbuf) { + const char *pt = Jim_GetString(af->readbuf, &len); + nl = memchr(pt + offset, '\n', len - offset); + if (nl) { + /* got a line */ + objPtr = Jim_NewStringObj(interp, pt, nl - pt); + /* And consume it plus the newline */ + aio_consume(af->readbuf, nl - pt + 1); + break; + } + offset = len; } - offset = len; - len = af->fops->reader(af, buf, AIO_BUF_LEN, nb); - if (len <= 0) { + /* Not got a line yet, so read more */ + if (aio_read_len(interp, af, flags, -1) != JIM_OK) { break; } - Jim_AppendString(interp, af->readbuf, buf, len); } aio_set_nonblocking(af, nb); - if (!nl && aio_eof(af)) { + if (!nl && aio_eof(af) && af->readbuf) { /* Just take what we have as the line */ objPtr = af->readbuf; af->readbuf = NULL; @@ -1162,6 +1178,15 @@ static int aio_cmd_puts(Jim_Interp *interp, int argc, Jim_Obj *const *argv) /* Keep it simple and always go via the writebuf instead of trying to optimise * the case that we can write immediately */ +#ifdef JIM_MAINTAINER + if (Jim_IsShared(af->writebuf)) { + /* This should generally never happen since this object isn't accessible, + * but it is possible with 'debug objects' */ + Jim_DecrRefCount(interp, af->writebuf); + af->writebuf = Jim_DuplicateObj(interp, af->writebuf); + Jim_IncrRefCount(af->writebuf); + } +#endif Jim_AppendObj(interp, af->writebuf, strObj); if (nl) { Jim_AppendString(interp, af->writebuf, "\n", 1); @@ -1183,7 +1208,7 @@ static int aio_cmd_puts(Jim_Interp *interp, int argc, Jim_Obj *const *argv) break; case WBUF_OPT_FULL: - if (wlen >= AIO_WBUF_FULL_SIZE) { + if (wlen >= af->wbuf_limit) { wnow = 1; } break; @@ -1597,6 +1622,7 @@ static int aio_cmd_sync(Jim_Interp *interp, int argc, Jim_Obj *const *argv) static int aio_cmd_buffering(Jim_Interp *interp, int argc, Jim_Obj *const *argv) { AioFile *af = Jim_CmdPrivData(interp); + Jim_Obj *resultObj; static const char * const options[] = { "none", @@ -1605,17 +1631,79 @@ static int aio_cmd_buffering(Jim_Interp *interp, int argc, Jim_Obj *const *argv) NULL }; - if (Jim_GetEnum(interp, argv[0], options, &af->wbuft, NULL, JIM_ERRMSG) != JIM_OK) { - return JIM_ERR; + if (argc) { + if (Jim_GetEnum(interp, argv[0], options, &af->wbuft, NULL, JIM_ERRMSG) != JIM_OK) { + return JIM_ERR; + } + + if (af->wbuft == WBUF_OPT_FULL && argc == 2) { + long l; + if (Jim_GetLong(interp, argv[1], &l) != JIM_OK || l <= 0) { + return JIM_ERR; + } + af->wbuf_limit = l; + } + + if (af->wbuft == WBUF_OPT_NONE) { + if (aio_flush(interp, af) != JIM_OK) { + return JIM_ERR; + } + } + /* don't bother flushing when switching from full to line */ } - if (af->wbuft == WBUF_OPT_NONE) { - return aio_flush(interp, af); + resultObj = Jim_NewListObj(interp, NULL, 0); + Jim_ListAppendElement(interp, resultObj, Jim_NewStringObj(interp, options[af->wbuft], -1)); + if (af->wbuft == WBUF_OPT_FULL) { + Jim_ListAppendElement(interp, resultObj, Jim_NewIntObj(interp, af->wbuf_limit)); + } + Jim_SetResult(interp, resultObj); + + return JIM_OK; +} + +static int aio_cmd_translation(Jim_Interp *interp, int argc, Jim_Obj *const *argv) +{ + enum {OPT_BINARY, OPT_TEXT}; + static const char * const options[] = { + "binary", + "text", + NULL + }; + int opt; + + if (Jim_GetEnum(interp, argv[0], options, &opt, NULL, JIM_ERRMSG) != JIM_OK) { + return JIM_ERR; + } +#if defined(Jim_SetMode) + else { + AioFile *af = Jim_CmdPrivData(interp); + Jim_SetMode(af->fd, opt == OPT_BINARY ? O_BINARY : O_TEXT); + } +#endif + return JIM_OK; +} + +static int aio_cmd_readsize(Jim_Interp *interp, int argc, Jim_Obj *const *argv) +{ + AioFile *af = Jim_CmdPrivData(interp); + + if (argc) { + long l; + if (Jim_GetLong(interp, argv[0], &l) != JIM_OK || l <= 0) { + return JIM_ERR; + } + af->rbuf_len = l; + if (af->rbuf) { + af->rbuf = Jim_Realloc(af->rbuf, af->rbuf_len); + } } - /* don't bother flushing when switching from full to line */ + Jim_SetResultInt(interp, af->rbuf_len); + return JIM_OK; } +#ifdef jim_ext_eventloop static int aio_cmd_timeout(Jim_Interp *interp, int argc, Jim_Obj *const *argv) { #ifdef HAVE_SELECT @@ -1633,7 +1721,6 @@ static int aio_cmd_timeout(Jim_Interp *interp, int argc, Jim_Obj *const *argv) #endif } -#ifdef jim_ext_eventloop static int aio_eventinfo(Jim_Interp *interp, AioFile * af, unsigned mask, int argc, Jim_Obj * const *argv) { @@ -2107,11 +2194,25 @@ static const jim_subcmd_type aio_command_table[] = { }, #endif { "buffering", - "none|line|full", + "?none|line|full? ?size?", aio_cmd_buffering, + 0, + 2, + /* Description: Sets or returns write buffering */ + }, + { "translation", + "binary|text", + aio_cmd_translation, 1, 1, - /* Description: Sets buffering */ + /* Description: Sets output translation mode */ + }, + { "readsize", + "?size?", + aio_cmd_readsize, + 0, + 1, + /* Description: Sets or returns read size */ }, #if defined(jim_ext_file) && defined(Jim_FileStat) { "stat", @@ -2448,6 +2549,9 @@ static AioFile *JimMakeChannel(Jim_Interp *interp, int fd, Jim_Obj *filename, /* Create an empty write buf */ af->writebuf = Jim_NewStringObj(interp, NULL, 0); Jim_IncrRefCount(af->writebuf); + af->wbuf_limit = AIO_DEFAULT_WBUF_LIMIT; + af->rbuf_len = AIO_DEFAULT_RBUF_LEN; + /* Don't allocate rbuf or readbuf until we need it */ Jim_CreateCommand(interp, buf, JimAioSubCmdProc, af, JimAioDelProc); |