// ************************************************************************ // $Id: WaitFreeReadQueue.java 593 2005-07-21 23:31:26Z mdeters $ // ************************************************************************ // // jRate // // Copyright (C) 2001-2005 by Angelo Corsaro. // // All Rights Reserved. // // Permission to use, copy, modify, and distribute this software and // its documentation for any purpose is hereby granted without fee, // provided that the above copyright notice appear in all copies and // that both that copyright notice and this permission notice appear // in supporting documentation. I don't make any representations // about the suitability of this software for any purpose. It is // provided "as is" without express or implied warranty. // // // ************************************************************************* // // ************************************************************************* package javax.realtime; /** * A queue that provides unsynchronized nonblocking read() and * synchronized blocking write(). * * @author Morgan Deters * @version 1.0 * @since 0.3.8 */ public class WaitFreeReadQueue { // Implementation notes // // We provide blocking write() and nonblocking read(). Problem // is, we also have to support other operations, like clear(), // size(), etc. So we have to have extra bookkeeping, but then // that bookkeeping needs to be updated atomically, but we can't // synch access to the books without breaking the nonblocking // read()! // // We don't have to worry about conflicting writers (we synch // that), and we don't have to worry about conflicting readers // (the user is supposed to synch that), but we *do* have to worry // about a reader and a writer conflicting. // // We extend the RTSJ requirement that users do their own // reader-synchronization to include the clear(), size(), // isEmpty(), and isFull(). That is, these four operations are // viewed as *reading* from the queue, and shouldn't be // concurrently executed with a read() operation by the user. A // writer thread should probably *not* call these methods, and in // particular if a writer wants to check for the isFull() // condition before attempting a write(), the WaitFreeDequeue // should be employed for the purpose instead, as it has a // non-blocking write() operation. // // We maintain a standard circular buffer of N slots with a read // head (pointing to the next-to-be-read element) and a write head // (pointing to the next-to-be-written slot). Therefore the // number of elements currently in the queue is: // // write - read (mod N) // // [[ NOTE that I'm talking about *real* mod here, NOT the // remainder operator (%). ]] // // The queue-full condition is: // // write == read // // The queue-empty condition is also: // // write == read // // A boolean isEmpty flag is kept to distinguish the two. It is // initially true. Only the writer code sets it to false (and // only does so when it's true), and only the reader code sets it // to true (and only does so when it's false). Therefore the // reader code can distinguish the difference. // queue buffer private final Object[] buffer; // queue capacity private final int capacity; // queue empty and full conditions private final Object empty, full; // write lock private final Object writing; // whether the reader wants notification or not (via waitForData()) private boolean notify; // read and write heads in the buffer private int readHead, writeHead; // true if queue is empty; false otherwise private boolean isEmpty; /** * Constructs a new WaitFreeReadQueue. * * @param writer the writing thread * @param reader the reading thread * @param maximum the capacity of the queue * @param memory the {@link MemoryArea} in which to allocate the * queue's buffer; if null, then the current memory area is * used * @param notify whether to notify the reader when data is added * [unclear in spec, so unimplemented in jRate] */ public WaitFreeReadQueue(Thread writer, Thread reader, int maximum, MemoryArea memory, boolean notify) { if(writer != null && !(writer instanceof Thread) && !(writer instanceof Schedulable)) throw new IllegalArgumentException("writer must be null, Thread, or Schedulable"); if(reader != null && !(reader instanceof Thread) && !(reader instanceof Schedulable)) throw new IllegalArgumentException("reader must be null, Thread, or Schedulable"); if(maximum <= 0) throw new IllegalArgumentException("maximum must be > 0"); if(memory == null) memory = RealtimeThread.getCurrentMemoryArea(); this.capacity = maximum; try { this.buffer = (Object[]) memory.newArray(Object.class, maximum); this.empty = memory.newInstance(Object.class); this.full = memory.newInstance(Object.class); this.writing = memory.newInstance(Object.class); } catch(IllegalAccessException e) { throw new InaccessibleAreaException(); } catch(InstantiationException e) { throw new InternalError(); } this.readHead = 0; this.writeHead = 0; this.isEmpty = true; this.notify = notify; } /** * Constructs a new {@link WaitFreeReadQueue}. * * @param writer the writing thread * @param reader the reading thread * @param maximum the capacity of the queue * @param memory the {@link MemoryArea} in which to allocate the * queue's buffer; if null, then the current memory area is * used */ public WaitFreeReadQueue(Thread writer, Thread reader, int maximum, MemoryArea memory) { this(writer, reader, maximum, memory, false); } public WaitFreeReadQueue(int maximum, MemoryArea memory, boolean notify) { this(null, null, maximum, memory, notify); } public WaitFreeReadQueue(int maximum, boolean notify) { this(null, null, maximum, null, notify); } /** * Empties the queue. This method is unsynchronized; if it could * be called at the same time as another unsynchronized read * operation on this object, the callers need to synchronize. */ public void clear() { boolean release = isFull(); // FIXME: concurrency issue between readHead and isEmpty if(isEmpty == false) isEmpty = true; readHead = writeHead; if(release) noLongerFull(); } /** * Checks if the queue is empty. This method is unsynchronized; * if it could be called at the same time as another * unsynchronized read operation on this object, the callers need * to synchronize. * * @return true if the queue is empty, false otherwise */ public boolean isEmpty() { return isEmpty; } /** * Checks if the queue is full. This method is unsynchronized; if * it could be called at the same time as another unsynchronized * read operation on this object, the callers need to synchronize. * * @return true if the queue is full, false otherwise */ public boolean isFull() { // FIXME: writeHead and isEmpty concurrent access issue? return writeHead == readHead && !isEmpty; } /** * Returns the number of elements in the queue. This method is * unsynchronized; if it could be called at the same time as * another unsynchronized read operation on this object, the * callers need to synchronize. * * @return the number of elements in the queue */ public int size() { int span = writeHead - readHead; if(span == 0) return isEmpty ? 0 : capacity; // with full and empty conditions taken care of, we return // span (mod capacity) return (span < 0) ? span + capacity : span; } /** * An unsynchronized and nonblocking read operation. This method * is unsynchronized; if it could be called at the same time as * another unsynchronized read operation on this object, the * callers need to synchronize. * * @return the object removed from the queue, or null if the * queue is empty */ public Object read() { Object retval = buffer[readHead]; boolean release = isFull(); readHead = (readHead + 1) % capacity; // set isEmpty // unacceptable; could block if writer has full lock if(release) noLongerFull(); return retval; } /** * If the queue is empty, this method doesn't return until the * writing thread inserts an element. Otherwise, this method * returns immediately. */ public void waitForData() { if(isEmpty()) { synchronized(empty) { while(isEmpty()) { try { empty.wait(); } catch(InterruptedException e) {} } } } } /** * A synchronized and blocking write operation. * * @param object the object to write to the queue * @return true * @throws MemoryScopeException if the object is in an * incompatible memory area */ public boolean write(Object object) throws MemoryScopeException { synchronized(writing) { if(isFull()) { synchronized(full) { while(isFull()) { try { full.wait(); } catch(InterruptedException e) {} } } } try { buffer[writeHead] = object; } catch(IllegalAssignmentError e) { throw new MemoryScopeException(); } writeHead = (writeHead + 1) % capacity; noLongerEmpty(); } return true; } /** * Signal a blocked writer that the queue is no longer full. */ private void noLongerFull() { synchronized(full) { full.notifyAll(); } } /** * Signal a blocked thread that the queue is no longer empty. */ private void noLongerEmpty() { synchronized(empty) { empty.notifyAll(); } } }