Advertisement
Help Keep Boards Alive. Support us by going ad free today. See here: https://subscriptions.boards.ie/.
If we do not hit our goal we will be forced to close the site.

Current status: https://keepboardsalive.com/

Annual subs are best for most impact. If you are still undecided on going Ad Free - you can also donate using the Paypal Donate option. All contribution helps. Thank you.
https://www.boards.ie/group/1878-subscribers-forum

Private Group for paid up members of Boards.ie. Join the club.

Java Producer Consumer Issue

  • 10-11-2016 04:51PM
    #1
    Closed Accounts Posts: 6,075 ✭✭✭


    Can anyone explain how consumer-2 below is consuming a 'null'? My code should be preventing this.
    public class Test {
    
        public static void main(String args[]) throws InterruptedException {
    
            BoundedQueue<Integer> sharedQueue = new BoundedQueue<>(10);
    
            Callable<Integer> producer1 = new Producer(sharedQueue, "producer-1");
            Callable<Integer> producer2 = new Producer(sharedQueue, "producer-2");
            Callable<Integer> consumer1 = new Consumer(sharedQueue, "consumer-1");
            Callable<Integer> consumer2 = new Consumer(sharedQueue, "consumer-2");
    
            Collection<Callable<Integer>> callables = new HashSet<>();
            callables.add(producer1);
            callables.add(producer2);
            callables.add(consumer1);
            callables.add(consumer2);
    
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            executorService.invokeAll(callables);
        }
    }
    
    package com.bounded.queue;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class BoundedQueue<T> {
    
        private int capacity;
        private int head;
        private int tail;
        private int currentSizeOfBuffer;
        private T[] buffer;
    
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();
        private final Condition notEmpty = lock.newCondition();
    
        public BoundedQueue(int capacity) {
            this.capacity = capacity;
            this.buffer = (T[]) new Object[capacity];
        }
    
        public void put(T element) throws InterruptedException {
    
            final ReentrantLock lock = this.lock;
            lock.lock();
    
            if(isBufferFull()) {
                waitOnAvailableSlot();
            }
    
            try {
                buffer[tail] = element;
                tail = getNextAvailableSlot(tail);
                currentSizeOfBuffer++;
    
                informConsumerQueueHasElement();
    
            } finally {
                lock.unlock();
            }
        }
    
        private boolean isBufferFull() {
            return capacity == currentSizeOfBuffer;
        }
    
        private void waitOnAvailableSlot() throws InterruptedException {
            notFull.await();
        }
    
        private void informConsumerQueueHasElement() {
            notEmpty.signal();
        }
    
        public T take() throws InterruptedException {
    
            final ReentrantLock lock = this.lock;
            lock.lock();
    
            if(isBufferEmpty()) {
                waitOnAvailableElement();
            }
    
            try {
                T element = buffer[head];
                head = getNextAvailableSlot(head);
                currentSizeOfBuffer--;
    
                informProducerQueueHasSpaceAvailable();
    
                return element;
            } finally {
                lock.unlock();
            }
        }
    
        private boolean isBufferEmpty() {
            return 0 == currentSizeOfBuffer;
        }
    
        private void waitOnAvailableElement() throws InterruptedException {
            notEmpty.await();
        }
    
        private void informProducerQueueHasSpaceAvailable() {
            notFull.signal();
        }
    
        private final int getNextAvailableSlot(int currentSlotPosition) {
            int nextAvailableSlot = ++currentSlotPosition;
            return (nextAvailableSlot == capacity) ? 0 : nextAvailableSlot;
        }
    }
    
    package com.bounded.queue.jobs;
    
    import com.bounded.queue.BoundedQueue;
    
    import java.util.concurrent.Callable;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    public class Producer implements Callable<Integer> {
    
        private final BoundedQueue sharedQueue;
        private String name;
    
        @Override
        public Integer call() throws Exception {
    
            for(int i=0; i<10; i++){
                try {
                    sharedQueue.put(i);
                    System.out.println(name + " produced: " + i);
                } catch (InterruptedException ex) {
                    Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
            return null;
        }
    
        public Producer(BoundedQueue sharedQueue, String name) {
            this.sharedQueue = sharedQueue;
            this.name = name;
        }
    }
    
    package com.bounded.queue.jobs;
    
    import com.bounded.queue.BoundedQueue;
    
    import java.util.concurrent.Callable;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    public class Consumer implements Callable<Integer> {
    
        private final BoundedQueue sharedQueue;
        private String name;
    
        @Override
        public Integer call() throws Exception {
    
            while(true){    //what is happening here?
                try {
                    Integer element = (Integer) sharedQueue.take();
                    System.out.println(name + " consumed: "+ element);
                } catch (InterruptedException ex) {
                    Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    
        public Consumer(BoundedQueue sharedQueue, String name) {
            this.sharedQueue = sharedQueue;
            this.name = name;
        }
    }
    

    Output:
    producer-2 produced: 0
    consumer-2 consumed: null
    consumer-1 consumed: 0
    producer-2 produced: 1
    producer-2 produced: 2
    consumer-2 consumed: 2
    consumer-1 consumed: 0
    producer-1 produced: 0
    consumer-2 consumed: 3
    etc
    

    Another run did this:
    producer-2 produced: 0
    consumer-1 consumed: 0
    consumer-2 consumed: null
    producer-1 produced: 0
    producer-2 produced: 1
    producer-1 produced: 1
    consumer-2 consumed: 0
    consumer-1 consumed: null
    consumer-2 consumed: 2
    etc
    


Comments

  • Closed Accounts Posts: 6,075 ✭✭✭IamtheWalrus


    I replaced
    if(isBufferFull()) {
                waitOnAvailableSlot();
            }
    

    with
    while(isBufferFull()) {
                waitOnAvailableSlot();
            }
    

    and
    if(isBufferEmpty()) {
                waitOnAvailableElement();
            }
    

    with
    while(isBufferEmpty()) {
                waitOnAvailableElement();
            }
    

    and it fixed the issue.


  • Moderators, Technology & Internet Moderators Posts: 1,337 Mod ✭✭✭✭croo


    Your change might have stopped the error occurring but being lazy and not wanting to debug all your code; I did wonder if the problem could be caused by you use of a Set of "callables" to test.
    Collection<Callable<Integer>> callables = [B]new HashSet<>()[/B];
    callables.add(producer1);
    callables.add(producer2);
    callables.add(consumer1);
    callables.add(consumer2);
    
    The Set has no order so while you add produce1 & 2 followed by consumers they might no necessarily be retrieved again from the queue in that order. And that might mean a consumer is called before a producer!
    This would explain the non-repeatable nature of your initial error.


Advertisement