diff options
author | Nicolas Koenig <koenigni@gcc.gnu.org> | 2018-07-25 18:48:39 +0000 |
---|---|---|
committer | Nicolas Koenig <koenigni@gcc.gnu.org> | 2018-07-25 18:48:39 +0000 |
commit | b1f45884f676cd5bde92d6babac2e8d44629f5ac (patch) | |
tree | d02f3de667db3dd45796c412b0481a9661283c83 /libgfortran/io/async.c | |
parent | 16d0ab7f441a21b93fe28c5bd55fc32672ad510b (diff) | |
download | gcc-b1f45884f676cd5bde92d6babac2e8d44629f5ac.zip gcc-b1f45884f676cd5bde92d6babac2e8d44629f5ac.tar.gz gcc-b1f45884f676cd5bde92d6babac2e8d44629f5ac.tar.bz2 |
re PR fortran/25829 ([F03] Asynchronous IO support)
2018-07-25 Nicolas Koenig <koenigni@gcc.gnu.org>
Thomas Koenig <tkoenig@gcc.gnu.org>
PR fortran/25829
* gfortran.texi: Add description of asynchronous I/O.
* trans-decl.c (gfc_finish_var_decl): Treat asynchronous variables
as volatile.
* trans-io.c (gfc_build_io_library_fndecls): Rename st_wait to
st_wait_async and change argument spec from ".X" to ".w".
(gfc_trans_wait): Pass ID argument via reference.
2018-07-25 Nicolas Koenig <koenigni@gcc.gnu.org>
Thomas Koenig <tkoenig@gcc.gnu.org>
PR fortran/25829
* gfortran.dg/f2003_inquire_1.f03: Add write statement.
* gfortran.dg/f2003_io_1.f03: Add wait statement.
2018-07-25 Nicolas Koenig <koenigni@gcc.gnu.org>
Thomas Koenig <tkoenig@gcc.gnu.org>
PR fortran/25829
* Makefile.am: Add async.c to gfor_io_src.
Add async.h to gfor_io_headers.
* Makefile.in: Regenerated.
* gfortran.map: Add _gfortran_st_wait_async.
* io/async.c: New file.
* io/async.h: New file.
* io/close.c: Include async.h.
(st_close): Call async_wait for an asynchronous unit.
* io/file_pos.c (st_backspace): Likewise.
(st_endfile): Likewise.
(st_rewind): Likewise.
(st_flush): Likewise.
* io/inquire.c: Add handling for asynchronous PENDING
and ID arguments.
* io/io.h (st_parameter_dt): Add async bit.
(st_parameter_wait): Correct.
(gfc_unit): Add au pointer.
(st_wait_async): Add prototype.
(transfer_array_inner): Likewise.
(st_write_done_worker): Likewise.
* io/open.c: Include async.h.
(new_unit): Initialize asynchronous unit.
* io/transfer.c (async_opt): New struct.
(wrap_scalar_transfer): New function.
(transfer_integer): Call wrap_scalar_transfer to do the work.
(transfer_real): Likewise.
(transfer_real_write): Likewise.
(transfer_character): Likewise.
(transfer_character_wide): Likewise.
(transfer_complex): Likewise.
(transfer_array_inner): New function.
(transfer_array): Call transfer_array_inner.
(transfer_derived): Call wrap_scalar_transfer.
(data_transfer_init): Check for asynchronous I/O.
Perform a wait operation on any pending asynchronous I/O
if the data transfer is synchronous. Copy PDT and enqueue
thread for data transfer.
(st_read_done_worker): New function.
(st_read_done): Enqueue transfer or call st_read_done_worker.
(st_write_done_worker): New function.
(st_write_done): Enqueue transfer or call st_read_done_worker.
(st_wait): Document as no-op for compatibility reasons.
(st_wait_async): New function.
* io/unit.c (insert_unit): Use macros LOCK, UNLOCK and TRYLOCK;
add NOTE where necessary.
(get_gfc_unit): Likewise.
(init_units): Likewise.
(close_unit_1): Likewise. Call async_close if asynchronous.
(close_unit): Use macros LOCK and UNLOCK.
(finish_last_advance_record): Likewise.
(newunit_alloc): Likewise.
* io/unix.c (find_file): Likewise.
(flush_all_units_1): Likewise.
(flush_all_units): Likewise.
* libgfortran.h (generate_error_common): Add prototype.
* runtime/error.c: Include io.h and async.h.
(generate_error_common): New function.
2018-07-25 Nicolas Koenig <koenigni@gcc.gnu.org>
Thomas Koenig <tkoenig@gcc.gnu.org>
PR fortran/25829
* testsuite/libgomp.fortran/async_io_1.f90: New test.
* testsuite/libgomp.fortran/async_io_2.f90: New test.
* testsuite/libgomp.fortran/async_io_3.f90: New test.
* testsuite/libgomp.fortran/async_io_4.f90: New test.
* testsuite/libgomp.fortran/async_io_5.f90: New test.
* testsuite/libgomp.fortran/async_io_6.f90: New test.
* testsuite/libgomp.fortran/async_io_7.f90: New test.
Co-Authored-By: Thomas Koenig <tkoenig@gcc.gnu.org>
From-SVN: r262978
Diffstat (limited to 'libgfortran/io/async.c')
-rw-r--r-- | libgfortran/io/async.c | 483 |
1 files changed, 483 insertions, 0 deletions
diff --git a/libgfortran/io/async.c b/libgfortran/io/async.c new file mode 100644 index 0000000..b32af16 --- /dev/null +++ b/libgfortran/io/async.c @@ -0,0 +1,483 @@ +/* Copyright (C) 2018 Free Software Foundation, Inc. + Contributed by Nicolas Koenig + + This file is part of the GNU Fortran runtime library (libgfortran). + + Libgfortran is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3, or (at your option) + any later version. + + Libgfortran is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + Under Section 7 of GPL version 3, you are granted additional + permissions described in the GCC Runtime Library Exception, version + 3.1, as published by the Free Software Foundation. + + You should have received a copy of the GNU General Public License and + a copy of the GCC Runtime Library Exception along with this program; + see the files COPYING3 and COPYING.RUNTIME respectively. If not, see + <http://www.gnu.org/licenses/>. */ + +#include "libgfortran.h" + +#define _GTHREAD_USE_COND_INIT_FUNC +#include "../../libgcc/gthr.h" +#include "io.h" +#include "fbuf.h" +#include "format.h" +#include "unix.h" +#include <string.h> +#include <assert.h> + +#include <sys/types.h> + +#include "async.h" + +DEBUG_LINE (__thread const char *aio_prefix = MPREFIX); + +DEBUG_LINE (__gthread_mutex_t debug_queue_lock = __GTHREAD_MUTEX_INIT;) +DEBUG_LINE (aio_lock_debug *aio_debug_head = NULL;) + +/* Current unit for asynchronous I/O. Needed for error reporting. */ + +__thread gfc_unit *thread_unit = NULL; + +/* Queue entry for the asynchronous I/O entry. */ +typedef struct transfer_queue +{ + enum aio_do type; + struct transfer_queue *next; + struct st_parameter_dt *new_pdt; + transfer_args arg; + _Bool has_id; + int read_flag; +} transfer_queue; + +struct error { + st_parameter_dt *dtp; + int id; +}; + +/* Helper function to exchange the old vs. a new PDT. */ + +static void +update_pdt (st_parameter_dt **old, st_parameter_dt *new) { + st_parameter_dt *temp; + NOTE ("Changing pdts, current_unit = %p", (void *) (new->u.p.current_unit)); + temp = *old; + *old = new; + if (temp) + free (temp); +} + +/* Destroy an adv_cond structure. */ + +static void +destroy_adv_cond (struct adv_cond *ac) +{ + T_ERROR (__gthread_mutex_destroy, &ac->lock); + T_ERROR (__gthread_cond_destroy, &ac->signal); +} + +/* Function invoked as start routine for a new asynchronous I/O unit. + Contains the main loop for accepting requests and handling them. */ + +static void * +async_io (void *arg) +{ + DEBUG_LINE (aio_prefix = TPREFIX); + transfer_queue *ctq = NULL, *prev = NULL; + gfc_unit *u = (gfc_unit *) arg; + async_unit *au = u->au; + LOCK (&au->lock); + thread_unit = u; + au->thread = __gthread_self (); + while (true) + { + /* Main loop. At this point, au->lock is always held. */ + WAIT_SIGNAL_MUTEX (&au->work, au->tail != NULL, &au->lock); + LOCK (&au->lock); + ctq = au->head; + prev = NULL; + /* Loop over the queue entries until they are finished. */ + while (ctq) + { + if (prev) + free (prev); + prev = ctq; + if (!au->error.has_error) + { + UNLOCK (&au->lock); + + switch (ctq->type) + { + case AIO_WRITE_DONE: + NOTE ("Finalizing write"); + st_write_done_worker (au->pdt); + UNLOCK (&au->io_lock); + break; + + case AIO_READ_DONE: + NOTE ("Finalizing read"); + st_read_done_worker (au->pdt); + UNLOCK (&au->io_lock); + break; + + case AIO_DATA_TRANSFER_INIT: + NOTE ("Data transfer init"); + LOCK (&au->io_lock); + update_pdt (&au->pdt, ctq->new_pdt); + data_transfer_init_worker (au->pdt, ctq->read_flag); + break; + + case AIO_TRANSFER_SCALAR: + NOTE ("Starting scalar transfer"); + ctq->arg.scalar.transfer (au->pdt, ctq->arg.scalar.arg_bt, + ctq->arg.scalar.data, + ctq->arg.scalar.i, + ctq->arg.scalar.s1, + ctq->arg.scalar.s2); + break; + + case AIO_TRANSFER_ARRAY: + NOTE ("Starting array transfer"); + NOTE ("ctq->arg.array.desc = %p", + (void *) (ctq->arg.array.desc)); + transfer_array_inner (au->pdt, ctq->arg.array.desc, + ctq->arg.array.kind, + ctq->arg.array.charlen); + free (ctq->arg.array.desc); + break; + + case AIO_CLOSE: + NOTE ("Received AIO_CLOSE"); + goto finish_thread; + + default: + internal_error (NULL, "Invalid queue type"); + break; + } + LOCK (&au->lock); + if (unlikely (au->error.has_error)) + au->error.last_good_id = au->id.low - 1; + } + else + { + if (ctq->type == AIO_WRITE_DONE || ctq->type == AIO_READ_DONE) + { + UNLOCK (&au->io_lock); + } + else if (ctq->type == AIO_CLOSE) + { + NOTE ("Received AIO_CLOSE during error condition"); + UNLOCK (&au->lock); + goto finish_thread; + } + } + + NOTE ("Next ctq, current id: %d", au->id.low); + if (ctq->has_id && au->id.waiting == au->id.low++) + SIGNAL (&au->id.done); + + ctq = ctq->next; + } + au->tail = NULL; + au->head = NULL; + au->empty = 1; + UNLOCK (&au->lock); + SIGNAL (&au->emptysignal); + LOCK (&au->lock); + } + finish_thread: + au->tail = NULL; + au->head = NULL; + au->empty = 1; + SIGNAL (&au->emptysignal); + free (ctq); + return NULL; +} + +/* Free an asynchronous unit. */ + +static void +free_async_unit (async_unit *au) +{ + if (au->tail) + internal_error (NULL, "Trying to free nonempty asynchronous unit"); + + destroy_adv_cond (&au->work); + destroy_adv_cond (&au->emptysignal); + destroy_adv_cond (&au->id.done); + T_ERROR (__gthread_mutex_destroy, &au->lock); + free (au); +} + +/* Initialize an adv_cond structure. */ + +static void +init_adv_cond (struct adv_cond *ac) +{ + ac->pending = 0; + __GTHREAD_MUTEX_INIT_FUNCTION (&ac->lock); + __gthread_cond_init_function (&ac->signal); +} + +/* Initialize an asyncronous unit, returning zero on success, + nonzero on failure. It also sets u->au. */ + +void +init_async_unit (gfc_unit *u) +{ + async_unit *au; + if (!__gthread_active_p ()) + { + u->au = NULL; + return; + } + + au = (async_unit *) xmalloc (sizeof (async_unit)); + u->au = au; + init_adv_cond (&au->work); + init_adv_cond (&au->emptysignal); + __GTHREAD_MUTEX_INIT_FUNCTION (&au->lock); + __GTHREAD_MUTEX_INIT_FUNCTION (&au->io_lock); + LOCK (&au->lock); + T_ERROR (__gthread_create, &au->thread, &async_io, (void *) u); + au->pdt = NULL; + au->head = NULL; + au->tail = NULL; + au->empty = true; + au->id.waiting = -1; + au->id.low = 0; + au->id.high = 0; + au->error.fatal_error = 0; + au->error.has_error = 0; + au->error.last_good_id = 0; + init_adv_cond (&au->id.done); + UNLOCK (&au->lock); +} + +/* Enqueue a transfer statement. */ + +void +enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type) +{ + transfer_queue *tq = calloc (sizeof (transfer_queue), 1); + tq->arg = *arg; + tq->type = type; + tq->has_id = 0; + LOCK (&au->lock); + if (!au->tail) + au->head = tq; + else + au->tail->next = tq; + au->tail = tq; + REVOKE_SIGNAL (&(au->emptysignal)); + au->empty = false; + UNLOCK (&au->lock); + SIGNAL (&au->work); +} + +/* Enqueue an st_write_done or st_read_done which contains an ID. */ + +int +enqueue_done_id (async_unit *au, enum aio_do type) +{ + int ret; + transfer_queue *tq = calloc (sizeof (transfer_queue), 1); + + tq->type = type; + tq->has_id = 1; + LOCK (&au->lock); + if (!au->tail) + au->head = tq; + else + au->tail->next = tq; + au->tail = tq; + REVOKE_SIGNAL (&(au->emptysignal)); + au->empty = false; + ret = au->id.high++; + NOTE ("Enqueue id: %d", ret); + UNLOCK (&au->lock); + SIGNAL (&au->work); + return ret; +} + +/* Enqueue an st_write_done or st_read_done without an ID. */ + +void +enqueue_done (async_unit *au, enum aio_do type) +{ + transfer_queue *tq = calloc (sizeof (transfer_queue), 1); + tq->type = type; + tq->has_id = 0; + LOCK (&au->lock); + if (!au->tail) + au->head = tq; + else + au->tail->next = tq; + au->tail = tq; + REVOKE_SIGNAL (&(au->emptysignal)); + au->empty = false; + UNLOCK (&au->lock); + SIGNAL (&au->work); +} + +/* Enqueue a CLOSE statement. */ + +void +enqueue_close (async_unit *au) +{ + transfer_queue *tq = calloc (sizeof (transfer_queue), 1); + + tq->type = AIO_CLOSE; + LOCK (&au->lock); + if (!au->tail) + au->head = tq; + else + au->tail->next = tq; + au->tail = tq; + REVOKE_SIGNAL (&(au->emptysignal)); + au->empty = false; + UNLOCK (&au->lock); + SIGNAL (&au->work); +} + +/* The asynchronous unit keeps the currently active PDT around. + This function changes that to the current one. */ + +void +enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag) +{ + st_parameter_dt *new = xmalloc (sizeof (st_parameter_dt)); + transfer_queue *tq = xmalloc (sizeof (transfer_queue)); + + memcpy ((void *) new, (void *) dt, sizeof (st_parameter_dt)); + + NOTE ("dt->internal_unit_desc = %p", dt->internal_unit_desc); + NOTE ("common.flags & mask = %d", dt->common.flags & IOPARM_LIBRETURN_MASK); + tq->next = NULL; + tq->type = AIO_DATA_TRANSFER_INIT; + tq->read_flag = read_flag; + tq->has_id = 0; + tq->new_pdt = new; + LOCK (&au->lock); + + if (!au->tail) + au->head = tq; + else + au->tail->next = tq; + au->tail = tq; + REVOKE_SIGNAL (&(au->emptysignal)); + au->empty = 0; + UNLOCK (&au->lock); + SIGNAL (&au->work); +} + +/* Collect the errors that may have happened asynchronously. Return true if + an error has been encountered. */ + +bool +collect_async_errors (st_parameter_common *cmp, async_unit *au) +{ + bool has_error = au->error.has_error; + + if (has_error) + { + if (generate_error_common (cmp, au->error.family, au->error.message)) + { + au->error.has_error = 0; + au->error.cmp = NULL; + } + else + { + /* The program will exit later. */ + au->error.fatal_error = true; + } + } + return has_error; +} + +/* Perform a wait operation on an asynchronous unit with an ID specified, + which means collecting the errors that may have happened asynchronously. + Return true if an error has been encountered. */ + +bool +async_wait_id (st_parameter_common *cmp, async_unit *au, int i) +{ + bool ret; + + if (au == NULL) + return false; + + if (cmp == NULL) + cmp = au->error.cmp; + + if (au->error.has_error) + { + if (i <= au->error.last_good_id) + return false; + + return collect_async_errors (cmp, au); + } + + LOCK (&au->lock); + NOTE ("Waiting for id %d", i); + if (au->id.waiting < i) + au->id.waiting = i; + UNLOCK (&au->lock); + SIGNAL (&(au->work)); + LOCK (&au->lock); + WAIT_SIGNAL_MUTEX (&(au->id.done), + (au->id.low >= au->id.waiting || au->empty), &au->lock); + LOCK (&au->lock); + ret = collect_async_errors (cmp, au); + UNLOCK (&au->lock); + return ret; +} + +/* Perform a wait operation an an asynchronous unit without an ID. */ + +bool +async_wait (st_parameter_common *cmp, async_unit *au) +{ + bool ret; + + if (au == NULL) + return false; + + if (cmp == NULL) + cmp = au->error.cmp; + + SIGNAL (&(au->work)); + LOCK (&(au->lock)); + + if (au->empty) + { + ret = collect_async_errors (cmp, au); + UNLOCK (&au->lock); + return ret; + } + + WAIT_SIGNAL_MUTEX (&(au->emptysignal), (au->empty), &au->lock); + ret = collect_async_errors (cmp, au); + return ret; +} + +/* Close an asynchronous unit. */ + +void +async_close (async_unit *au) +{ + if (au == NULL) + return; + + NOTE ("Closing async unit"); + enqueue_close (au); + T_ERROR (__gthread_join, au->thread, NULL); + free_async_unit (au); +} |