diff options
author | Nicolas Koenig <koenigni@gcc.gnu.org> | 2018-08-21 18:48:59 +0000 |
---|---|---|
committer | Nicolas Koenig <koenigni@gcc.gnu.org> | 2018-08-21 18:48:59 +0000 |
commit | 2b4c90656132abb8b8ad155d345c7d4fbf1687c9 (patch) | |
tree | 15008b4ee6a44100a4cee36683749d369449af7e /libgfortran/io | |
parent | 774fb6c4eb205eaf9d3b6667e7de9c90cc1784ad (diff) | |
download | gcc-2b4c90656132abb8b8ad155d345c7d4fbf1687c9.zip gcc-2b4c90656132abb8b8ad155d345c7d4fbf1687c9.tar.gz gcc-2b4c90656132abb8b8ad155d345c7d4fbf1687c9.tar.bz2 |
re PR fortran/25829 ([F03] Asynchronous IO support)
2018-08-21 Nicolas Koenig <koenigni@gcc.gnu.org>
Thomas Koenig <tkoenig@gcc.gnu.org>
PR fortran/25829
* gfortran.texi: Add description of asynchronous I/O.
* trans-decl.c (gfc_finish_var_decl): Treat asynchronous variables
as volatile.
* trans-io.c (gfc_build_io_library_fndecls): Rename st_wait to
st_wait_async and change argument spec from ".X" to ".w".
(gfc_trans_wait): Pass ID argument via reference.
2018-08-21 Nicolas Koenig <koenigni@gcc.gnu.org>
Thomas Koenig <tkoenig@gcc.gnu.org>
PR fortran/25829
* gfortran.dg/f2003_inquire_1.f03: Add write statement.
* gfortran.dg/f2003_io_1.f03: Add wait statement.
2018-08-21 Nicolas Koenig <koenigni@gcc.gnu.org>
Thomas Koenig <tkoenig@gcc.gnu.org>
PR fortran/25829
* Makefile.am: Add async.c to gfor_io_src.
Add async.h to gfor_io_headers.
* Makefile.in: Regenerated.
* gfortran.map: Add _gfortran_st_wait_async.
* io/async.c: New file.
* io/async.h: New file.
* io/close.c: Include async.h.
(st_close): Call async_wait for an asynchronous unit.
* io/file_pos.c (st_backspace): Likewise.
(st_endfile): Likewise.
(st_rewind): Likewise.
(st_flush): Likewise.
* io/inquire.c: Add handling for asynchronous PENDING
and ID arguments.
* io/io.h (st_parameter_dt): Add async bit.
(st_parameter_wait): Correct.
(gfc_unit): Add au pointer.
(st_wait_async): Add prototype.
(transfer_array_inner): Likewise.
(st_write_done_worker): Likewise.
* io/open.c: Include async.h.
(new_unit): Initialize asynchronous unit.
* io/transfer.c (async_opt): New struct.
(wrap_scalar_transfer): New function.
(transfer_integer): Call wrap_scalar_transfer to do the work.
(transfer_real): Likewise.
(transfer_real_write): Likewise.
(transfer_character): Likewise.
(transfer_character_wide): Likewise.
(transfer_complex): Likewise.
(transfer_array_inner): New function.
(transfer_array): Call transfer_array_inner.
(transfer_derived): Call wrap_scalar_transfer.
(data_transfer_init): Check for asynchronous I/O.
Perform a wait operation on any pending asynchronous I/O
if the data transfer is synchronous. Copy PDT and enqueue
thread for data transfer.
(st_read_done_worker): New function.
(st_read_done): Enqueue transfer or call st_read_done_worker.
(st_write_done_worker): New function.
(st_write_done): Enqueue transfer or call st_read_done_worker.
(st_wait): Document as no-op for compatibility reasons.
(st_wait_async): New function.
* io/unit.c (insert_unit): Use macros LOCK, UNLOCK and TRYLOCK;
add NOTE where necessary.
(get_gfc_unit): Likewise.
(init_units): Likewise.
(close_unit_1): Likewise. Call async_close if asynchronous.
(close_unit): Use macros LOCK and UNLOCK.
(finish_last_advance_record): Likewise.
(newunit_alloc): Likewise.
* io/unix.c (find_file): Likewise.
(flush_all_units_1): Likewise.
(flush_all_units): Likewise.
* libgfortran.h (generate_error_common): Add prototype.
* runtime/error.c: Include io.h and async.h.
(generate_error_common): New function.
2018-08-21 Nicolas Koenig <koenigni@gcc.gnu.org>
Thomas Koenig <tkoenig@gcc.gnu.org>
PR fortran/25829
* testsuite/libgomp.fortran/async_io_1.f90: New test.
* testsuite/libgomp.fortran/async_io_2.f90: New test.
* testsuite/libgomp.fortran/async_io_3.f90: New test.
* testsuite/libgomp.fortran/async_io_4.f90: New test.
* testsuite/libgomp.fortran/async_io_5.f90: New test.
* testsuite/libgomp.fortran/async_io_6.f90: New test.
* testsuite/libgomp.fortran/async_io_7.f90: New test.
Co-Authored-By: Thomas Koenig <tkoenig@gcc.gnu.org>
From-SVN: r263750
Diffstat (limited to 'libgfortran/io')
-rw-r--r-- | libgfortran/io/async.c | 569 | ||||
-rw-r--r-- | libgfortran/io/async.h | 400 | ||||
-rw-r--r-- | libgfortran/io/close.c | 11 | ||||
-rw-r--r-- | libgfortran/io/file_pos.c | 70 | ||||
-rw-r--r-- | libgfortran/io/inquire.c | 49 | ||||
-rw-r--r-- | libgfortran/io/io.h | 29 | ||||
-rw-r--r-- | libgfortran/io/open.c | 9 | ||||
-rw-r--r-- | libgfortran/io/read.c | 2 | ||||
-rw-r--r-- | libgfortran/io/transfer.c | 309 | ||||
-rw-r--r-- | libgfortran/io/unit.c | 54 | ||||
-rw-r--r-- | libgfortran/io/unix.c | 29 |
11 files changed, 1412 insertions, 119 deletions
diff --git a/libgfortran/io/async.c b/libgfortran/io/async.c new file mode 100644 index 0000000..a07b831 --- /dev/null +++ b/libgfortran/io/async.c @@ -0,0 +1,569 @@ +/* Copyright (C) 2018 Free Software Foundation, Inc. + Contributed by Nicolas Koenig + + This file is part of the GNU Fortran runtime library (libgfortran). + + Libgfortran is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3, or (at your option) + any later version. + + Libgfortran is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + Under Section 7 of GPL version 3, you are granted additional + permissions described in the GCC Runtime Library Exception, version + 3.1, as published by the Free Software Foundation. + + You should have received a copy of the GNU General Public License and + a copy of the GCC Runtime Library Exception along with this program; + see the files COPYING3 and COPYING.RUNTIME respectively. If not, see + <http://www.gnu.org/licenses/>. */ + +#include "libgfortran.h" + +#define _GTHREAD_USE_COND_INIT_FUNC +#include "../../libgcc/gthr.h" +#include "io.h" +#include "fbuf.h" +#include "format.h" +#include "unix.h" +#include <string.h> +#include <assert.h> + +#include <sys/types.h> + +#include "async.h" +#if ASYNC_IO + +DEBUG_LINE (__thread const char *aio_prefix = MPREFIX); + +DEBUG_LINE (__gthread_mutex_t debug_queue_lock = __GTHREAD_MUTEX_INIT;) +DEBUG_LINE (aio_lock_debug *aio_debug_head = NULL;) + +/* Current unit for asynchronous I/O. Needed for error reporting. */ + +__thread gfc_unit *thread_unit = NULL; + +/* Queue entry for the asynchronous I/O entry. */ +typedef struct transfer_queue +{ + enum aio_do type; + struct transfer_queue *next; + struct st_parameter_dt *new_pdt; + transfer_args arg; + _Bool has_id; + int read_flag; +} transfer_queue; + +struct error { + st_parameter_dt *dtp; + int id; +}; + +/* Helper function to exchange the old vs. a new PDT. */ + +static void +update_pdt (st_parameter_dt **old, st_parameter_dt *new) { + st_parameter_dt *temp; + NOTE ("Changing pdts, current_unit = %p", (void *) (new->u.p.current_unit)); + temp = *old; + *old = new; + if (temp) + free (temp); +} + +/* Destroy an adv_cond structure. */ + +static void +destroy_adv_cond (struct adv_cond *ac) +{ + T_ERROR (__gthread_mutex_destroy, &ac->lock); + T_ERROR (__gthread_cond_destroy, &ac->signal); +} + +/* Function invoked as start routine for a new asynchronous I/O unit. + Contains the main loop for accepting requests and handling them. */ + +static void * +async_io (void *arg) +{ + DEBUG_LINE (aio_prefix = TPREFIX); + transfer_queue *ctq = NULL, *prev = NULL; + gfc_unit *u = (gfc_unit *) arg; + async_unit *au = u->au; + LOCK (&au->lock); + thread_unit = u; + au->thread = __gthread_self (); + while (true) + { + /* Main loop. At this point, au->lock is always held. */ + WAIT_SIGNAL_MUTEX (&au->work, au->tail != NULL, &au->lock); + LOCK (&au->lock); + ctq = au->head; + prev = NULL; + /* Loop over the queue entries until they are finished. */ + while (ctq) + { + if (prev) + free (prev); + prev = ctq; + if (!au->error.has_error) + { + UNLOCK (&au->lock); + + switch (ctq->type) + { + case AIO_WRITE_DONE: + NOTE ("Finalizing write"); + st_write_done_worker (au->pdt); + UNLOCK (&au->io_lock); + break; + + case AIO_READ_DONE: + NOTE ("Finalizing read"); + st_read_done_worker (au->pdt); + UNLOCK (&au->io_lock); + break; + + case AIO_DATA_TRANSFER_INIT: + NOTE ("Data transfer init"); + LOCK (&au->io_lock); + update_pdt (&au->pdt, ctq->new_pdt); + data_transfer_init_worker (au->pdt, ctq->read_flag); + break; + + case AIO_TRANSFER_SCALAR: + NOTE ("Starting scalar transfer"); + ctq->arg.scalar.transfer (au->pdt, ctq->arg.scalar.arg_bt, + ctq->arg.scalar.data, + ctq->arg.scalar.i, + ctq->arg.scalar.s1, + ctq->arg.scalar.s2); + break; + + case AIO_TRANSFER_ARRAY: + NOTE ("Starting array transfer"); + NOTE ("ctq->arg.array.desc = %p", + (void *) (ctq->arg.array.desc)); + transfer_array_inner (au->pdt, ctq->arg.array.desc, + ctq->arg.array.kind, + ctq->arg.array.charlen); + free (ctq->arg.array.desc); + break; + + case AIO_CLOSE: + NOTE ("Received AIO_CLOSE"); + goto finish_thread; + + default: + internal_error (NULL, "Invalid queue type"); + break; + } + LOCK (&au->lock); + if (unlikely (au->error.has_error)) + au->error.last_good_id = au->id.low - 1; + } + else + { + if (ctq->type == AIO_WRITE_DONE || ctq->type == AIO_READ_DONE) + { + UNLOCK (&au->io_lock); + } + else if (ctq->type == AIO_CLOSE) + { + NOTE ("Received AIO_CLOSE during error condition"); + UNLOCK (&au->lock); + goto finish_thread; + } + } + + NOTE ("Next ctq, current id: %d", au->id.low); + if (ctq->has_id && au->id.waiting == au->id.low++) + SIGNAL (&au->id.done); + + ctq = ctq->next; + } + au->tail = NULL; + au->head = NULL; + au->empty = 1; + UNLOCK (&au->lock); + SIGNAL (&au->emptysignal); + LOCK (&au->lock); + } + finish_thread: + au->tail = NULL; + au->head = NULL; + au->empty = 1; + SIGNAL (&au->emptysignal); + free (ctq); + return NULL; +} + +/* Free an asynchronous unit. */ + +static void +free_async_unit (async_unit *au) +{ + if (au->tail) + internal_error (NULL, "Trying to free nonempty asynchronous unit"); + + destroy_adv_cond (&au->work); + destroy_adv_cond (&au->emptysignal); + destroy_adv_cond (&au->id.done); + T_ERROR (__gthread_mutex_destroy, &au->lock); + free (au); +} + +/* Initialize an adv_cond structure. */ + +static void +init_adv_cond (struct adv_cond *ac) +{ + ac->pending = 0; + __GTHREAD_MUTEX_INIT_FUNCTION (&ac->lock); + __gthread_cond_init_function (&ac->signal); +} + +/* Initialize an asyncronous unit, returning zero on success, + nonzero on failure. It also sets u->au. */ + +void +init_async_unit (gfc_unit *u) +{ + async_unit *au; + if (!__gthread_active_p ()) + { + u->au = NULL; + return; + } + + au = (async_unit *) xmalloc (sizeof (async_unit)); + u->au = au; + init_adv_cond (&au->work); + init_adv_cond (&au->emptysignal); + __GTHREAD_MUTEX_INIT_FUNCTION (&au->lock); + __GTHREAD_MUTEX_INIT_FUNCTION (&au->io_lock); + LOCK (&au->lock); + T_ERROR (__gthread_create, &au->thread, &async_io, (void *) u); + au->pdt = NULL; + au->head = NULL; + au->tail = NULL; + au->empty = true; + au->id.waiting = -1; + au->id.low = 0; + au->id.high = 0; + au->error.fatal_error = 0; + au->error.has_error = 0; + au->error.last_good_id = 0; + init_adv_cond (&au->id.done); + UNLOCK (&au->lock); +} + +/* Enqueue a transfer statement. */ + +void +enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type) +{ + transfer_queue *tq = calloc (sizeof (transfer_queue), 1); + tq->arg = *arg; + tq->type = type; + tq->has_id = 0; + LOCK (&au->lock); + if (!au->tail) + au->head = tq; + else + au->tail->next = tq; + au->tail = tq; + REVOKE_SIGNAL (&(au->emptysignal)); + au->empty = false; + UNLOCK (&au->lock); + SIGNAL (&au->work); +} + +/* Enqueue an st_write_done or st_read_done which contains an ID. */ + +int +enqueue_done_id (async_unit *au, enum aio_do type) +{ + int ret; + transfer_queue *tq = calloc (sizeof (transfer_queue), 1); + + tq->type = type; + tq->has_id = 1; + LOCK (&au->lock); + if (!au->tail) + au->head = tq; + else + au->tail->next = tq; + au->tail = tq; + REVOKE_SIGNAL (&(au->emptysignal)); + au->empty = false; + ret = au->id.high++; + NOTE ("Enqueue id: %d", ret); + UNLOCK (&au->lock); + SIGNAL (&au->work); + return ret; +} + +/* Enqueue an st_write_done or st_read_done without an ID. */ + +void +enqueue_done (async_unit *au, enum aio_do type) +{ + transfer_queue *tq = calloc (sizeof (transfer_queue), 1); + tq->type = type; + tq->has_id = 0; + LOCK (&au->lock); + if (!au->tail) + au->head = tq; + else + au->tail->next = tq; + au->tail = tq; + REVOKE_SIGNAL (&(au->emptysignal)); + au->empty = false; + UNLOCK (&au->lock); + SIGNAL (&au->work); +} + +/* Enqueue a CLOSE statement. */ + +void +enqueue_close (async_unit *au) +{ + transfer_queue *tq = calloc (sizeof (transfer_queue), 1); + + tq->type = AIO_CLOSE; + LOCK (&au->lock); + if (!au->tail) + au->head = tq; + else + au->tail->next = tq; + au->tail = tq; + REVOKE_SIGNAL (&(au->emptysignal)); + au->empty = false; + UNLOCK (&au->lock); + SIGNAL (&au->work); +} + +/* The asynchronous unit keeps the currently active PDT around. + This function changes that to the current one. */ + +void +enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag) +{ + st_parameter_dt *new = xmalloc (sizeof (st_parameter_dt)); + transfer_queue *tq = xmalloc (sizeof (transfer_queue)); + + memcpy ((void *) new, (void *) dt, sizeof (st_parameter_dt)); + + NOTE ("dt->internal_unit_desc = %p", dt->internal_unit_desc); + NOTE ("common.flags & mask = %d", dt->common.flags & IOPARM_LIBRETURN_MASK); + tq->next = NULL; + tq->type = AIO_DATA_TRANSFER_INIT; + tq->read_flag = read_flag; + tq->has_id = 0; + tq->new_pdt = new; + LOCK (&au->lock); + + if (!au->tail) + au->head = tq; + else + au->tail->next = tq; + au->tail = tq; + REVOKE_SIGNAL (&(au->emptysignal)); + au->empty = 0; + UNLOCK (&au->lock); + SIGNAL (&au->work); +} + +/* Collect the errors that may have happened asynchronously. Return true if + an error has been encountered. */ + +bool +collect_async_errors (st_parameter_common *cmp, async_unit *au) +{ + bool has_error = au->error.has_error; + + if (has_error) + { + if (generate_error_common (cmp, au->error.family, au->error.message)) + { + au->error.has_error = 0; + au->error.cmp = NULL; + } + else + { + /* The program will exit later. */ + au->error.fatal_error = true; + } + } + return has_error; +} + +/* Perform a wait operation on an asynchronous unit with an ID specified, + which means collecting the errors that may have happened asynchronously. + Return true if an error has been encountered. */ + +bool +async_wait_id (st_parameter_common *cmp, async_unit *au, int i) +{ + bool ret; + + if (au == NULL) + return false; + + if (cmp == NULL) + cmp = au->error.cmp; + + if (au->error.has_error) + { + if (i <= au->error.last_good_id) + return false; + + return collect_async_errors (cmp, au); + } + + LOCK (&au->lock); + NOTE ("Waiting for id %d", i); + if (au->id.waiting < i) + au->id.waiting = i; + UNLOCK (&au->lock); + SIGNAL (&(au->work)); + LOCK (&au->lock); + WAIT_SIGNAL_MUTEX (&(au->id.done), + (au->id.low >= au->id.waiting || au->empty), &au->lock); + LOCK (&au->lock); + ret = collect_async_errors (cmp, au); + UNLOCK (&au->lock); + return ret; +} + +/* Perform a wait operation an an asynchronous unit without an ID. */ + +bool +async_wait (st_parameter_common *cmp, async_unit *au) +{ + bool ret; + + if (au == NULL) + return false; + + if (cmp == NULL) + cmp = au->error.cmp; + + SIGNAL (&(au->work)); + LOCK (&(au->lock)); + + if (au->empty) + { + ret = collect_async_errors (cmp, au); + UNLOCK (&au->lock); + return ret; + } + + WAIT_SIGNAL_MUTEX (&(au->emptysignal), (au->empty), &au->lock); + ret = collect_async_errors (cmp, au); + return ret; +} + +/* Close an asynchronous unit. */ + +void +async_close (async_unit *au) +{ + if (au == NULL) + return; + + NOTE ("Closing async unit"); + enqueue_close (au); + T_ERROR (__gthread_join, au->thread, NULL); + free_async_unit (au); +} + +#else + +/* Only set u->au to NULL so no async I/O will happen. */ + +void +init_async_unit (gfc_unit *u) +{ + u->au = NULL; + return; +} + +/* Do-nothing function, which will not be called. */ + +void +enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type) +{ + return; +} + +/* Do-nothing function, which will not be called. */ + +int +enqueue_done_id (async_unit *au, enum aio_do type) +{ + return 0; +} + +/* Do-nothing function, which will not be called. */ + +void +enqueue_done (async_unit *au, enum aio_do type) +{ + return; +} + +/* Do-nothing function, which will not be called. */ + +void +enqueue_close (async_unit *au) +{ + return; +} + +/* Do-nothing function, which will not be called. */ + +void +enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag) +{ + return; +} + +/* Do-nothing function, which will not be called. */ + +bool +collect_async_errors (st_parameter_common *cmp, async_unit *au) +{ + return false; +} + +/* Do-nothing function, which will not be called. */ + +bool +async_wait_id (st_parameter_common *cmp, async_unit *au, int i) +{ + return false; +} + +/* Do-nothing function, which will not be called. */ + +bool +async_wait (st_parameter_common *cmp, async_unit *au) +{ + return false; +} + +/* Do-nothing function, which will not be called. */ + +void +async_close (async_unit *au) +{ + return; +} + +#endif diff --git a/libgfortran/io/async.h b/libgfortran/io/async.h new file mode 100644 index 0000000..7dfbc8b --- /dev/null +++ b/libgfortran/io/async.h @@ -0,0 +1,400 @@ +/* Copyright (C) 2018 Free Software Foundation, Inc. + Contributed by Nicolas Koenig + + This file is part of the GNU Fortran runtime library (libgfortran). + + Libgfortran is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3, or (at your option) + any later version. + + Libgfortran is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + Under Section 7 of GPL version 3, you are granted additional + permissions described in the GCC Runtime Library Exception, version + 3.1, as published by the Free Software Foundation. + + You should have received a copy of the GNU General Public License and + a copy of the GCC Runtime Library Exception along with this program; + see the files COPYING3 and COPYING.RUNTIME respectively. If not, see + <http://www.gnu.org/licenses/>. */ + +#ifndef ASYNC_H +#define ASYNC_H + +/* Async I/O will not work on targets which do not support + __gthread_cond_t and __gthread_equal / __gthread_self. Check + this. */ + +#if defined(__GTHREAD_HAS_COND) && defined(__GTHREADS_CXX0X) +#define ASYNC_IO 1 +#else +#define ASYNC_IO 0 +#endif + +/* Defining DEBUG_ASYNC will enable somewhat verbose debugging + output for async I/O. */ + +#define DEBUG_ASYNC +#undef DEBUG_ASYNC + +#ifdef DEBUG_ASYNC + +/* Define this if you want to use ANSI color escape sequences in your + debugging output. */ + +#define DEBUG_COLOR + +#ifdef DEBUG_COLOR +#define MPREFIX "\033[30;46mM:\033[0m " +#define TPREFIX "\033[37;44mT:\033[0m " +#define RPREFIX "\033[37;41mR:\033[0m " +#define DEBUG_RED "\033[31m" +#define DEBUG_ORANGE "\033[33m" +#define DEBUG_GREEN "\033[32m" +#define DEBUG_DARKRED "\033[31;2m" +#define DEBUG_PURPLE "\033[35m" +#define DEBUG_NORM "\033[0m" +#define DEBUG_REVERSE_RED "\033[41;37m" +#define DEBUG_BLUE "\033[34m" + +#else + +#define MPREFIX "M: " +#define TPREFIX "T: " +#define RPREFIX "" +#define DEBUG_RED "" +#define DEBUG_ORANGE "" +#define DEBUG_GREEN "" +#define DEBUG_DARKRED "" +#define DEBUG_PURPLE "" +#define DEBUG_NORM "" +#define DEBUG_REVERSE_RED "" +#define DEBUG_BLUE "" + +#endif + +#define DEBUG_PRINTF(...) fprintf (stderr,__VA_ARGS__) + +#define IN_DEBUG_QUEUE(mutex) ({ \ + __label__ end; \ + aio_lock_debug *curr = aio_debug_head; \ + while (curr) { \ + if (curr->m == mutex) { \ + goto end; \ + } \ + curr = curr->next; \ + } \ + end:; \ + curr; \ + }) + +#define TAIL_DEBUG_QUEUE ({ \ + aio_lock_debug *curr = aio_debug_head; \ + while (curr && curr->next) { \ + curr = curr->next; \ + } \ + curr; \ + }) + +#define CHECK_LOCK(mutex, status) do { \ + aio_lock_debug *curr; \ + INTERN_LOCK (&debug_queue_lock); \ + if (__gthread_mutex_trylock (mutex)) { \ + if ((curr = IN_DEBUG_QUEUE (mutex))) { \ + sprintf (status, DEBUG_RED "%s():%d" DEBUG_NORM, curr->func, curr->line); \ + } else \ + sprintf (status, DEBUG_RED "unknown" DEBUG_NORM); \ + } \ + else { \ + __gthread_mutex_unlock (mutex); \ + sprintf (status, DEBUG_GREEN "unlocked" DEBUG_NORM); \ + } \ + INTERN_UNLOCK (&debug_queue_lock); \ + }while (0) + +#define T_ERROR(func, ...) do { \ + int t_error_temp; \ + t_error_temp = func(__VA_ARGS__); \ + if (t_error_temp) \ + ERROR (t_error_temp, "args: " #__VA_ARGS__ "\n"); \ + } while (0) + +#define NOTE(str, ...) do{ \ + char note_str[200]; \ + sprintf (note_str, "%s" DEBUG_PURPLE "NOTE: " DEBUG_NORM str, aio_prefix, ##__VA_ARGS__); \ + DEBUG_PRINTF ("%-90s %20s():%-5d\n", note_str, __FUNCTION__, __LINE__); \ + }while (0); + +#define ERROR(errnum, str, ...) do{ \ + char note_str[200]; \ + sprintf (note_str, "%s" DEBUG_REVERSE_RED "ERROR:" DEBUG_NORM " [%d] " str, aio_prefix, \ + errnum, ##__VA_ARGS__); \ + DEBUG_PRINTF ("%-68s %s():%-5d\n", note_str, __FUNCTION__, __LINE__); \ + }while (0) + +#define MUTEX_DEBUG_ADD(mutex) do { \ + aio_lock_debug *n; \ + n = malloc (sizeof(aio_lock_debug)); \ + n->prev = TAIL_DEBUG_QUEUE; \ + if (n->prev) \ + n->prev->next = n; \ + n->next = NULL; \ + n->line = __LINE__; \ + n->func = __FUNCTION__; \ + n->m = mutex; \ + if (!aio_debug_head) { \ + aio_debug_head = n; \ + } \ + } while (0) + +#define UNLOCK(mutex) do { \ + aio_lock_debug *curr; \ + DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_GREEN "UNLOCK: " DEBUG_NORM #mutex, \ + __FUNCTION__, __LINE__, (void *) mutex); \ + INTERN_LOCK (&debug_queue_lock); \ + curr = IN_DEBUG_QUEUE (mutex); \ + if (curr) \ + { \ + if (curr->prev) \ + curr->prev->next = curr->next; \ + if (curr->next) { \ + curr->next->prev = curr->prev; \ + if (curr == aio_debug_head) \ + aio_debug_head = curr->next; \ + } else { \ + if (curr == aio_debug_head) \ + aio_debug_head = NULL; \ + } \ + free (curr); \ + } \ + INTERN_UNLOCK (&debug_queue_lock); \ + INTERN_UNLOCK (mutex); \ + }while (0) + +#define TRYLOCK(mutex) ({ \ + char status[200]; \ + int res; \ + aio_lock_debug *curr; \ + res = __gthread_mutex_trylock (mutex); \ + INTERN_LOCK (&debug_queue_lock); \ + if (res) { \ + if ((curr = IN_DEBUG_QUEUE (mutex))) { \ + sprintf (status, DEBUG_RED "%s():%d" DEBUG_NORM, curr->func, curr->line); \ + } else \ + sprintf (status, DEBUG_RED "unknown" DEBUG_NORM); \ + } \ + else { \ + sprintf (status, DEBUG_GREEN "unlocked" DEBUG_NORM); \ + MUTEX_DEBUG_ADD (mutex); \ + } \ + DEBUG_PRINTF ("%s%-44s prev: %-35s %20s():%-5d %18p\n", aio_prefix, \ + DEBUG_DARKRED "TRYLOCK: " DEBUG_NORM #mutex, status, __FUNCTION__, __LINE__, \ + (void *) mutex); \ + INTERN_UNLOCK (&debug_queue_lock); \ + res; \ + }) + +#define LOCK(mutex) do { \ + char status[200]; \ + CHECK_LOCK (mutex, status); \ + DEBUG_PRINTF ("%s%-42s prev: %-35s %20s():%-5d %18p\n", aio_prefix, \ + DEBUG_RED "LOCK: " DEBUG_NORM #mutex, status, __FUNCTION__, __LINE__, (void *) mutex); \ + INTERN_LOCK (mutex); \ + INTERN_LOCK (&debug_queue_lock); \ + MUTEX_DEBUG_ADD (mutex); \ + INTERN_UNLOCK (&debug_queue_lock); \ + DEBUG_PRINTF ("%s" DEBUG_RED "ACQ:" DEBUG_NORM " %-30s %78p\n", aio_prefix, #mutex, mutex); \ + } while (0) + +#define DEBUG_LINE(...) __VA_ARGS__ + +#else +#define DEBUG_PRINTF(...) {} +#define CHECK_LOCK(au, mutex, status) {} +#define NOTE(str, ...) {} +#define DEBUG_LINE(...) +#define T_ERROR(func, ...) func(__VA_ARGS__) +#define LOCK(mutex) INTERN_LOCK (mutex) +#define UNLOCK(mutex) INTERN_UNLOCK (mutex) +#define TRYLOCK(mutex) (__gthread_mutex_trylock (mutex)) +#endif + +#define INTERN_LOCK(mutex) T_ERROR (__gthread_mutex_lock, mutex); + +#define INTERN_UNLOCK(mutex) T_ERROR (__gthread_mutex_unlock, mutex); + +#if ASYNC_IO + +#define SIGNAL(advcond) do{ \ + INTERN_LOCK (&(advcond)->lock); \ + (advcond)->pending = 1; \ + DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_ORANGE "SIGNAL: " DEBUG_NORM \ + #advcond, __FUNCTION__, __LINE__, (void *) advcond); \ + T_ERROR (__gthread_cond_broadcast, &(advcond)->signal); \ + INTERN_UNLOCK (&(advcond)->lock); \ + } while (0) + +#define WAIT_SIGNAL_MUTEX(advcond, condition, mutex) do{ \ + __label__ finish; \ + INTERN_LOCK (&((advcond)->lock)); \ + DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_BLUE "WAITING: " DEBUG_NORM \ + #advcond, __FUNCTION__, __LINE__, (void *) advcond); \ + if ((advcond)->pending || (condition)){ \ + UNLOCK (mutex); \ + goto finish; \ + } \ + UNLOCK (mutex); \ + while (!__gthread_cond_wait(&(advcond)->signal, &(advcond)->lock)) { \ + { int cond; \ + LOCK (mutex); cond = condition; UNLOCK (mutex); \ + if (cond){ \ + DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_ORANGE "REC: " DEBUG_NORM \ + #advcond, __FUNCTION__, __LINE__, (void *)advcond); \ + break; \ + } \ + } \ + } \ + finish: \ + (advcond)->pending = 0; \ + INTERN_UNLOCK (&((advcond)->lock)); \ + } while (0) + +#define REVOKE_SIGNAL(advcond) do{ \ + INTERN_LOCK (&(advcond)->lock); \ + (advcond)->pending = 0; \ + INTERN_UNLOCK (&(advcond)->lock); \ + } while (0) + +#else + +#define SIGNAL(advcond) do{} while(0) +#define WAIT_SIGNAL_MUTEX(advcond, condition, mutex) do{} while(0) +#define REVOKE_SIGNAL(advcond) do{} while(0) + +#endif + +#if ASYNC_IO +DEBUG_LINE (extern __thread const char *aio_prefix); + +DEBUG_LINE (typedef struct aio_lock_debug{ + __gthread_mutex_t *m; + int line; + const char *func; + struct aio_lock_debug *next; + struct aio_lock_debug *prev; +} aio_lock_debug;) + +DEBUG_LINE (extern aio_lock_debug *aio_debug_head;) +DEBUG_LINE (extern __gthread_mutex_t debug_queue_lock;) + +/* Thread - local storage of the current unit we are looking at. Needed for + error reporting. */ + +extern __thread gfc_unit *thread_unit; +#endif + +enum aio_do { + AIO_INVALID = 0, + AIO_DATA_TRANSFER_INIT, + AIO_TRANSFER_SCALAR, + AIO_TRANSFER_ARRAY, + AIO_WRITE_DONE, + AIO_READ_DONE, + AIO_CLOSE +}; + +typedef union transfer_args +{ + struct + { + void (*transfer) (struct st_parameter_dt *, bt, void *, int, size_t, size_t); + bt arg_bt; + void *data; + int i; + size_t s1; + size_t s2; + } scalar; + struct + { + gfc_array_char *desc; + int kind; + gfc_charlen_type charlen; + } array; +} transfer_args; + +struct adv_cond +{ + int pending; + __gthread_mutex_t lock; + __gthread_cond_t signal; +}; + +typedef struct async_unit +{ + pthread_mutex_t lock; /* Lock for manipulating the queue structure. */ + pthread_mutex_t io_lock; /* Lock for doing actual I/O. */ + struct adv_cond work; + struct adv_cond emptysignal; + struct st_parameter_dt *pdt; + pthread_t thread; + struct transfer_queue *head; + struct transfer_queue *tail; + struct + { + int waiting; + int low; + int high; + struct adv_cond done; + } id; + + bool empty; + + struct { + const char *message; + st_parameter_common *cmp; + bool has_error; + int last_good_id; + int family; + bool fatal_error; + } error; + +} async_unit; + +void init_async_unit (gfc_unit *); +internal_proto (init_async_unit); + +bool async_wait (st_parameter_common *, async_unit *); +internal_proto (async_wait); + +bool async_wait_id (st_parameter_common *, async_unit *, int); +internal_proto (async_wait_id); + +bool collect_async_errors (st_parameter_common *, async_unit *); +internal_proto (collect_async_errors); + +void async_close (async_unit *); +internal_proto (async_close); + +void enqueue_transfer (async_unit * au, transfer_args * arg, enum aio_do); +internal_proto (enqueue_transfer); + +void enqueue_done (async_unit *, enum aio_do type); +internal_proto (enqueue_done); + +int enqueue_done_id (async_unit *, enum aio_do type); +internal_proto (enqueue_done_id); + +void enqueue_init (async_unit *); +internal_proto (enqueue_init); + +void enqueue_data_transfer_init (async_unit *, st_parameter_dt *, int); +internal_proto (enqueue_data_transfer_init); + +void enqueue_close (async_unit *); +internal_proto (enqueue_close); + +#endif diff --git a/libgfortran/io/close.c b/libgfortran/io/close.c index 2117c40..cec7a08 100644 --- a/libgfortran/io/close.c +++ b/libgfortran/io/close.c @@ -24,6 +24,7 @@ see the files COPYING3 and COPYING.RUNTIME respectively. If not, see #include "io.h" #include "unix.h" +#include "async.h" #include <limits.h> typedef enum @@ -57,13 +58,21 @@ st_close (st_parameter_close *clp) find_option (&clp->common, clp->status, clp->status_len, status_opt, "Bad STATUS parameter in CLOSE statement"); + u = find_unit (clp->common.unit); + + if (ASYNC_IO && u && u->au) + if (async_wait (&(clp->common), u->au)) + { + library_end (); + return; + } + if ((clp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK) { library_end (); return; } - u = find_unit (clp->common.unit); if (u != NULL) { if (close_share (u) < 0) diff --git a/libgfortran/io/file_pos.c b/libgfortran/io/file_pos.c index 75f58f0..b6bcecc 100644 --- a/libgfortran/io/file_pos.c +++ b/libgfortran/io/file_pos.c @@ -25,6 +25,7 @@ see the files COPYING3 and COPYING.RUNTIME respectively. If not, see #include "io.h" #include "fbuf.h" #include "unix.h" +#include "async.h" #include <string.h> /* file_pos.c-- Implement the file positioning statements, i.e. BACKSPACE, @@ -187,6 +188,7 @@ void st_backspace (st_parameter_filepos *fpp) { gfc_unit *u; + bool needs_unlock = false; library_start (&fpp->common); @@ -214,6 +216,17 @@ st_backspace (st_parameter_filepos *fpp) goto done; } + if (ASYNC_IO && u->au) + { + if (async_wait (&(fpp->common), u->au)) + return; + else + { + needs_unlock = true; + LOCK (&u->au->io_lock); + } + } + /* Make sure format buffer is flushed and reset. */ if (u->flags.form == FORM_FORMATTED) { @@ -267,7 +280,12 @@ st_backspace (st_parameter_filepos *fpp) done: if (u != NULL) - unlock_unit (u); + { + unlock_unit (u); + + if (ASYNC_IO && u->au && needs_unlock) + UNLOCK (&u->au->io_lock); + } library_end (); } @@ -280,6 +298,7 @@ void st_endfile (st_parameter_filepos *fpp) { gfc_unit *u; + bool needs_unlock = false; library_start (&fpp->common); @@ -294,6 +313,17 @@ st_endfile (st_parameter_filepos *fpp) goto done; } + if (ASYNC_IO && u->au) + { + if (async_wait (&(fpp->common), u->au)) + return; + else + { + needs_unlock = true; + LOCK (&u->au->io_lock); + } + } + if (u->flags.access == ACCESS_SEQUENTIAL && u->endfile == AFTER_ENDFILE) { @@ -376,8 +406,11 @@ st_endfile (st_parameter_filepos *fpp) } } - done: - unlock_unit (u); + done: + if (ASYNC_IO && u->au && needs_unlock) + UNLOCK (&u->au->io_lock); + + unlock_unit (u); library_end (); } @@ -390,6 +423,7 @@ void st_rewind (st_parameter_filepos *fpp) { gfc_unit *u; + bool needs_unlock = true; library_start (&fpp->common); @@ -401,6 +435,17 @@ st_rewind (st_parameter_filepos *fpp) "Cannot REWIND a file opened for DIRECT access"); else { + if (ASYNC_IO && u->au) + { + if (async_wait (&(fpp->common), u->au)) + return; + else + { + needs_unlock = true; + LOCK (&u->au->io_lock); + } + } + /* If there are previously written bytes from a write with ADVANCE="no", add a record marker before performing the ENDFILE. */ @@ -436,6 +481,10 @@ st_rewind (st_parameter_filepos *fpp) } /* Update position for INQUIRE. */ u->flags.position = POSITION_REWIND; + + if (ASYNC_IO && u->au && needs_unlock) + UNLOCK (&u->au->io_lock); + unlock_unit (u); } @@ -450,12 +499,24 @@ void st_flush (st_parameter_filepos *fpp) { gfc_unit *u; + bool needs_unlock = false; library_start (&fpp->common); u = find_unit (fpp->common.unit); if (u != NULL) { + if (ASYNC_IO && u->au) + { + if (async_wait (&(fpp->common), u->au)) + return; + else + { + needs_unlock = true; + LOCK (&u->au->io_lock); + } + } + /* Make sure format buffer is flushed. */ if (u->flags.form == FORM_FORMATTED) fbuf_flush (u, u->mode); @@ -469,5 +530,8 @@ st_flush (st_parameter_filepos *fpp) generate_error (&fpp->common, LIBERROR_BAD_OPTION, "Specified UNIT in FLUSH is not connected"); + if (needs_unlock) + UNLOCK (&u->au->io_lock); + library_end (); } diff --git a/libgfortran/io/inquire.c b/libgfortran/io/inquire.c index 047be39..7c96e5f 100644 --- a/libgfortran/io/inquire.c +++ b/libgfortran/io/inquire.c @@ -26,6 +26,7 @@ see the files COPYING3 and COPYING.RUNTIME respectively. If not, see /* Implement the non-IOLENGTH variant of the INQUIRY statement */ #include "io.h" +#include "async.h" #include "unix.h" #include <string.h> @@ -281,12 +282,6 @@ inquire_via_unit (st_parameter_inquire *iqp, gfc_unit *u) { GFC_INTEGER_4 cf2 = iqp->flags2; - if ((cf2 & IOPARM_INQUIRE_HAS_PENDING) != 0) - *iqp->pending = 0; - - if ((cf2 & IOPARM_INQUIRE_HAS_ID) != 0) - *iqp->id = 0; - if ((cf2 & IOPARM_INQUIRE_HAS_ENCODING) != 0) { if (u == NULL || u->flags.form != FORM_FORMATTED) @@ -332,21 +327,43 @@ inquire_via_unit (st_parameter_inquire *iqp, gfc_unit *u) if (u == NULL) p = undefined; else - switch (u->flags.async) { - case ASYNC_YES: - p = yes; - break; - case ASYNC_NO: - p = no; - break; - default: - internal_error (&iqp->common, "inquire_via_unit(): Bad async"); + switch (u->flags.async) + { + case ASYNC_YES: + p = yes; + break; + case ASYNC_NO: + p = no; + break; + default: + internal_error (&iqp->common, "inquire_via_unit(): Bad async"); + } } - cf_strcpy (iqp->asynchronous, iqp->asynchronous_len, p); } + if ((cf2 & IOPARM_INQUIRE_HAS_PENDING) != 0) + { + if (!ASYNC_IO || u->au == NULL) + *(iqp->pending) = 0; + else + { + LOCK (&(u->au->lock)); + if ((cf2 & IOPARM_INQUIRE_HAS_ID) != 0) + { + int id; + id = *(iqp->id); + *(iqp->pending) = id > u->au->id.low; + } + else + { + *(iqp->pending) = ! u->au->empty; + } + UNLOCK (&(u->au->lock)); + } + } + if ((cf2 & IOPARM_INQUIRE_HAS_SIGN) != 0) { if (u == NULL) diff --git a/libgfortran/io/io.h b/libgfortran/io/io.h index ccbaf47..d312131 100644 --- a/libgfortran/io/io.h +++ b/libgfortran/io/io.h @@ -531,7 +531,9 @@ typedef struct st_parameter_dt /* A flag used to identify when a non-standard expanded namelist read has occurred. */ unsigned expanded_read : 1; - /* 13 unused bits. */ + /* Flag to indicate if the statement has async="YES". */ + unsigned async : 1; + /* 12 unused bits. */ int child_saved_iostat; int nml_delim; @@ -590,7 +592,7 @@ extern char check_st_parameter_dt[sizeof (((st_parameter_dt *) 0)->u.pad) typedef struct { st_parameter_common common; - CHARACTER1 (id); + GFC_INTEGER_4 *id; } st_parameter_wait; @@ -659,6 +661,9 @@ typedef struct gfc_unit int continued; + /* Contains the pointer to the async unit. */ + struct async_unit *au; + __gthread_mutex_t lock; /* Number of threads waiting to acquire this unit's lock. When non-zero, close_unit doesn't only removes the unit @@ -815,11 +820,18 @@ extern void next_record (st_parameter_dt *, int); internal_proto(next_record); extern void st_wait (st_parameter_wait *); -export_proto(st_wait); +export_proto (st_wait); + +extern void st_wait_async (st_parameter_wait *); +export_proto (st_wait_async); extern void hit_eof (st_parameter_dt *); internal_proto(hit_eof); +extern void transfer_array_inner (st_parameter_dt *, gfc_array_char *, int, + gfc_charlen_type); +internal_proto (transfer_array_inner); + /* read.c */ extern void set_integer (void *, GFC_INTEGER_LARGEST, int); @@ -988,3 +1000,14 @@ memset4 (gfc_char4_t *p, gfc_char4_t c, int k) #endif +extern void +st_write_done_worker (st_parameter_dt *); +internal_proto (st_write_done_worker); + +extern void +st_read_done_worker (st_parameter_dt *); +internal_proto (st_read_done_worker); + +extern void +data_transfer_init_worker (st_parameter_dt *, int); +internal_proto (data_transfer_init_worker); diff --git a/libgfortran/io/open.c b/libgfortran/io/open.c index 05aac8f..2660338 100644 --- a/libgfortran/io/open.c +++ b/libgfortran/io/open.c @@ -26,6 +26,7 @@ see the files COPYING3 and COPYING.RUNTIME respectively. If not, see #include "io.h" #include "fbuf.h" #include "unix.h" +#include "async.h" #ifdef HAVE_UNISTD_H #include <unistd.h> @@ -651,8 +652,12 @@ new_unit (st_parameter_open *opp, gfc_unit *u, unit_flags *flags) else u->fbuf = NULL; - - + /* Check if asynchrounous. */ + if (flags->async == ASYNC_YES) + init_async_unit (u); + else + u->au = NULL; + return u; cleanup: diff --git a/libgfortran/io/read.c b/libgfortran/io/read.c index 976020a..f972858 100644 --- a/libgfortran/io/read.c +++ b/libgfortran/io/read.c @@ -30,6 +30,7 @@ see the files COPYING3 and COPYING.RUNTIME respectively. If not, see #include <string.h> #include <ctype.h> #include <assert.h> +#include "async.h" typedef unsigned char uchar; @@ -42,6 +43,7 @@ typedef unsigned char uchar; void set_integer (void *dest, GFC_INTEGER_LARGEST value, int length) { + NOTE ("set_integer: %lld %p", (long long int) value, dest); switch (length) { #ifdef HAVE_GFC_INTEGER_16 diff --git a/libgfortran/io/transfer.c b/libgfortran/io/transfer.c index df33bed..31198a3 100644 --- a/libgfortran/io/transfer.c +++ b/libgfortran/io/transfer.c @@ -31,6 +31,7 @@ see the files COPYING3 and COPYING.RUNTIME respectively. If not, see #include "fbuf.h" #include "format.h" #include "unix.h" +#include "async.h" #include <string.h> #include <errno.h> @@ -184,6 +185,12 @@ static const st_option pad_opt[] = { {NULL, 0} }; +static const st_option async_opt[] = { + {"yes", ASYNC_YES}, + {"no", ASYNC_NO}, + {NULL, 0} +}; + typedef enum { FORMATTED_SEQUENTIAL, UNFORMATTED_SEQUENTIAL, FORMATTED_DIRECT, UNFORMATTED_DIRECT, FORMATTED_STREAM, UNFORMATTED_STREAM @@ -1594,7 +1601,8 @@ formatted_transfer_scalar_read (st_parameter_dt *dtp, bt type, void *p, int kind read_f (dtp, f, p, kind); break; default: - internal_error (&dtp->common, "formatted_transfer(): Bad type"); + internal_error (&dtp->common, + "formatted_transfer (): Bad type"); } break; @@ -2066,7 +2074,7 @@ formatted_transfer_scalar_write (st_parameter_dt *dtp, bt type, void *p, int kin break; default: internal_error (&dtp->common, - "formatted_transfer(): Bad type"); + "formatted_transfer (): Bad type"); } break; @@ -2281,6 +2289,38 @@ formatted_transfer (st_parameter_dt *dtp, bt type, void *p, int kind, } } +/* Wrapper function for I/O of scalar types. If this should be an async I/O + request, queue it. For a synchronous write on an async unit, perform the + wait operation and return an error. For all synchronous writes, call the + right transfer function. */ + +static void +wrap_scalar_transfer (st_parameter_dt *dtp, bt type, void *p, int kind, + size_t size, size_t n_elem) +{ + if (dtp->u.p.current_unit && dtp->u.p.current_unit->au) + { + if (dtp->u.p.async) + { + transfer_args args; + args.scalar.transfer = dtp->u.p.transfer; + args.scalar.arg_bt = type; + args.scalar.data = p; + args.scalar.i = kind; + args.scalar.s1 = size; + args.scalar.s2 = n_elem; + enqueue_transfer (dtp->u.p.current_unit->au, &args, + AIO_TRANSFER_SCALAR); + return; + } + } + /* Come here if there was no asynchronous I/O to be scheduled. */ + if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK) + return; + + dtp->u.p.transfer (dtp, type, p, kind, size, 1); +} + /* Data transfer entry points. The type of the data entity is implicit in the subroutine call. This prevents us from having to @@ -2289,9 +2329,7 @@ formatted_transfer (st_parameter_dt *dtp, bt type, void *p, int kind, void transfer_integer (st_parameter_dt *dtp, void *p, int kind) { - if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK) - return; - dtp->u.p.transfer (dtp, BT_INTEGER, p, kind, kind, 1); + wrap_scalar_transfer (dtp, BT_INTEGER, p, kind, kind, 1); } void @@ -2307,7 +2345,7 @@ transfer_real (st_parameter_dt *dtp, void *p, int kind) if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK) return; size = size_from_real_kind (kind); - dtp->u.p.transfer (dtp, BT_REAL, p, kind, size, 1); + wrap_scalar_transfer (dtp, BT_REAL, p, kind, size, 1); } void @@ -2319,9 +2357,7 @@ transfer_real_write (st_parameter_dt *dtp, void *p, int kind) void transfer_logical (st_parameter_dt *dtp, void *p, int kind) { - if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK) - return; - dtp->u.p.transfer (dtp, BT_LOGICAL, p, kind, kind, 1); + wrap_scalar_transfer (dtp, BT_LOGICAL, p, kind, kind, 1); } void @@ -2345,7 +2381,7 @@ transfer_character (st_parameter_dt *dtp, void *p, gfc_charlen_type len) p = empty_string; /* Set kind here to 1. */ - dtp->u.p.transfer (dtp, BT_CHARACTER, p, 1, len, 1); + wrap_scalar_transfer (dtp, BT_CHARACTER, p, 1, len, 1); } void @@ -2369,7 +2405,7 @@ transfer_character_wide (st_parameter_dt *dtp, void *p, gfc_charlen_type len, in p = empty_string; /* Here we pass the actual kind value. */ - dtp->u.p.transfer (dtp, BT_CHARACTER, p, kind, len, 1); + wrap_scalar_transfer (dtp, BT_CHARACTER, p, kind, len, 1); } void @@ -2385,7 +2421,7 @@ transfer_complex (st_parameter_dt *dtp, void *p, int kind) if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK) return; size = size_from_complex_kind (kind); - dtp->u.p.transfer (dtp, BT_COMPLEX, p, kind, size, 1); + wrap_scalar_transfer (dtp, BT_COMPLEX, p, kind, size, 1); } void @@ -2395,8 +2431,8 @@ transfer_complex_write (st_parameter_dt *dtp, void *p, int kind) } void -transfer_array (st_parameter_dt *dtp, gfc_array_char *desc, int kind, - gfc_charlen_type charlen) +transfer_array_inner (st_parameter_dt *dtp, gfc_array_char *desc, int kind, + gfc_charlen_type charlen) { index_type count[GFC_MAX_DIMENSIONS]; index_type extent[GFC_MAX_DIMENSIONS]; @@ -2407,7 +2443,7 @@ transfer_array (st_parameter_dt *dtp, gfc_array_char *desc, int kind, bt iotype; /* Adjust item_count before emitting error message. */ - + if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK) return; @@ -2471,6 +2507,36 @@ transfer_array (st_parameter_dt *dtp, gfc_array_char *desc, int kind, } void +transfer_array (st_parameter_dt *dtp, gfc_array_char *desc, int kind, + gfc_charlen_type charlen) +{ + if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK) + return; + + if (dtp->u.p.current_unit && dtp->u.p.current_unit->au) + { + if (dtp->u.p.async) + { + transfer_args args; + size_t sz = sizeof (gfc_array_char) + + sizeof (descriptor_dimension) + * GFC_DESCRIPTOR_RANK (desc); + args.array.desc = xmalloc (sz); + NOTE ("desc = %p", (void *) args.array.desc); + memcpy (args.array.desc, desc, sz); + args.array.kind = kind; + args.array.charlen = charlen; + enqueue_transfer (dtp->u.p.current_unit->au, &args, + AIO_TRANSFER_ARRAY); + return; + } + } + /* Come here if there was no asynchronous I/O to be scheduled. */ + transfer_array_inner (dtp, desc, kind, charlen); +} + + +void transfer_array_write (st_parameter_dt *dtp, gfc_array_char *desc, int kind, gfc_charlen_type charlen) { @@ -2492,7 +2558,7 @@ transfer_derived (st_parameter_dt *parent, void *dtio_source, void *dtio_proc) else parent->u.p.fdtio_ptr = (formatted_dtio) dtio_proc; } - parent->u.p.transfer (parent, BT_CLASS, dtio_source, 0, 0, 1); + wrap_scalar_transfer (parent, BT_CLASS, dtio_source, 0, 0, 1); } @@ -2667,6 +2733,9 @@ data_transfer_init (st_parameter_dt *dtp, int read_flag) unit_flags u_flags; /* Used for creating a unit if needed. */ GFC_INTEGER_4 cf = dtp->common.flags; namelist_info *ionml; + async_unit *au; + + NOTE ("data_transfer_init"); ionml = ((cf & IOPARM_DT_IONML_SET) != 0) ? dtp->u.p.ionml : NULL; @@ -2693,9 +2762,9 @@ data_transfer_init (st_parameter_dt *dtp, int read_flag) } else if (dtp->u.p.current_unit->s == NULL) { /* Open the unit with some default flags. */ - st_parameter_open opp; - unit_convert conv; - + st_parameter_open opp; + unit_convert conv; + NOTE ("Open the unit with some default flags."); memset (&u_flags, '\0', sizeof (u_flags)); u_flags.access = ACCESS_SEQUENTIAL; u_flags.action = ACTION_READWRITE; @@ -2770,6 +2839,42 @@ data_transfer_init (st_parameter_dt *dtp, int read_flag) else if (dtp->u.p.current_unit->internal_unit_kind > 0) dtp->u.p.unit_is_internal = 1; + if ((cf & IOPARM_DT_HAS_ASYNCHRONOUS) != 0) + { + int f; + f = find_option (&dtp->common, dtp->asynchronous, dtp->asynchronous_len, + async_opt, "Bad ASYNCHRONOUS in data transfer " + "statement"); + if (f == ASYNC_YES && dtp->u.p.current_unit->flags.async != ASYNC_YES) + { + generate_error (&dtp->common, LIBERROR_OPTION_CONFLICT, + "ASYNCHRONOUS transfer without " + "ASYHCRONOUS='YES' in OPEN"); + return; + } + dtp->u.p.async = f == ASYNC_YES; + } + + au = dtp->u.p.current_unit->au; + if (au) + { + if (dtp->u.p.async) + { + /* If this is an asynchronous I/O statement, collect errors and + return if there are any. */ + if (collect_async_errors (&dtp->common, au)) + return; + } + else + { + /* Synchronous statement: Perform a wait operation for any pending + asynchronous I/O. This needs to be done before all other error + checks. See F2008, 9.6.4.1. */ + if (async_wait (&(dtp->common), au)) + return; + } + } + /* Check the action. */ if (read_flag && dtp->u.p.current_unit->flags.action == ACTION_WRITE) @@ -3009,6 +3114,57 @@ data_transfer_init (st_parameter_dt *dtp, int read_flag) if (dtp->u.p.current_unit->pad_status == PAD_UNSPECIFIED) dtp->u.p.current_unit->pad_status = dtp->u.p.current_unit->flags.pad; + /* Set up the subroutine that will handle the transfers. */ + + if (read_flag) + { + if (dtp->u.p.current_unit->flags.form == FORM_UNFORMATTED) + dtp->u.p.transfer = unformatted_read; + else + { + if ((cf & IOPARM_DT_LIST_FORMAT) != 0) + dtp->u.p.transfer = list_formatted_read; + else + dtp->u.p.transfer = formatted_transfer; + } + } + else + { + if (dtp->u.p.current_unit->flags.form == FORM_UNFORMATTED) + dtp->u.p.transfer = unformatted_write; + else + { + if ((cf & IOPARM_DT_LIST_FORMAT) != 0) + dtp->u.p.transfer = list_formatted_write; + else + dtp->u.p.transfer = formatted_transfer; + } + } + + if (au) + { + NOTE ("enqueue_data_transfer"); + enqueue_data_transfer_init (au, dtp, read_flag); + } + else + { + NOTE ("invoking data_transfer_init_worker"); + data_transfer_init_worker (dtp, read_flag); + } +} + +void +data_transfer_init_worker (st_parameter_dt *dtp, int read_flag) +{ + GFC_INTEGER_4 cf = dtp->common.flags; + + NOTE ("starting worker..."); + + if (read_flag && dtp->u.p.current_unit->flags.form != FORM_UNFORMATTED + && ((cf & IOPARM_DT_LIST_FORMAT) != 0) + && dtp->u.p.current_unit->child_dtio == 0) + dtp->u.p.current_unit->last_char = EOF - 1; + /* Check to see if we might be reading what we wrote before */ if (dtp->u.p.mode != dtp->u.p.current_unit->mode @@ -3135,38 +3291,6 @@ data_transfer_init (st_parameter_dt *dtp, int read_flag) pre_position (dtp); - - /* Set up the subroutine that will handle the transfers. */ - - if (read_flag) - { - if (dtp->u.p.current_unit->flags.form == FORM_UNFORMATTED) - dtp->u.p.transfer = unformatted_read; - else - { - if ((cf & IOPARM_DT_LIST_FORMAT) != 0) - { - if (dtp->u.p.current_unit->child_dtio == 0) - dtp->u.p.current_unit->last_char = EOF - 1; - dtp->u.p.transfer = list_formatted_read; - } - else - dtp->u.p.transfer = formatted_transfer; - } - } - else - { - if (dtp->u.p.current_unit->flags.form == FORM_UNFORMATTED) - dtp->u.p.transfer = unformatted_write; - else - { - if ((cf & IOPARM_DT_LIST_FORMAT) != 0) - dtp->u.p.transfer = list_formatted_write; - else - dtp->u.p.transfer = formatted_transfer; - } - } - /* Make sure that we don't do a read after a nonadvancing write. */ if (read_flag) @@ -4099,7 +4223,7 @@ extern void st_read_done (st_parameter_dt *); export_proto(st_read_done); void -st_read_done (st_parameter_dt *dtp) +st_read_done_worker (st_parameter_dt *dtp) { finalize_transfer (dtp); @@ -4127,6 +4251,30 @@ st_read_done (st_parameter_dt *dtp) free_format_data (dtp->u.p.fmt); free_format (dtp); } + } +} + +void +st_read_done (st_parameter_dt *dtp) +{ + if (dtp->u.p.current_unit) + { + if (dtp->u.p.current_unit->au) + { + if (dtp->common.flags & IOPARM_DT_HAS_ID) + *dtp->id = enqueue_done_id (dtp->u.p.current_unit->au, AIO_READ_DONE); + else + { + enqueue_done (dtp->u.p.current_unit->au, AIO_READ_DONE); + /* An asynchronous unit without ASYNCHRONOUS="YES" - make this + synchronous by performing a wait operation. */ + if (!dtp->u.p.async) + async_wait (&dtp->common, dtp->u.p.current_unit->au); + } + } + else + st_read_done_worker (dtp); + unlock_unit (dtp->u.p.current_unit); } @@ -4134,7 +4282,7 @@ st_read_done (st_parameter_dt *dtp) } extern void st_write (st_parameter_dt *); -export_proto(st_write); +export_proto (st_write); void st_write (st_parameter_dt *dtp) @@ -4143,11 +4291,9 @@ st_write (st_parameter_dt *dtp) data_transfer_init (dtp, 0); } -extern void st_write_done (st_parameter_dt *); -export_proto(st_write_done); void -st_write_done (st_parameter_dt *dtp) +st_write_done_worker (st_parameter_dt *dtp) { finalize_transfer (dtp); @@ -4196,16 +4342,65 @@ st_write_done (st_parameter_dt *dtp) free_format_data (dtp->u.p.fmt); free_format (dtp); } + } +} + +extern void st_write_done (st_parameter_dt *); +export_proto(st_write_done); + +void +st_write_done (st_parameter_dt *dtp) +{ + if (dtp->u.p.current_unit) + { + if (dtp->u.p.current_unit->au) + { + if (dtp->common.flags & IOPARM_DT_HAS_ID) + *dtp->id = enqueue_done_id (dtp->u.p.current_unit->au, + AIO_WRITE_DONE); + else + { + enqueue_done (dtp->u.p.current_unit->au, AIO_WRITE_DONE); + /* An asynchronous unit without ASYNCHRONOUS="YES" - make this + synchronous by performing a wait operation. */ + if (!dtp->u.p.async) + async_wait (&dtp->common, dtp->u.p.current_unit->au); + } + } + else + st_write_done_worker (dtp); + unlock_unit (dtp->u.p.current_unit); } + library_end (); } +/* Wait operation. We need to keep around the do-nothing version + of st_wait for compatibility with previous versions, which had marked + the argument as unused (and thus liable to be removed). + + TODO: remove at next bump in version number. */ -/* F2003: This is a stub for the runtime portion of the WAIT statement. */ void st_wait (st_parameter_wait *wtp __attribute__((unused))) { + return; +} + +void +st_wait_async (st_parameter_wait *wtp) +{ + gfc_unit *u = find_unit (wtp->common.unit); + if (ASYNC_IO && u->au) + { + if (wtp->common.flags & IOPARM_WAIT_HAS_ID) + async_wait_id (&(wtp->common), u->au, *wtp->id); + else + async_wait (&(wtp->common), u->au); + } + + unlock_unit (u); } diff --git a/libgfortran/io/unit.c b/libgfortran/io/unit.c index 559dba9..985569a 100644 --- a/libgfortran/io/unit.c +++ b/libgfortran/io/unit.c @@ -27,6 +27,7 @@ see the files COPYING3 and COPYING.RUNTIME respectively. If not, see #include "fbuf.h" #include "format.h" #include "unix.h" +#include "async.h" #include <string.h> #include <assert.h> @@ -240,7 +241,7 @@ insert_unit (int n) #else __GTHREAD_MUTEX_INIT_FUNCTION (&u->lock); #endif - __gthread_mutex_lock (&u->lock); + LOCK (&u->lock); u->priority = pseudo_random (); unit_root = insert (u, unit_root); return u; @@ -327,7 +328,9 @@ get_gfc_unit (int n, int do_create) gfc_unit *p; int c, created = 0; - __gthread_mutex_lock (&unit_lock); + NOTE ("Unit n=%d, do_create = %d", n, do_create); + LOCK (&unit_lock); + retry: for (c = 0; c < CACHE_SIZE; c++) if (unit_cache[c] != NULL && unit_cache[c]->unit_number == n) @@ -366,7 +369,7 @@ retry: { /* Newly created units have their lock held already from insert_unit. Just unlock UNIT_LOCK and return. */ - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); return p; } @@ -374,10 +377,10 @@ found: if (p != NULL && (p->child_dtio == 0)) { /* Fast path. */ - if (! __gthread_mutex_trylock (&p->lock)) + if (! TRYLOCK (&p->lock)) { /* assert (p->closed == 0); */ - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); return p; } @@ -385,15 +388,15 @@ found: } - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); if (p != NULL && (p->child_dtio == 0)) { - __gthread_mutex_lock (&p->lock); + LOCK (&p->lock); if (p->closed) { - __gthread_mutex_lock (&unit_lock); - __gthread_mutex_unlock (&p->lock); + LOCK (&unit_lock); + UNLOCK (&p->lock); if (predec_waiting_locked (p) == 0) destroy_unit_mutex (p); goto retry; @@ -640,7 +643,7 @@ init_units (void) fbuf_init (u, 0); - __gthread_mutex_unlock (&u->lock); + UNLOCK (&u->lock); } if (options.stdout_unit >= 0) @@ -671,7 +674,7 @@ init_units (void) fbuf_init (u, 0); - __gthread_mutex_unlock (&u->lock); + UNLOCK (&u->lock); } if (options.stderr_unit >= 0) @@ -702,13 +705,13 @@ init_units (void) fbuf_init (u, 256); /* 256 bytes should be enough, probably not doing any kind of exotic formatting to stderr. */ - __gthread_mutex_unlock (&u->lock); + UNLOCK (&u->lock); } /* The default internal units. */ u = insert_unit (GFC_INTERNAL_UNIT); - __gthread_mutex_unlock (&u->lock); + UNLOCK (&u->lock); u = insert_unit (GFC_INTERNAL_UNIT4); - __gthread_mutex_unlock (&u->lock); + UNLOCK (&u->lock); } @@ -717,6 +720,9 @@ close_unit_1 (gfc_unit *u, int locked) { int i, rc; + if (ASYNC_IO && u->au) + async_close (u->au); + /* If there are previously written bytes from a write with ADVANCE="no" Reposition the buffer before closing. */ if (u->previous_nonadvancing_write) @@ -726,7 +732,7 @@ close_unit_1 (gfc_unit *u, int locked) u->closed = 1; if (!locked) - __gthread_mutex_lock (&unit_lock); + LOCK (&unit_lock); for (i = 0; i < CACHE_SIZE; i++) if (unit_cache[i] == u) @@ -744,7 +750,7 @@ close_unit_1 (gfc_unit *u, int locked) newunit_free (u->unit_number); if (!locked) - __gthread_mutex_unlock (&u->lock); + UNLOCK (&u->lock); /* If there are any threads waiting in find_unit for this unit, avoid freeing the memory, the last such thread will free it @@ -753,7 +759,7 @@ close_unit_1 (gfc_unit *u, int locked) destroy_unit_mutex (u); if (!locked) - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); return rc; } @@ -761,7 +767,9 @@ close_unit_1 (gfc_unit *u, int locked) void unlock_unit (gfc_unit *u) { - __gthread_mutex_unlock (&u->lock); + NOTE ("unlock_unit = %d", u->unit_number); + UNLOCK (&u->lock); + NOTE ("unlock_unit done"); } /* close_unit()-- Close a unit. The stream is closed, and any memory @@ -785,10 +793,10 @@ close_unit (gfc_unit *u) void close_units (void) { - __gthread_mutex_lock (&unit_lock); + LOCK (&unit_lock); while (unit_root != NULL) close_unit_1 (unit_root, 1); - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); free (newunits); @@ -895,7 +903,7 @@ finish_last_advance_record (gfc_unit *u) int newunit_alloc (void) { - __gthread_mutex_lock (&unit_lock); + LOCK (&unit_lock); if (!newunits) { newunits = xcalloc (16, 1); @@ -909,7 +917,7 @@ newunit_alloc (void) { newunits[ii] = true; newunit_lwi = ii + 1; - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); return -ii + NEWUNIT_START; } } @@ -922,7 +930,7 @@ newunit_alloc (void) memset (newunits + old_size, 0, old_size); newunits[old_size] = true; newunit_lwi = old_size + 1; - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); return -old_size + NEWUNIT_START; } diff --git a/libgfortran/io/unix.c b/libgfortran/io/unix.c index a8fd07a..4a133fd 100644 --- a/libgfortran/io/unix.c +++ b/libgfortran/io/unix.c @@ -27,6 +27,7 @@ see the files COPYING3 and COPYING.RUNTIME respectively. If not, see #include "io.h" #include "unix.h" +#include "async.h" #include <limits.h> #ifdef HAVE_UNISTD_H @@ -1742,7 +1743,7 @@ find_file (const char *file, gfc_charlen_type file_len) id = id_from_path (path); #endif - __gthread_mutex_lock (&unit_lock); + LOCK (&unit_lock); retry: u = find_file0 (unit_root, FIND_FILE0_ARGS); if (u != NULL) @@ -1751,20 +1752,20 @@ retry: if (! __gthread_mutex_trylock (&u->lock)) { /* assert (u->closed == 0); */ - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); goto done; } inc_waiting_locked (u); } - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); if (u != NULL) { - __gthread_mutex_lock (&u->lock); + LOCK (&u->lock); if (u->closed) { - __gthread_mutex_lock (&unit_lock); - __gthread_mutex_unlock (&u->lock); + LOCK (&unit_lock); + UNLOCK (&u->lock); if (predec_waiting_locked (u) == 0) free (u); goto retry; @@ -1794,7 +1795,7 @@ flush_all_units_1 (gfc_unit *u, int min_unit) return u; if (u->s) sflush (u->s); - __gthread_mutex_unlock (&u->lock); + UNLOCK (&u->lock); } u = u->right; } @@ -1807,31 +1808,31 @@ flush_all_units (void) gfc_unit *u; int min_unit = 0; - __gthread_mutex_lock (&unit_lock); + LOCK (&unit_lock); do { u = flush_all_units_1 (unit_root, min_unit); if (u != NULL) inc_waiting_locked (u); - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); if (u == NULL) return; - __gthread_mutex_lock (&u->lock); + LOCK (&u->lock); min_unit = u->unit_number + 1; if (u->closed == 0) { sflush (u->s); - __gthread_mutex_lock (&unit_lock); - __gthread_mutex_unlock (&u->lock); + LOCK (&unit_lock); + UNLOCK (&u->lock); (void) predec_waiting_locked (u); } else { - __gthread_mutex_lock (&unit_lock); - __gthread_mutex_unlock (&u->lock); + LOCK (&unit_lock); + UNLOCK (&u->lock); if (predec_waiting_locked (u) == 0) free (u); } |