#1
  1. No Profile Picture
    Registered User
    Devshed Newbie (0 - 499 posts)

    Join Date
    Aug 2011
    Posts
    7
    Rep Power
    0

    Threads, blocking I/O and executor pattern question


    I'm doing some asynchronous serial port I/O. I have a good handle on the low level interface that fills the buffer. On top of that I've built a Ring Buffer that blocks while waiting for data or a specific data sequence (protocol).

    Now I would like to design a timeout for the thread that is waiting for I/O.

    Pseudo code:
    A--Call thread routine to look for Pattern (ACK)
    B--Routine calls thread that waits for Pattern (ACK) to appear in buffer

    B waits forever, whereas A will wait 3 seconds, then kill B and report a timeout if B never finds the ACK.

    I tried to do something like this:
    PHP Code:
    //ROUTINE A
    public static boolean canGetLine() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<?> f = executor.submit(new Runnable() {
            @Override
            public void run() {
                String i;
                try {
                    peekString();
                } catch (Exception e) {
                    System.out.println("canGetLine: " + e.toString());
                    e.printStackTrace();
                }
            }
        });
        try {
            f.get(3000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            System.out.println("f.get: " + e.toString());
            e.printStackTrace();
            f.cancel(true);
            executor.shutdownNow();
            return false;
        }
        f.cancel(true);
        return true;
    }
    Routine B (peekString()) blocks until the ACK (\n for testing) is found.
    PHP Code:
    public synchronized String peekString() throws InterruptedException {
        T result = null;
        String message = "";
        int testElements = unconsumedElements;
        if (testElements == 0) {
            System.out.println("peekString: no elements. waiting");
            wait();
            System.out.println("peekString: " + unconsumedElements + " elements");
            testElements = unconsumedElements;
        }
        do {
            /* Use peek() to get the element to return. */
            if (unconsumedElements - message.length() <= 0 && !message.endsWith("\n")) {
                while (unconsumedElements - message.length() <= 0) {
                    System.out.println("inner peekString: no elements. waiting");
                    wait();
                    System.out.println("inner peekString: " + unconsumedElements + " elements");
                }
                testElements = unconsumedElements - message.length();
            }
            result = elements[(offset + (capacity() - testElements)) % capacity()];
            message += String.format("%c", result);
            --testElements;
        } while (!message.endsWith("\n"));
        System.out.println("Found line!");
        return message;
    }
    This setup works... sort of. However, I don't know threads very well, and this all seems quite awkward. I'm trying to debug it and make it exit more gracefully. Can someone provide some examples or suggestions to make this more robust? (All the printlns and stack traces are in there so I can try to learn what is going on inside the threads.)
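
One way to avoid the extra thread altogether is to put the timeout inside the monitor itself: loop on a timed wait against a deadline and return when either the newline shows up or the time runs out. Below is a minimal sketch of that idea. `TimedLineBuffer` and `peekLine` are hypothetical names, and the sketch stores characters in a `StringBuilder` instead of the ring buffer array to stay short, so treat it as an illustration rather than a drop-in replacement for `peekString()`.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in for the RingBuffer discussed in this thread. */
final class TimedLineBuffer {
    private final StringBuilder data = new StringBuilder();

    /** Appends one character and wakes up any waiting readers. */
    public synchronized void add(char c) {
        data.append(c);
        notifyAll();
    }

    /**
     * Returns the first complete line (including '\n') without removing it,
     * or null if no complete line arrives within the timeout.
     */
    public synchronized String peekLine(long timeout, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        int end;
        // Loop guards against spurious wakeups and partial lines.
        while ((end = data.indexOf("\n")) < 0) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return null; // timed out, no thread had to be killed
            }
            // Timed Object.wait on this monitor; releases the lock while waiting.
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
        return data.substring(0, end + 1);
    }

    public static void main(String[] args) throws InterruptedException {
        TimedLineBuffer buf = new TimedLineBuffer();
        // Nothing buffered yet, so this returns null after about 100 ms.
        System.out.println(buf.peekLine(100, TimeUnit.MILLISECONDS));
        buf.add('a');
        buf.add('\n');
        // A full line is available, so this returns immediately.
        System.out.println("a\n".equals(buf.peekLine(100, TimeUnit.MILLISECONDS)));
    }
}
```

With this shape the caller just checks for `null` to detect a timeout, and there is no executor to shut down.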
  2. #2
  3. Contributing User
    Devshed Expert (3500 - 3999 posts)

    Join Date
    Aug 2010
    Location
    Eastern Florida
    Posts
    3,696
    Rep Power
    347
    Can the code be tested without a serial port? Do you have a driver for testing without the serial port?
  4. #3
    Originally Posted by NormR
    Can the code be tested without a serial port? Do you have a driver for testing without the serial port?
    Thanks for your help.

    Yes. Here's an example of just the buffer by itself. I just write to the buffer and read from it. So here's a test case:

    PHP Code:
    //Setup synchronous ring buffer
    inBuffer = new RingBuffer(bufSize);
    System.out.println("Testing ringBuffer timeouts");
    try {
        inBuffer.add('a'); // add one char
        inBuffer.add('\n'); // add terminating char for canGetLine
        while (inBuffer.canGetLine()) {
            System.out.println("Found a line");
            System.out.println(inBuffer.remove()); // gets one line (a\n) then times out during next loop
        }
    } catch (Exception e) {}
    Here's the whole class (sorry about all the debug stuff everywhere) that I'm modifying:

    PHP Code:
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    /**
     * ****************************************************************************
     * File: RingBuffer.java Author: Keith Schwarz (htiek@cs.stanford.edu)
     *
     * An implementation of a synchronized queue backed by a ring buffer. This
     * functionality and implementation is similar to the ArrayBlockingQueue class,
     * but I thought that I'd implement my own version to get a better feel for how
     * it works.
     *
     * A ring buffer is a space-efficient, locality-friendly implementation of a
     * FIFO queue. It is implemented as a fixed-sized array that is treated as
     * though it wraps around like a ring; it has no well-defined start or end
     * point. This array stores two pointers, a read pointer and a write pointer,
     * delineating where the next insert should take place and from where the next
     * element should be dequeued. For example:
     *
     *            [2] [3] [ ] [ ] [ ] [ ] [0] [1]
     *                     ^               ^
     *                     |               |
     *                   write            read
     *
     * When using a ring buffer, one must be careful not to let the read and write
     * pointers cross one another. If this happens, future write operations will
     * start overwriting old elements that have not yet been consumed. For this
     * reason, most ring buffers adopt one of two strategies. First, the ring buffer
     * can increase its size whenever it runs out of room. This approach allows the
     * buffer to grow arbitrarily large if need be. The second option, and the one
     * used in this implementation, is simply to block on a read or write when data
     * is not available. This allows the ring buffer to implement the
     * producer/consumer pattern fairly easily; any number of threads can begin
     * creating data while some number of threads consume it, and at no time are too
     * many elements kept in memory waiting to be read.
     */
    public final class RingBuffer<T> {
        /* The actual ring buffer. */
        private final T[] elements;

        /* The write pointer, represented as an offset into the array. */
        private int offset = 0;

        /* The read pointer is encoded implicitly by keeping track of the number of
         * unconsumed elements.  We can then determine its position by backing up
         * that many positions before the write position.
         */
        private int unconsumedElements = 0;

        /**
         * Constructs a new RingBuffer with the specified capacity, which must be
         * positive.
         *
         * @param size The capacity of the new ring buffer.
         * @throws IllegalArgumentException If the capacity is not positive.
         */
        @SuppressWarnings("unchecked")
        public RingBuffer(int size) {
            /* Validate the size. */
            if (size <= 0) {
                throw new IllegalArgumentException("RingBuffer capacity must be positive.");
            }

            /* Construct the array to be that size. */
            elements = (T[]) new Object[size];
        }

        
        /**
         * Appends an element to the ring buffer, blocking until space becomes
         * available.
         *
         * @param elem The element to add to the ring buffer.
         * @throws InterruptedException If the thread is interrupted before the
         * insertion completes.
         */
        public synchronized void add(T elem) throws InterruptedException {
            /* Block until the capacity is nonzero.  Otherwise we don't have any
             * space to write.
             */
            while (unconsumedElements == elements.length) {
                wait();
            }

            /* Write the element into the next open spot, then advance the write
             * pointer forward a step.
             */
            elements[offset] = elem;
            offset = (offset + 1) % elements.length;

            /* Increase the number of unconsumed elements by one, then notify any
             * threads that are waiting that more data is now available.
             */
            ++unconsumedElements;
            notifyAll();
        }

        
        /**
         * Returns the maximum capacity of the ring buffer.
         *
         * @return The maximum capacity of the ring buffer.
         */
        public int capacity() {
            return elements.length;
        }

        
        /**
         * Observes, but does not dequeue, the next available element, blocking
         * until data becomes available.
         *
         * @return The next available element.
         * @throws InterruptedException If the caller is interrupted before data
         * becomes available.
         */
        public synchronized T peek() throws InterruptedException {
            /* Wait for data to become available. */
            while (unconsumedElements == 0) {
                wait();
            }

            /* Hand back the next value.  The index of this next value is a bit
             * tricky to compute.  We know that there are unconsumedElements
             * elements waiting to be read, and they're contiguously before the
             * write position.  However, the buffer wraps around itself, and so we
             * can't just do a naive subtraction; that might end up giving us a
             * negative index.  To avoid this, we'll use a clever trick in which
             * we'll add to the index the capacity minus the distance.  This value
             * must be positive, since the distance is never greater than the
             * capacity, and if we then wrap this value around using the modulus
             * operator we'll end up with a valid index.  All of this machinery
             * works because
             *
             *                 (x + (n - k)) mod n == (x - k) mod n
             *
             * And Java's modulus operator works best on positive values.
             */
            return elements[(offset + (capacity() - unconsumedElements)) % capacity()];
        }

        
        /**
         * Observes, but does not dequeue, the next available element. If no data is
         * present this routine throws instead of blocking.
         *
         * @return The next available element.
         * @throws IOException If the buffer is empty.
         */
        public synchronized T peekNoBlock() throws IOException {
            /* Fail immediately rather than waiting for data. */
            if (unconsumedElements == 0) {
                throw new IOException("No More Data In Buffer");
            }
            return elements[(offset + (capacity() - unconsumedElements)) % capacity()];
        }

        
        /**
         * Removes and returns the next available element(s) up to and including
         * the first newline (\n), blocking until a newline has been read.
         *
         * @return The next available line, including the terminating newline
         * @throws InterruptedException If the caller is interrupted before a
         * full line becomes available.
         */
        public synchronized String removeString() throws InterruptedException {
            T result = null;
            String message = "";
            do {
                /* Use peek() to get the element to return. */
                result = peek();
                message += String.format("%c", result);

                /* Mark that one fewer elements are now available to read. */
                --unconsumedElements;

                /* Because there is more space left, wake up any waiting threads. */
                notifyAll();
            } while (!message.endsWith("\n"));

            return message;
        }

        
        /**
         * Returns the next available element(s) up to and including the first
         * newline (\n) without dequeuing them. Blocks until a newline is found.
         *
         * @return The next available string
         * @throws InterruptedException If the caller is interrupted while waiting.
         */
        public synchronized String peekString() throws InterruptedException {
            T result = null;
            String message = "";
            int testElements = unconsumedElements;
            if (testElements == 0) {
                System.out.println("peekString: no elements. waiting");
                wait();
                System.out.println("peekString: " + unconsumedElements + " elements");
                testElements = unconsumedElements;
            }
            do {
                /* Use peek() to get the element to return. */
                if (unconsumedElements - message.length() <= 0 && !message.endsWith("\n")) {
                    while (unconsumedElements - message.length() <= 0) {
                        System.out.println("inner peekString: no elements. waiting");
                        wait();
                        System.out.println("inner peekString: " + unconsumedElements + " elements");
                    }
                    testElements = unconsumedElements - message.length();
                }
                result = elements[(offset + (capacity() - testElements)) % capacity()];
                message += String.format("%c", result);
                --testElements;
            } while (!message.endsWith("\n"));
            System.out.println("Found line!");
            return message;
        }

        
        /**
         * Removes and returns the next available element, blocking until data
         * becomes available.
         *
         * @return The next available element
         * @throws InterruptedException If the caller is interrupted before data
         * becomes available.
         */
        public synchronized T remove() throws InterruptedException {
            /* Use peek() to get the element to return. */
            T result = peek();

            /* Mark that one fewer elements are now available to read. */
            --unconsumedElements;

            /* Because there is more space left, wake up any waiting threads. */
            notifyAll();

            return result;
        }

        
        /**
         * Returns the number of elements that are currently being stored in the
         * ring buffer.
         *
         * @return The number of elements currently stored in the ring buffer.
         */
        public synchronized int size() {
            return unconsumedElements;
        }

        /**
         * Returns whether the ring buffer is empty.
         *
         * @return Whether the ring buffer is empty.
         */
        public synchronized boolean isEmpty() {
            return size() == 0;
        }

        /**
         * Purge all data from the buffer
         */
        public synchronized void clear() {
            unconsumedElements = 0;
            notifyAll();
        }
        
        public boolean canGetChar() {
            ExecutorService executor = Executors.newFixedThreadPool(1);
            Future<?> f = executor.submit(new Runnable() {
                @Override
                public void run() {
                    T i;
                    try {
                        peek();
                    } catch (Exception e) {
                    }
                }
            });
            try {
                f.get(3000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                f.cancel(true);
                executor.shutdownNow();
                return false;
            }
            System.out.println("Got char");
            f.cancel(true);
            executor.shutdownNow();
            return true;
        }

        public boolean canGetLine() {
            ExecutorService executor = Executors.newFixedThreadPool(1);
            Future<?> f = executor.submit(new Runnable() {
                @Override
                public void run() {
                    String i;
                    try {
                        peekString();
                    } catch (InterruptedException e) {
                        //f.get timeout stops thread
                        //should find a way to shutdown thread execution
                        System.out.println("canGetLine: " + e.toString());
                        //e.printStackTrace();
                    }
                }
            });
            try {
                f.get(3000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                System.out.println("f.get: " + e.toString());
                //e.printStackTrace();
                f.cancel(true);
                executor.shutdownNow();
                return false;
            }
            f.cancel(true);
            return true;
        }
    }
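
The index trick described in the comment inside `peek()` can be checked in isolation. The sketch below is only an illustration: `RingIndexDemo` and `readIndex` are hypothetical names, and the numbers come from the diagram in the class comment (write pointer at slot 2, four unread elements, capacity 8).

```java
final class RingIndexDemo {
    /** Index of the oldest unread element, computed the same way peek() does. */
    static int readIndex(int writeOffset, int unconsumed, int capacity) {
        return (writeOffset + (capacity - unconsumed)) % capacity;
    }

    public static void main(String[] args) {
        int capacity = 8;

        // Diagram case: the oldest unread element sits in slot 6.
        System.out.println(readIndex(2, 4, capacity));       // prints 6

        // Naive subtraction goes negative, because Java's % keeps the sign
        // of the dividend.
        System.out.println((2 - 4) % capacity);              // prints -2

        // Math.floorMod gives the mathematical modulus and agrees with readIndex.
        System.out.println(Math.floorMod(2 - 4, capacity));  // prints 6

        // Exhaustive check of (x + (n - k)) mod n == (x - k) mod n for this capacity.
        for (int x = 0; x < capacity; x++) {
            for (int k = 0; k <= capacity; k++) {
                if (readIndex(x, k, capacity) != Math.floorMod(x - k, capacity)) {
                    throw new AssertionError("mismatch at x=" + x + ", k=" + k);
                }
            }
        }
    }
}
```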

  6. #4
    How do you execute the code for testing? There is no main() method.

    This is the best commented code that I have seen in a long time.
    Last edited by NormR; December 25th, 2012 at 09:15 AM.
  8. #5
    Originally Posted by NormR
    How do you execute the code for testing? There is no main() method.

    This is the best commented code that I have seen in a long time.
    Most of the code isn't mine. I extracted the test case from my build. You can just take the RingBuffer class and add the test case into a main method like this:
    PHP Code:
    public static void main(String[] args) {
        //Setup synchronous ring buffer
        inBuffer = new RingBuffer(bufSize);
        System.out.println("Testing RingBuffer timeouts");
        try {
            inBuffer.add('a'); // add one char
            inBuffer.add('\n'); // add terminating char for canGetLine
            while (inBuffer.canGetLine()) {
                System.out.println("Found a line");
                System.out.println(inBuffer.remove()); // gets one line (a\n) then times out during next loop
            }
        } catch (Exception e) {
            System.err.println("Error:  " + e.getMessage());
        }
    }
    Like I said, it works, but it seems like it could be done in a better way.
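
If the executor approach is kept, one detail worth noting is that the posted `canGetLine()` returns `true` without calling `shutdownNow()`, so the pool's worker thread (non-daemon by default) stays alive after each successful call, which may be part of why the program does not exit cleanly. A possible tidy-up is to put the cleanup in a `finally` block so it runs on every path. The sketch below is an assumption-laden illustration, not the original design: `TimeoutRunner` and `callWithTimeout` are hypothetical names, and returning `null` on timeout only works if the task itself never returns `null`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutRunner {
    /**
     * Runs the task on a worker thread and returns its result, or null if the
     * task did not finish within the timeout.
     */
    static <T> T callWithTimeout(Callable<T> task, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            return null;
        } finally {
            // Runs on success, timeout and failure alike.
            future.cancel(true);    // interrupts the worker if it is still blocked
            executor.shutdownNow(); // lets the pool thread terminate
        }
    }

    public static void main(String[] args) throws Exception {
        // Finishes well within the timeout.
        String fast = callWithTimeout(() -> "ACK", 1, TimeUnit.SECONDS);

        // Blocks far longer than the timeout; the interrupt ends the sleep early.
        String slow = callWithTimeout(() -> {
            Thread.sleep(10_000);
            return "late";
        }, 100, TimeUnit.MILLISECONDS);

        System.out.println(fast); // prints ACK
        System.out.println(slow); // prints null
    }
}
```

Cancelling with an interrupt only helps when the blocked call responds to interruption. `wait()` and `Thread.sleep()` do, so a task wrapping `peekString()` should unblock with an `InterruptedException`.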
