BlockingQueue.size() returns wrong size in Publisher-Subscribers

2015-02-01T03:21:40

I have the problem regarding the implementation of One Publisher - Multiple Subscribers pattern. The Publisher uses the fixed-size buffer and queue the messages. The messages are send to all subscribers. The ordering of messages get by subscribers must be the same as the ordering of publishing messages.

I use BlockingQueue to hold publisher messages (publisherQueue) and pass them to each subscriber BlockingQueue (subscriberQueue).

The issue is that the buffer and subscribers are working correctly, but the buffer size (publisherQueue.size()) always returns 1.

System.out.println("Actual number of messages in buffer: " + publisherQueue.size());

Here is my full code:

PublisherSubscriberService.java

package program;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


public class PublisherSubscriberService {
    private int buffer;
    private int subscribersNumber;
    static Set<subscriber> subscribers = new HashSet<subscriber>();

    public PublisherSubscriberService(int buffer, int subscribersNumber) {
        this.buffer = buffer;
        this.subscribersNumber = subscribersNumber;
    }

    public void addsubscriber(subscriber subscriber) {
        subscribers.add(subscriber);
    }

    public void start() {
        publisher publisher = new publisher(buffer);
        System.out.println("publisher started the job");

        for (int i = 0; i < subscribersNumber; i++) {
            subscriber subscriber = new subscriber(buffer);
            subscriber.setName(Integer.toString(i + 1));
            subscribers.add(subscriber);
            new Thread(subscriber).start();
            System.out.println("Subscriber " + subscriber.getName() + " started the job");
        }
        new Thread(publisher).start();
    }

    public class Publisher implements Runnable {
        private int buffer;
        final BlockingQueue<Message> publisherQueue;

        public Publisher(int buffer) {
            this.buffer = buffer;
            publisherQueue = new LinkedBlockingQueue<>(buffer);
        }

        @Override
        public void run() {
            for (int i = 1; i < 100; i++) {
                Message messageObject = new Message("" + i);
                try {
                    Thread.sleep(50);
                    publisherQueue.put(messageObject);
                    System.out.println("Queued message no " +         messageObject.getMessage());
                    System.out.println("Actual number of messages in buffer:     " + publisherQueue.size());
                    for (subscriber subscriber : subscribers) {
                        subscriber.subscriberQueue.put(messageObject);
                    }
                    publisherQueue.take();
                } catch (InterruptedException e) {
                    System.out.println("Some error");
                    e.printStackTrace();
                }
            }
        }
    }

    class Subscriber implements Runnable {
        private String name;
        private int buffer;
        final BlockingQueue<Message> subscriberQueue;

        public Subscriber(int buffer) {
            this.buffer = buffer;
            subscriberQueue = new LinkedBlockingQueue<>(buffer);
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        @Override
        public void run() {
            try {
                Message messageObject;
                while (true) {
                    Thread.sleep(100);
                    messageObject = subscriberQueue.take();
                    System.out.println(this.getName() + " got message: " + messageObject.getMessage());
                }
            } catch (InterruptedException e) {
                System.out.println("Some error");
                e.printStackTrace();
            }
        }
    }
class Message {
    private String message;

    public Message(String str) {
        this.message = str;
    }

    public String getMessage() {
        return message;
    }

}
}

PublisherSubscriberProgram.java

    package program;

public class ProducerConsumerProgram {

    public static void main(String[] args) {
        ProducerConsumerService service = new ProducerConsumerService(10, 3);
        service.start();
    }
}

Copyright License:
Author:「Admx」,Reproduced under the CC 4.0 BY-SA copyright license with link to original source & disclaimer.
Link to:https://stackoverflow.com/questions/28255559/blockingqueue-size-returns-wrong-size-in-publisher-subscribers

About “BlockingQueue.size() returns wrong size in Publisher-Subscribers” questions

I have the problem regarding the implementation of One Publisher - Multiple Subscribers pattern. The Publisher uses the fixed-size buffer and queue the messages. The messages are send to all subscr...
This issue is probably a bug in Qt which I have just reposted at https://bugreports.qt.io/browse/QTBUG-95999 But until it gets fixed I would need any workaround. Unfortunately I cannot find any. Any
I have a .png file in my resources folder.(actual size is 411 KB) When I convert the uiimage to nsdata and try accessing length property, it gives me wrong value. Code... UIImage *image = [UIImage
i have a problem - ASP.NET MVC returns file with wrong size, therefore it is corrupted and cant open it. All files on server are good and i can open them, so the problem is in returning and downloa...
I am trying to match the size of button1 to button2 by checking the size of button1 and then setting the size of button2 to match, but size() on button1 returns the incorrect value of (640, 480) un...
My project target iOS - 11 and later. It has correct Launch images assets for XR and XS Max with appropriate dimensions. On simulator my app works as expected on both XR and XS Max. But, users rep...
This bug is driving me nuts. I have an MFC application with a window in which I can draw and interact. The problem is that the size of the CHwndRenderTarget does not match the actual size of the wi...
I have an atlas of images, they are 80 by 80. But SKTexture says some of frames are 33x80 despite them clearly being 80x80. What is wrong? My animation looks very wrong when it runs. Here is the c...
I am using media recorder to record audio then I upload it to firebase, by saving the output file content into a byte array, the problem is that the recorded file is correctly saved and played, whi...
I am picking an image from the gallery and querying its size via ContentResolver API, it returns 29kb. However when I check the file via adb using ls -al it is 44kb here is how I query the size o...

Copyright License:Reproduced under the CC 4.0 BY-SA copyright license with link to original source & disclaimer.