aboutsummaryrefslogtreecommitdiff
path: root/libjava/java/io/PipedInputStream.java
diff options
context:
space:
mode:
Diffstat (limited to 'libjava/java/io/PipedInputStream.java')
-rw-r--r--libjava/java/io/PipedInputStream.java242
1 files changed, 242 insertions, 0 deletions
diff --git a/libjava/java/io/PipedInputStream.java b/libjava/java/io/PipedInputStream.java
new file mode 100644
index 0000000..d8a836c
--- /dev/null
+++ b/libjava/java/io/PipedInputStream.java
@@ -0,0 +1,242 @@
+/* Copyright (C) 1998, 1999 Cygnus Solutions
+
+ This file is part of libgcj.
+
+This software is copyrighted work licensed under the terms of the
+Libgcj License. Please consult the file "LIBGCJ_LICENSE" for
+details. */
+
+package java.io;
+
+/**
+ * @author Warren Levy <warrenl@cygnus.com>
+ * @date October 29, 1998.
+ */
+/* Written using "Java Class Libraries", 2nd edition, ISBN 0-201-31002-3
+ * "The Java Language Specification", ISBN 0-201-63451-1
+ * plus online API docs for JDK 1.2 beta from http://www.javasoft.com.
+ * Status: Believed complete and correct.
+ */
+
+public class PipedInputStream extends InputStream
+{
+ /* The size of the pipe's circular input buffer. */
+ protected static final int PIPE_SIZE = 1024;
+
+ /* The circular buffer into which incoming data is placed. */
+ protected byte[] buffer;
+
+ /* The index in the buffer at which the next byte of data will be stored. */
+ protected int in = -1;
+
+ /* The index in the buffer at which the next byte of data will be read. */
+ protected int out = 0;
+
+ /* The output stream this is connected to; used to check for errors. */
+ private PipedOutputStream po = null;
+
+ /* Flag to indicate that the output stream was closed. */
+ private boolean outClosed = false;
+
+ public PipedInputStream(PipedOutputStream src) throws IOException
+ {
+ buffer = new byte[PIPE_SIZE];
+ connect(src);
+ }
+
+ public PipedInputStream()
+ {
+ buffer = new byte[PIPE_SIZE];
+ }
+
+ public synchronized int available() throws IOException
+ {
+ if (in < 0)
+ return 0;
+
+ if (in > out)
+ return in - out;
+
+ // Buffer has wrapped around.
+ return buffer.length - out + in;
+ }
+
+ public void close() throws IOException
+ {
+ buffer = null;
+ po = null;
+
+ // Mark as empty for available method.
+ in = -1;
+ }
+
+ public void connect(PipedOutputStream src) throws IOException
+ {
+ if (buffer == null)
+ throw new IOException("pipe closed");
+
+ if (po != null)
+ if (po == src)
+ return;
+ else
+ throw new IOException("pipe already connected");
+
+ po = src;
+ try
+ {
+ src.connect(this);
+ }
+ catch (IOException ex)
+ {
+ po = null;
+ throw ex;
+ }
+ }
+
+ public synchronized int read() throws IOException
+ {
+ // TBD: Spec says to throw IOException if thread writing to output stream
+ // died. What does this really mean? Theoretically, multiple threads
+ // could be writing to this object. Do you track the first, last, or
+ // all of them?
+ if (po == null)
+ if (buffer == null)
+ throw new IOException("pipe closed");
+ else
+ throw new IOException("pipe unconnected");
+
+ // Block until there's something to read or output stream was closed.
+ while (in < 0)
+ try
+ {
+ if (outClosed)
+ return -1;
+ wait();
+ }
+ catch (InterruptedException ex)
+ {
+ throw new InterruptedIOException();
+ }
+
+ // Let other threads know there's room to write now.
+ notifyAll();
+
+ int retval = buffer[out++] & 0xFF;
+
+ // Wrap back around if at end of the array.
+ if (out >= buffer.length)
+ out = 0;
+
+ // When the last byte available is read, mark the buffer as empty.
+ if (out == in)
+ {
+ in = -1;
+ out = 0;
+ }
+
+ return retval;
+ }
+
+ public synchronized int read(byte[] b, int off, int len) throws IOException
+ {
+ if (off < 0 || len < 0 || off + len > b.length)
+ throw new ArrayIndexOutOfBoundsException();
+
+ // TBD: Spec says to throw IOException if thread writing to output stream
+ // died. What does this really mean? Theoretically, multiple threads
+ // could be writing to this object. Do you track the first, last, or
+ // all of them?
+ if (po == null)
+ if (buffer == null)
+ throw new IOException("pipe closed");
+ else
+ throw new IOException("pipe unconnected");
+
+ // Block until there's something to read or output stream was closed.
+ while (in < 0)
+ try
+ {
+ if (outClosed)
+ return -1;
+ wait();
+ }
+ catch (InterruptedException ex)
+ {
+ throw new InterruptedIOException();
+ }
+
+ // Let other threads know there's room to write now.
+ notifyAll();
+
+ int numRead;
+ len = Math.min(len, available());
+ if (in <= out && len >= (numRead = buffer.length - out))
+ {
+ // Buffer has wrapped around; need to copy in 2 steps.
+ // Copy to the end of the buffer first; second copy may be of zero
+ // bytes but that is ok. Doing it that way saves having to check
+ // later if 'out' has grown to buffer.length.
+ System.arraycopy(buffer, out, b, off, numRead);
+ len -= numRead;
+ off += numRead;
+ out = 0;
+ }
+ else
+ numRead = 0;
+
+ System.arraycopy(buffer, out, b, off, len);
+ numRead += len;
+ out += len;
+
+ // When the last byte available is read, mark the buffer as empty.
+ if (out == in)
+ {
+ in = -1;
+ out = 0;
+ }
+
+ return numRead;
+ }
+
+ protected synchronized void receive(int b) throws IOException
+ {
+ if (buffer == null)
+ throw new IOException("pipe closed");
+
+ // TBD: Spec says to throw IOException if thread reading from input stream
+ // died. What does this really mean? Theoretically, multiple threads
+ // could be reading to this object (why else would 'read' be synchronized?).
+ // Do you track the first, last, or all of them?
+
+ if (b < 0)
+ {
+ outClosed = true;
+ notifyAll(); // In case someone was blocked in a read.
+ return;
+ }
+
+ // Block until there's room in the pipe.
+ while (in == out)
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException ex)
+ {
+ throw new InterruptedIOException();
+ }
+
+ // Check if buffer is empty.
+ if (in < 0)
+ in = 0;
+
+ buffer[in++] = (byte) b;
+
+ // Wrap back around if at end of the array.
+ if (in >= buffer.length)
+ in = 0;
+
+ // Let other threads know there's something to read when this returns.
+ notifyAll();
+ }
+}