aboutsummaryrefslogtreecommitdiff
path: root/libgfortran/io/async.h
diff options
context:
space:
mode:
authorNicolas Koenig <koenigni@gcc.gnu.org>2018-08-21 18:48:59 +0000
committerNicolas Koenig <koenigni@gcc.gnu.org>2018-08-21 18:48:59 +0000
commit2b4c90656132abb8b8ad155d345c7d4fbf1687c9 (patch)
tree15008b4ee6a44100a4cee36683749d369449af7e /libgfortran/io/async.h
parent774fb6c4eb205eaf9d3b6667e7de9c90cc1784ad (diff)
downloadgcc-2b4c90656132abb8b8ad155d345c7d4fbf1687c9.zip
gcc-2b4c90656132abb8b8ad155d345c7d4fbf1687c9.tar.gz
gcc-2b4c90656132abb8b8ad155d345c7d4fbf1687c9.tar.bz2
re PR fortran/25829 ([F03] Asynchronous IO support)
2018-08-21 Nicolas Koenig <koenigni@gcc.gnu.org> Thomas Koenig <tkoenig@gcc.gnu.org> PR fortran/25829 * gfortran.texi: Add description of asynchronous I/O. * trans-decl.c (gfc_finish_var_decl): Treat asynchronous variables as volatile. * trans-io.c (gfc_build_io_library_fndecls): Rename st_wait to st_wait_async and change argument spec from ".X" to ".w". (gfc_trans_wait): Pass ID argument via reference. 2018-08-21 Nicolas Koenig <koenigni@gcc.gnu.org> Thomas Koenig <tkoenig@gcc.gnu.org> PR fortran/25829 * gfortran.dg/f2003_inquire_1.f03: Add write statement. * gfortran.dg/f2003_io_1.f03: Add wait statement. 2018-08-21 Nicolas Koenig <koenigni@gcc.gnu.org> Thomas Koenig <tkoenig@gcc.gnu.org> PR fortran/25829 * Makefile.am: Add async.c to gfor_io_src. Add async.h to gfor_io_headers. * Makefile.in: Regenerated. * gfortran.map: Add _gfortran_st_wait_async. * io/async.c: New file. * io/async.h: New file. * io/close.c: Include async.h. (st_close): Call async_wait for an asynchronous unit. * io/file_pos.c (st_backspace): Likewise. (st_endfile): Likewise. (st_rewind): Likewise. (st_flush): Likewise. * io/inquire.c: Add handling for asynchronous PENDING and ID arguments. * io/io.h (st_parameter_dt): Add async bit. (st_parameter_wait): Correct. (gfc_unit): Add au pointer. (st_wait_async): Add prototype. (transfer_array_inner): Likewise. (st_write_done_worker): Likewise. * io/open.c: Include async.h. (new_unit): Initialize asynchronous unit. * io/transfer.c (async_opt): New struct. (wrap_scalar_transfer): New function. (transfer_integer): Call wrap_scalar_transfer to do the work. (transfer_real): Likewise. (transfer_real_write): Likewise. (transfer_character): Likewise. (transfer_character_wide): Likewise. (transfer_complex): Likewise. (transfer_array_inner): New function. (transfer_array): Call transfer_array_inner. (transfer_derived): Call wrap_scalar_transfer. (data_transfer_init): Check for asynchronous I/O. Perform a wait operation on any pending asynchronous I/O if the data transfer is synchronous. Copy PDT and enqueue thread for data transfer. (st_read_done_worker): New function. (st_read_done): Enqueue transfer or call st_read_done_worker. (st_write_done_worker): New function. (st_write_done): Enqueue transfer or call st_read_done_worker. (st_wait): Document as no-op for compatibility reasons. (st_wait_async): New function. * io/unit.c (insert_unit): Use macros LOCK, UNLOCK and TRYLOCK; add NOTE where necessary. (get_gfc_unit): Likewise. (init_units): Likewise. (close_unit_1): Likewise. Call async_close if asynchronous. (close_unit): Use macros LOCK and UNLOCK. (finish_last_advance_record): Likewise. (newunit_alloc): Likewise. * io/unix.c (find_file): Likewise. (flush_all_units_1): Likewise. (flush_all_units): Likewise. * libgfortran.h (generate_error_common): Add prototype. * runtime/error.c: Include io.h and async.h. (generate_error_common): New function. 2018-08-21 Nicolas Koenig <koenigni@gcc.gnu.org> Thomas Koenig <tkoenig@gcc.gnu.org> PR fortran/25829 * testsuite/libgomp.fortran/async_io_1.f90: New test. * testsuite/libgomp.fortran/async_io_2.f90: New test. * testsuite/libgomp.fortran/async_io_3.f90: New test. * testsuite/libgomp.fortran/async_io_4.f90: New test. * testsuite/libgomp.fortran/async_io_5.f90: New test. * testsuite/libgomp.fortran/async_io_6.f90: New test. * testsuite/libgomp.fortran/async_io_7.f90: New test. Co-Authored-By: Thomas Koenig <tkoenig@gcc.gnu.org> From-SVN: r263750
Diffstat (limited to 'libgfortran/io/async.h')
-rw-r--r--libgfortran/io/async.h400
1 files changed, 400 insertions, 0 deletions
diff --git a/libgfortran/io/async.h b/libgfortran/io/async.h
new file mode 100644
index 0000000..7dfbc8b
--- /dev/null
+++ b/libgfortran/io/async.h
@@ -0,0 +1,400 @@
+/* Copyright (C) 2018 Free Software Foundation, Inc.
+ Contributed by Nicolas Koenig
+
+ This file is part of the GNU Fortran runtime library (libgfortran).
+
+ Libgfortran is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3, or (at your option)
+ any later version.
+
+ Libgfortran is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ Under Section 7 of GPL version 3, you are granted additional
+ permissions described in the GCC Runtime Library Exception, version
+ 3.1, as published by the Free Software Foundation.
+
+ You should have received a copy of the GNU General Public License and
+ a copy of the GCC Runtime Library Exception along with this program;
+ see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
+ <http://www.gnu.org/licenses/>. */
+
+#ifndef ASYNC_H
+#define ASYNC_H
+
+/* Async I/O will not work on targets which do not support
+ __gthread_cond_t and __gthread_equal / __gthread_self. Check
+ this. */
+
+#if defined(__GTHREAD_HAS_COND) && defined(__GTHREADS_CXX0X)
+#define ASYNC_IO 1
+#else
+#define ASYNC_IO 0
+#endif
+
+/* Defining DEBUG_ASYNC will enable somewhat verbose debugging
+ output for async I/O. */
+
+#define DEBUG_ASYNC
+#undef DEBUG_ASYNC
+
+#ifdef DEBUG_ASYNC
+
+/* Define this if you want to use ANSI color escape sequences in your
+ debugging output. */
+
+#define DEBUG_COLOR
+
+#ifdef DEBUG_COLOR
+#define MPREFIX "\033[30;46mM:\033[0m "
+#define TPREFIX "\033[37;44mT:\033[0m "
+#define RPREFIX "\033[37;41mR:\033[0m "
+#define DEBUG_RED "\033[31m"
+#define DEBUG_ORANGE "\033[33m"
+#define DEBUG_GREEN "\033[32m"
+#define DEBUG_DARKRED "\033[31;2m"
+#define DEBUG_PURPLE "\033[35m"
+#define DEBUG_NORM "\033[0m"
+#define DEBUG_REVERSE_RED "\033[41;37m"
+#define DEBUG_BLUE "\033[34m"
+
+#else
+
+#define MPREFIX "M: "
+#define TPREFIX "T: "
+#define RPREFIX ""
+#define DEBUG_RED ""
+#define DEBUG_ORANGE ""
+#define DEBUG_GREEN ""
+#define DEBUG_DARKRED ""
+#define DEBUG_PURPLE ""
+#define DEBUG_NORM ""
+#define DEBUG_REVERSE_RED ""
+#define DEBUG_BLUE ""
+
+#endif
+
+#define DEBUG_PRINTF(...) fprintf (stderr,__VA_ARGS__)
+
+#define IN_DEBUG_QUEUE(mutex) ({ \
+ __label__ end; \
+ aio_lock_debug *curr = aio_debug_head; \
+ while (curr) { \
+ if (curr->m == mutex) { \
+ goto end; \
+ } \
+ curr = curr->next; \
+ } \
+ end:; \
+ curr; \
+ })
+
+#define TAIL_DEBUG_QUEUE ({ \
+ aio_lock_debug *curr = aio_debug_head; \
+ while (curr && curr->next) { \
+ curr = curr->next; \
+ } \
+ curr; \
+ })
+
+#define CHECK_LOCK(mutex, status) do { \
+ aio_lock_debug *curr; \
+ INTERN_LOCK (&debug_queue_lock); \
+ if (__gthread_mutex_trylock (mutex)) { \
+ if ((curr = IN_DEBUG_QUEUE (mutex))) { \
+ sprintf (status, DEBUG_RED "%s():%d" DEBUG_NORM, curr->func, curr->line); \
+ } else \
+ sprintf (status, DEBUG_RED "unknown" DEBUG_NORM); \
+ } \
+ else { \
+ __gthread_mutex_unlock (mutex); \
+ sprintf (status, DEBUG_GREEN "unlocked" DEBUG_NORM); \
+ } \
+ INTERN_UNLOCK (&debug_queue_lock); \
+ }while (0)
+
+#define T_ERROR(func, ...) do { \
+ int t_error_temp; \
+ t_error_temp = func(__VA_ARGS__); \
+ if (t_error_temp) \
+ ERROR (t_error_temp, "args: " #__VA_ARGS__ "\n"); \
+ } while (0)
+
+#define NOTE(str, ...) do{ \
+ char note_str[200]; \
+ sprintf (note_str, "%s" DEBUG_PURPLE "NOTE: " DEBUG_NORM str, aio_prefix, ##__VA_ARGS__); \
+ DEBUG_PRINTF ("%-90s %20s():%-5d\n", note_str, __FUNCTION__, __LINE__); \
+ }while (0);
+
+#define ERROR(errnum, str, ...) do{ \
+ char note_str[200]; \
+ sprintf (note_str, "%s" DEBUG_REVERSE_RED "ERROR:" DEBUG_NORM " [%d] " str, aio_prefix, \
+ errnum, ##__VA_ARGS__); \
+ DEBUG_PRINTF ("%-68s %s():%-5d\n", note_str, __FUNCTION__, __LINE__); \
+ }while (0)
+
+#define MUTEX_DEBUG_ADD(mutex) do { \
+ aio_lock_debug *n; \
+ n = malloc (sizeof(aio_lock_debug)); \
+ n->prev = TAIL_DEBUG_QUEUE; \
+ if (n->prev) \
+ n->prev->next = n; \
+ n->next = NULL; \
+ n->line = __LINE__; \
+ n->func = __FUNCTION__; \
+ n->m = mutex; \
+ if (!aio_debug_head) { \
+ aio_debug_head = n; \
+ } \
+ } while (0)
+
+#define UNLOCK(mutex) do { \
+ aio_lock_debug *curr; \
+ DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_GREEN "UNLOCK: " DEBUG_NORM #mutex, \
+ __FUNCTION__, __LINE__, (void *) mutex); \
+ INTERN_LOCK (&debug_queue_lock); \
+ curr = IN_DEBUG_QUEUE (mutex); \
+ if (curr) \
+ { \
+ if (curr->prev) \
+ curr->prev->next = curr->next; \
+ if (curr->next) { \
+ curr->next->prev = curr->prev; \
+ if (curr == aio_debug_head) \
+ aio_debug_head = curr->next; \
+ } else { \
+ if (curr == aio_debug_head) \
+ aio_debug_head = NULL; \
+ } \
+ free (curr); \
+ } \
+ INTERN_UNLOCK (&debug_queue_lock); \
+ INTERN_UNLOCK (mutex); \
+ }while (0)
+
+#define TRYLOCK(mutex) ({ \
+ char status[200]; \
+ int res; \
+ aio_lock_debug *curr; \
+ res = __gthread_mutex_trylock (mutex); \
+ INTERN_LOCK (&debug_queue_lock); \
+ if (res) { \
+ if ((curr = IN_DEBUG_QUEUE (mutex))) { \
+ sprintf (status, DEBUG_RED "%s():%d" DEBUG_NORM, curr->func, curr->line); \
+ } else \
+ sprintf (status, DEBUG_RED "unknown" DEBUG_NORM); \
+ } \
+ else { \
+ sprintf (status, DEBUG_GREEN "unlocked" DEBUG_NORM); \
+ MUTEX_DEBUG_ADD (mutex); \
+ } \
+ DEBUG_PRINTF ("%s%-44s prev: %-35s %20s():%-5d %18p\n", aio_prefix, \
+ DEBUG_DARKRED "TRYLOCK: " DEBUG_NORM #mutex, status, __FUNCTION__, __LINE__, \
+ (void *) mutex); \
+ INTERN_UNLOCK (&debug_queue_lock); \
+ res; \
+ })
+
+#define LOCK(mutex) do { \
+ char status[200]; \
+ CHECK_LOCK (mutex, status); \
+ DEBUG_PRINTF ("%s%-42s prev: %-35s %20s():%-5d %18p\n", aio_prefix, \
+ DEBUG_RED "LOCK: " DEBUG_NORM #mutex, status, __FUNCTION__, __LINE__, (void *) mutex); \
+ INTERN_LOCK (mutex); \
+ INTERN_LOCK (&debug_queue_lock); \
+ MUTEX_DEBUG_ADD (mutex); \
+ INTERN_UNLOCK (&debug_queue_lock); \
+ DEBUG_PRINTF ("%s" DEBUG_RED "ACQ:" DEBUG_NORM " %-30s %78p\n", aio_prefix, #mutex, mutex); \
+ } while (0)
+
+#define DEBUG_LINE(...) __VA_ARGS__
+
+#else
+#define DEBUG_PRINTF(...) {}
+#define CHECK_LOCK(au, mutex, status) {}
+#define NOTE(str, ...) {}
+#define DEBUG_LINE(...)
+#define T_ERROR(func, ...) func(__VA_ARGS__)
+#define LOCK(mutex) INTERN_LOCK (mutex)
+#define UNLOCK(mutex) INTERN_UNLOCK (mutex)
+#define TRYLOCK(mutex) (__gthread_mutex_trylock (mutex))
+#endif
+
+#define INTERN_LOCK(mutex) T_ERROR (__gthread_mutex_lock, mutex);
+
+#define INTERN_UNLOCK(mutex) T_ERROR (__gthread_mutex_unlock, mutex);
+
+#if ASYNC_IO
+
+#define SIGNAL(advcond) do{ \
+ INTERN_LOCK (&(advcond)->lock); \
+ (advcond)->pending = 1; \
+ DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_ORANGE "SIGNAL: " DEBUG_NORM \
+ #advcond, __FUNCTION__, __LINE__, (void *) advcond); \
+ T_ERROR (__gthread_cond_broadcast, &(advcond)->signal); \
+ INTERN_UNLOCK (&(advcond)->lock); \
+ } while (0)
+
+#define WAIT_SIGNAL_MUTEX(advcond, condition, mutex) do{ \
+ __label__ finish; \
+ INTERN_LOCK (&((advcond)->lock)); \
+ DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_BLUE "WAITING: " DEBUG_NORM \
+ #advcond, __FUNCTION__, __LINE__, (void *) advcond); \
+ if ((advcond)->pending || (condition)){ \
+ UNLOCK (mutex); \
+ goto finish; \
+ } \
+ UNLOCK (mutex); \
+ while (!__gthread_cond_wait(&(advcond)->signal, &(advcond)->lock)) { \
+ { int cond; \
+ LOCK (mutex); cond = condition; UNLOCK (mutex); \
+ if (cond){ \
+ DEBUG_PRINTF ("%s%-75s %20s():%-5d %18p\n", aio_prefix, DEBUG_ORANGE "REC: " DEBUG_NORM \
+ #advcond, __FUNCTION__, __LINE__, (void *)advcond); \
+ break; \
+ } \
+ } \
+ } \
+ finish: \
+ (advcond)->pending = 0; \
+ INTERN_UNLOCK (&((advcond)->lock)); \
+ } while (0)
+
+#define REVOKE_SIGNAL(advcond) do{ \
+ INTERN_LOCK (&(advcond)->lock); \
+ (advcond)->pending = 0; \
+ INTERN_UNLOCK (&(advcond)->lock); \
+ } while (0)
+
+#else
+
+#define SIGNAL(advcond) do{} while(0)
+#define WAIT_SIGNAL_MUTEX(advcond, condition, mutex) do{} while(0)
+#define REVOKE_SIGNAL(advcond) do{} while(0)
+
+#endif
+
+#if ASYNC_IO
+DEBUG_LINE (extern __thread const char *aio_prefix);
+
+DEBUG_LINE (typedef struct aio_lock_debug{
+ __gthread_mutex_t *m;
+ int line;
+ const char *func;
+ struct aio_lock_debug *next;
+ struct aio_lock_debug *prev;
+} aio_lock_debug;)
+
+DEBUG_LINE (extern aio_lock_debug *aio_debug_head;)
+DEBUG_LINE (extern __gthread_mutex_t debug_queue_lock;)
+
+/* Thread - local storage of the current unit we are looking at. Needed for
+ error reporting. */
+
+extern __thread gfc_unit *thread_unit;
+#endif
+
+enum aio_do {
+ AIO_INVALID = 0,
+ AIO_DATA_TRANSFER_INIT,
+ AIO_TRANSFER_SCALAR,
+ AIO_TRANSFER_ARRAY,
+ AIO_WRITE_DONE,
+ AIO_READ_DONE,
+ AIO_CLOSE
+};
+
+typedef union transfer_args
+{
+ struct
+ {
+ void (*transfer) (struct st_parameter_dt *, bt, void *, int, size_t, size_t);
+ bt arg_bt;
+ void *data;
+ int i;
+ size_t s1;
+ size_t s2;
+ } scalar;
+ struct
+ {
+ gfc_array_char *desc;
+ int kind;
+ gfc_charlen_type charlen;
+ } array;
+} transfer_args;
+
+struct adv_cond
+{
+ int pending;
+ __gthread_mutex_t lock;
+ __gthread_cond_t signal;
+};
+
+typedef struct async_unit
+{
+ pthread_mutex_t lock; /* Lock for manipulating the queue structure. */
+ pthread_mutex_t io_lock; /* Lock for doing actual I/O. */
+ struct adv_cond work;
+ struct adv_cond emptysignal;
+ struct st_parameter_dt *pdt;
+ pthread_t thread;
+ struct transfer_queue *head;
+ struct transfer_queue *tail;
+ struct
+ {
+ int waiting;
+ int low;
+ int high;
+ struct adv_cond done;
+ } id;
+
+ bool empty;
+
+ struct {
+ const char *message;
+ st_parameter_common *cmp;
+ bool has_error;
+ int last_good_id;
+ int family;
+ bool fatal_error;
+ } error;
+
+} async_unit;
+
+void init_async_unit (gfc_unit *);
+internal_proto (init_async_unit);
+
+bool async_wait (st_parameter_common *, async_unit *);
+internal_proto (async_wait);
+
+bool async_wait_id (st_parameter_common *, async_unit *, int);
+internal_proto (async_wait_id);
+
+bool collect_async_errors (st_parameter_common *, async_unit *);
+internal_proto (collect_async_errors);
+
+void async_close (async_unit *);
+internal_proto (async_close);
+
+void enqueue_transfer (async_unit * au, transfer_args * arg, enum aio_do);
+internal_proto (enqueue_transfer);
+
+void enqueue_done (async_unit *, enum aio_do type);
+internal_proto (enqueue_done);
+
+int enqueue_done_id (async_unit *, enum aio_do type);
+internal_proto (enqueue_done_id);
+
+void enqueue_init (async_unit *);
+internal_proto (enqueue_init);
+
+void enqueue_data_transfer_init (async_unit *, st_parameter_dt *, int);
+internal_proto (enqueue_data_transfer_init);
+
+void enqueue_close (async_unit *);
+internal_proto (enqueue_close);
+
+#endif