/* Copyright (C) 1998, 1999  Cygnus Solutions

   This file is part of libgcj.

This software is copyrighted work licensed under the terms of the
Libgcj License.  Please consult the file "LIBGCJ_LICENSE" for
details.  */
 
package java.io;

/**
 * @author Warren Levy <warrenl@cygnus.com>
 * @date October 29, 1998.  
 */
/* Written using "Java Class Libraries", 2nd edition, ISBN 0-201-31002-3
 * "The Java Language Specification", ISBN 0-201-63451-1
 * plus online API docs for JDK 1.2 beta from http://www.javasoft.com.
 * Status:  Believed complete and correct.
 */
 
public class PipedInputStream extends InputStream
{
  /* The size of the pipe's circular input buffer. */
  protected static final int PIPE_SIZE = 1024;

  /* The circular buffer into which incoming data is placed. */
  protected byte[] buffer;

  /* The index in the buffer at which the next byte of data will be stored. */
  protected int in = -1;

  /* The index in the buffer at which the next byte of data will be read. */
  protected int out = 0;

  /* The output stream this is connected to; used to check for errors. */
  private PipedOutputStream po = null;

  /* Flag to indicate that the output stream was closed. */
  private boolean outClosed = false;

  public PipedInputStream(PipedOutputStream src) throws IOException
  {
    buffer = new byte[PIPE_SIZE];
    connect(src);
  }

  public PipedInputStream()
  {
    buffer = new byte[PIPE_SIZE];
  }

  public synchronized int available() throws IOException
  {
    if (in < 0)
      return 0;

    if (in > out)
      return in - out;

    // Buffer has wrapped around.
    return buffer.length - out + in;
  }

  public void close() throws IOException
  {
    buffer = null;
    po = null;

    // Mark as empty for available method.
    in = -1;
  }

  public void connect(PipedOutputStream src) throws IOException
  {
    if (buffer == null)
      throw new IOException("pipe closed");

    if (po != null)
      if (po == src)
	return;
      else
        throw new IOException("pipe already connected");

    po = src;
    try
    {
      src.connect(this);
    }
    catch (IOException ex)
    {
      po = null;
      throw ex;
    }
  }

  public synchronized int read() throws IOException
  {
    // TBD: Spec says to throw IOException if thread writing to output stream
    // died.  What does this really mean?  Theoretically, multiple threads
    // could be writing to this object.  Do you track the first, last, or
    // all of them?
    if (po == null)
      if (buffer == null)
        throw new IOException("pipe closed");
      else
        throw new IOException("pipe unconnected");

    // Block until there's something to read or output stream was closed.
    while (in < 0)
      try
	{
	  if (outClosed)
	    return -1;
	  wait();
	}
      catch (InterruptedException ex)
       {
	 throw new InterruptedIOException();
       }

    // Let other threads know there's room to write now.
    notifyAll();

    int retval = buffer[out++] & 0xFF;

    // Wrap back around if at end of the array.
    if (out >= buffer.length)
      out = 0;

    // When the last byte available is read, mark the buffer as empty.
    if (out == in)
      {
        in = -1;
	out = 0;
      }
      
    return retval;
  }

  public synchronized int read(byte[] b, int off, int len) throws IOException
  {
    if (off < 0 || len < 0 || off + len > b.length)
      throw new ArrayIndexOutOfBoundsException();

    // TBD: Spec says to throw IOException if thread writing to output stream
    // died.  What does this really mean?  Theoretically, multiple threads
    // could be writing to this object.  Do you track the first, last, or
    // all of them?
    if (po == null)
      if (buffer == null)
        throw new IOException("pipe closed");
      else
        throw new IOException("pipe unconnected");

    // Block until there's something to read or output stream was closed.
    while (in < 0)
      try
	{
	  if (outClosed)
	    return -1;
	  wait();
	}
      catch (InterruptedException ex)
       {
	 throw new InterruptedIOException();
       }

    // Let other threads know there's room to write now.
    notifyAll();

    int numRead;
    len = Math.min(len, available());
    if (in <= out && len >= (numRead = buffer.length - out))
      {
	// Buffer has wrapped around; need to copy in 2 steps.
	// Copy to the end of the buffer first; second copy may be of zero
	// bytes but that is ok.  Doing it that way saves having to check
	// later if 'out' has grown to buffer.length.
        System.arraycopy(buffer, out, b, off, numRead);
	len -= numRead;
	off += numRead;
	out = 0;
      }
    else
      numRead = 0;

    System.arraycopy(buffer, out, b, off, len);
    numRead += len;
    out += len;

    // When the last byte available is read, mark the buffer as empty.
    if (out == in)
      {
        in = -1;
	out = 0;
      }

    return numRead;
  }

  protected synchronized void receive(int b) throws IOException
  {
    if (buffer == null)
      throw new IOException("pipe closed");

    // TBD: Spec says to throw IOException if thread reading from input stream
    // died.  What does this really mean?  Theoretically, multiple threads
    // could be reading to this object (why else would 'read' be synchronized?).
    // Do you track the first, last, or all of them?

    if (b < 0)
      {
        outClosed = true;
	notifyAll();	// In case someone was blocked in a read.
	return;
      }

    // Block until there's room in the pipe.
    while (in == out)
      try
	{
	  wait();
	}
      catch (InterruptedException ex)
       {
	 throw new InterruptedIOException();
       }

    // Check if buffer is empty.
    if (in < 0)
      in = 0;

    buffer[in++] = (byte) b;

    // Wrap back around if at end of the array.
    if (in >= buffer.length)
      in = 0;

    // Let other threads know there's something to read when this returns.
    notifyAll();
  }
}