Observers in a multithreaded environment

Observer design pattern:

Let’s begin with the general definition:

The observer pattern is a software design pattern in which an object, called the subject, maintains a list of its dependents, called observers, and notifies them automatically of any state changes, usually by calling one of their methods. (Wikipedia)

In this post we will discuss various implementations of the Observer design pattern in single-threaded and multi-threaded environments, and review one common mistake that programmers make in multi-threaded implementations.

Basic single-threaded (non-thread-safe) implementation:

When using the Observer design pattern, the set of Observers is not known at compile time, so the Observable (another name for the Subject) must allow Observers to be added and removed at runtime.

In the simplest case, the Observable object can be implemented this way:

import java.util.HashSet;
import java.util.Set;

public class BasicObservableExample {

    public interface Observer {
        void onObservableChanged();
    }

    private Set<Observer> mObservers;

    /**
     * This method adds a new Observer - it will be notified when Observable changes
     */
    public void registerObserver(Observer observer) {
        if (observer == null) return;
        if (mObservers == null) {
            mObservers = new HashSet<>(1);          
        }
        mObservers.add(observer);
    }
    
    /**
     * This method removes an Observer - it will no longer be notified when Observable changes
     */
    public void unregisterObserver(Observer observer) {
        if (mObservers != null && observer != null) {
            mObservers.remove(observer);
        }
    }

    /**
     * This method notifies currently registered observers about Observable's change
     */
    private void notifyObservers() {
        if (mObservers == null) return;
        for (Observer observer : mObservers) {
            observer.onObservableChanged();
        }
    }

}

Invocations of all the listed methods are assumed to take place on a single thread, so there is no need for synchronization or thread-safe data structures. This is the ideal implementation if you can guarantee this “single thread condition”. The best way to write thread-safe code is to avoid the need for thread-safe code in the first place, which is the general rule “the best code is the code that wasn’t written” applied to multi-threaded environments.
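
The class above does not verify the “single thread condition” itself. The following is a minimal sketch of one way to make violations fail fast, for when an exception is preferable to silently corrupted state. The Observable remembers the thread that created it and checks that thread on every call. The mOwnerThread field and the checkOwnerThread() method are hypothetical additions that are not part of the original example. notifyObservers() is package-private here only so that the example can call it. Note that the check only detects misuse. It does not make the class thread-safe.

```java
import java.util.HashSet;
import java.util.Set;

// Single-threaded Observable plus a hypothetical fail-fast thread-confinement check
class ConfinedObservable {

    interface Observer {
        void onObservableChanged();
    }

    // The thread that created this Observable; all calls are expected to come from it
    private final Thread mOwnerThread = Thread.currentThread();
    private Set<Observer> mObservers;

    private void checkOwnerThread() {
        if (Thread.currentThread() != mOwnerThread) {
            throw new IllegalStateException("called from " + Thread.currentThread().getName()
                    + ", expected " + mOwnerThread.getName());
        }
    }

    void registerObserver(Observer observer) {
        checkOwnerThread();
        if (observer == null) return;
        if (mObservers == null) {
            mObservers = new HashSet<>(1); // lazy initialization is fine on a single thread
        }
        mObservers.add(observer);
    }

    void unregisterObserver(Observer observer) {
        checkOwnerThread();
        if (mObservers != null && observer != null) {
            mObservers.remove(observer);
        }
    }

    void notifyObservers() {
        checkOwnerThread();
        if (mObservers == null) return;
        for (Observer observer : mObservers) {
            observer.onObservableChanged();
        }
    }
}
```

With this check in place, a call to registerObserver(Observer) from any thread other than the creating one throws IllegalStateException. Without it, the call would race with the owner thread.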

What can happen if we use a non-thread-safe implementation in a multi-threaded environment:

In the most general multi-threaded case, we must assume that any method can be called on any thread at any time. The implementation we used in the single-threaded environment is no longer safe. Consider two distinct threads that attempt to register new Observers at the same time: they can easily leave our system in a corrupted state. Here is one such flow (many other flows could also break the non-thread-safe implementation):

  1. Thread A invokes registerObserver(Observer)
  2. Thread A executes the mObservers == null check and proceeds to instantiate a new set
  3. Before Thread A gets a chance to create the new set, the OS suspends it and resumes execution of Thread B
  4. Thread B executes steps 1 and 2 above
  5. Since Thread A hasn’t instantiated the set yet, Thread B instantiates a new set and stores a reference to it in mObservers
  6. Thread B adds its observer to the newly created set
  7. At some point the OS resumes execution of Thread A (which was suspended right before instantiating a new set)
  8. Thread A instantiates another new set and overwrites the reference in mObservers
  9. Thread A adds its observer to the newly created set

Both calls to registerObserver(Observer) completed successfully, yet only one observer will be notified when notifyObservers() is called. Thread B’s observer was added to a set that is no longer referenced. It is important to understand that either of the observers could end up being “ignored”, or both observers could be registered successfully. The outcome depends on how the OS schedules the threads, which we can’t control. This non-determinism is what makes multi-threading bugs so hard to track down and resolve.
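
The outcome depends on scheduling, so simply running two threads rarely reproduces the flow above. The sketch below forces exactly that interleaving with two CountDownLatch objects, so the lost registration happens on every run. It relies on a hypothetical beforeCreate hook, inserted between the null check and the instantiation of the set purely to pause Thread A at step 3. Real code has no such hook; there, the OS scheduler plays that role.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

// The non-thread-safe registerObserver() from above, plus a hypothetical test hook
class RacyObservable {

    interface Observer {
        void onObservableChanged();
    }

    private Set<Observer> mObservers;

    // Hypothetical hook: runs after the null check and before the set is created
    volatile Runnable beforeCreate = () -> { };

    void registerObserver(Observer observer) {
        if (observer == null) return;
        if (mObservers == null) {
            beforeCreate.run(); // step 3 of the flow: Thread A pauses here
            mObservers = new HashSet<>(1);
        }
        mObservers.add(observer);
    }

    int observerCount() {
        return mObservers == null ? 0 : mObservers.size();
    }
}

class RaceReplay {

    /** Replays steps 1-9 and returns how many observers end up registered. */
    static int replay() throws InterruptedException {
        RacyObservable observable = new RacyObservable();
        CountDownLatch aPassedNullCheck = new CountDownLatch(1);
        CountDownLatch bFinished = new CountDownLatch(1);

        // Steps 1-3: Thread A passes the null check, then waits inside the hook for Thread B
        observable.beforeCreate = () -> {
            aPassedNullCheck.countDown();
            try {
                bFinished.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread threadA = new Thread(() -> observable.registerObserver(() -> { }));
        threadA.start();
        aPassedNullCheck.await();

        // Steps 4-6: Thread B runs the whole method without pausing
        observable.beforeCreate = () -> { };
        Thread threadB = new Thread(() -> observable.registerObserver(() -> { }));
        threadB.start();
        threadB.join();

        // Steps 7-9: Thread A resumes and overwrites the reference in mObservers
        bFinished.countDown();
        threadA.join();
        return observable.observerCount();
    }
}
```

RaceReplay.replay() registers two observers from two threads and returns 1, because Thread B’s observer lives in a set that Thread A has already replaced.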

Basic thread-safe implementation:

Thread-safe classes in Java usually employ one of the following techniques (or mix them together):

  1. Synchronization of access to non-thread-safe parts of code
  2. Using existing thread-safe objects in non-thread-safe parts of code

IMHO, if you have the option to use tested thread-safe objects in order to make your own code thread-safe, you should always favor that approach over custom synchronization. Custom synchronization is more likely to be buggy. Bugs in multi-threaded code are like the unexploded mines, shells and bombs that remain in the ground after wars. They can sit there quietly for years before exploding, and the more “infrastructure” has been built around them, the larger the disaster when they detonate.

Therefore, for the most basic multi-threaded case, where all you want to do is add, remove and notify Observers from arbitrary threads, I recommend the following implementation:

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class BasicThreadSafeObservableExample {

    public interface Observer {
        void onObservableChanged();
    }
    
    // Can use CopyOnWriteArraySet too
    private final Set<Observer> mObservers = Collections.newSetFromMap(
            new ConcurrentHashMap<Observer, Boolean>(0));
    
    /**
     * This method adds a new Observer - it will be notified when Observable changes
     */
    public void registerObserver(Observer observer) {
        if (observer == null) return;
        mObservers.add(observer); // this is safe due to thread-safe Set
    }
    
    /**
     * This method removes an Observer - it will no longer be notified when Observable changes
     */
    public void unregisterObserver(Observer observer) {
        if (observer != null) {
            mObservers.remove(observer); // this is safe due to thread-safe Set
        }
    }

    /**
     * This method notifies currently registered observers about Observable's change
     */
    private void notifyObservers() {
        for (Observer observer : mObservers) { // this is safe due to thread-safe Set
            observer.onObservableChanged();
        }
    }

}

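Note that the reference to the Set is declared final. This matters because of the Java Memory Model. Once the constructor has completed, other threads are guaranteed to see a final field in its fully initialized state, as long as the reference to the Observable does not escape during construction. This ensures safe initialization and publication of the Set among different threads.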

Also note that we had to give up lazy initialization of the Set of Observers. We instantiate a thread-safe ConcurrentHashMap and wrap it in a Set view when the Observable is created. This is the “cost” of using a thread-safe data structure instead of custom synchronization. As will be shown later, we can keep lazy initialization when we use custom synchronization. Is this an issue? This Set will be allocated even if no Observer is ever registered with this Observable, which wastes several tens of bytes. In most situations, that is not a problem.

Another delicate point with this implementation is that an Observer could still be notified even after a call to unregisterObserver(Observer) has returned. The reason is that iterators over ConcurrentHashMap, and therefore over a Set created from it, are weakly consistent. They are not guaranteed to reflect modifications made after the iterator was created. Suppose thread B removes some Observer while thread A has already entered the for-each loop in notifyObservers(). Thread A could still notify that Observer even though it has already been removed. In other words, after a call to unregisterObserver(Observer) has returned, the Observer could still be notified about pending events that happened before unregistration. Under no circumstances will the Observer be notified about events that happened after unregistration completed. This is not an issue from a functional point of view, but the Observer should be able to cope with these “spurious” notifications (e.g. by simply discarding them).
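
The code above also mentions CopyOnWriteArraySet as an alternative backing Set. Below is a minimal sketch of that variant, which also makes the “late notification” described above easy to reproduce. CopyOnWriteArraySet iterators traverse a snapshot of the set taken when iteration starts. An Observer removed during a notification therefore still receives that notification, while later notifications no longer reach it. As before, notifyObservers() is package-private only for the sake of the example.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Variant of the basic thread-safe Observable backed by CopyOnWriteArraySet
class CowObservable {

    interface Observer {
        void onObservableChanged();
    }

    // Every modification copies the backing array; readers never block
    private final Set<Observer> mObservers = new CopyOnWriteArraySet<>();

    void registerObserver(Observer observer) {
        if (observer == null) return;
        mObservers.add(observer);
    }

    void unregisterObserver(Observer observer) {
        if (observer != null) {
            mObservers.remove(observer);
        }
    }

    void notifyObservers() {
        // This loop walks the snapshot taken when iteration began,
        // so removals made by other Observers during the loop are not seen
        for (Observer observer : mObservers) {
            observer.onObservableChanged();
        }
    }
}
```

Suppose the first registered Observer unregisters the second one from inside onObservableChanged(). Because CopyOnWriteArraySet iterates in insertion order, the second Observer is still notified during that round, but not during the next one.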

There are some cases, however, when the above thread-safe implementation will not suffice. One such case is when lazy initialization is required (whatever the reason). Another is when some initialization must be done when the very first Observer is registered, and some cleanup must be done when the last Observer is unregistered. We will use the second, more complicated case to demonstrate the most general approach to implementing the Observer design pattern in a multi-threaded environment, and to review one common mistake that programmers make.

Advanced thread-safe implementation (WRONG):

The code below uses a custom synchronization approach in order to ensure thread safety and atomicity of the steps performed in the registerObserver(Observer) and unregisterObserver(Observer) methods. From what I have seen, this synchronization scheme is the most common approach to implementing a thread-safe Observer design pattern. Sometimes, though, the monitor is the Observable itself rather than a private final object. That is another potential source of bugs, because it “implicitly shares” the monitor reference with the outside world.

import java.util.HashSet;
import java.util.Set;

public class AdvancedWrongThreadSafeObservableExample {

    public interface Observer {
        void onObservableChanged();
    }

    // this is the object we will be synchronizing on ("the monitor")
    private final Object MONITOR = new Object();
    
    private Set<Observer> mObservers; 
    
    /**
     * This method adds a new Observer - it will be notified when Observable changes
     */
    public void registerObserver(Observer observer) {
        if (observer == null) return;
        
        synchronized(MONITOR) {
            if (mObservers == null) {
                mObservers = new HashSet<>(1);
            }
            if (mObservers.add(observer) && mObservers.size() == 1) {
                performInit(); // some initialization when first observer added
            }
        }
    }
    
    /**
     * This method removes an Observer - it will no longer be notified when Observable changes
     */
    public void unregisterObserver(Observer observer) {
        if (observer == null) return;
        
        synchronized(MONITOR) {
            if (mObservers != null && mObservers.remove(observer) && mObservers.isEmpty()) {
                performCleanup(); // some cleanup when last observer removed
            }
        }
    }

    /**
     * This method notifies currently registered observers about Observable's change
     */
    private void notifyObservers() {
        synchronized(MONITOR) {
            if (mObservers != null) {
                for (Observer observer : mObservers) { 
                    observer.onObservableChanged();
                }
            }
        }
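    }

    // Hypothetical hooks: replace with the real initialization and cleanup logic
    private void performInit() {
        // e.g. start listening to an underlying data source
    }

    private void performCleanup() {
        // e.g. stop listening to the underlying data source
    }

}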

Functionally, this works well and is thread-safe. However, there is one issue with this code that can become a source of very nasty bugs in the future.

Note that access to the set of Observers is synchronized on exactly the same monitor in all methods. Furthermore, a thread that calls notifyObservers() holds the lock on this monitor for as long as it takes to notify all the Observers. Depending on how the Observers implement onObservableChanged(), notifying all of them might take a while. This is not an issue in general. It is common to call notifyObservers() on a dedicated thread in order to account for possible delays, or even to spawn a new “notification” thread per Observer. However, it becomes an issue if some other thread attempts to register or unregister Observers while a notification is in progress!

Think about this from the perspective of a programmer who writes code that uses our Observable. This programmer would expect the registerObserver(Observer) and unregisterObserver(Observer) methods to return immediately. Suppose one of these methods blocks for a prolonged period of time (e.g. a few seconds) because a notification of Observers is in progress. Code that was written under the assumption of immediate return is then no longer guaranteed to function properly. And by “no longer guaranteed to function properly” I mean that the possible behavior spans the entire range from “continue to execute after such a delay as if nothing happened” to “crash the entire system and corrupt important data”.
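
A deliberately slow Observer makes the blocking visible. The sketch below uses a condensed copy of the implementation above, without lazy initialization, performInit() or performCleanup(). One thread parks inside onObservableChanged() while holding MONITOR, and a second thread calls registerObserver(Observer) in the meantime. The slow Observer, the latches and the 200 ms wait are illustrative assumptions, not part of the original code.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Condensed copy of the WRONG implementation: Observers are notified while MONITOR is held
class LockedNotifyObservable {

    interface Observer {
        void onObservableChanged();
    }

    private final Object MONITOR = new Object();
    private final Set<Observer> mObservers = new HashSet<>();

    void registerObserver(Observer observer) {
        synchronized (MONITOR) {
            mObservers.add(observer);
        }
    }

    void notifyObservers() {
        synchronized (MONITOR) {
            for (Observer observer : mObservers) {
                observer.onObservableChanged();
            }
        }
    }
}

class BlockingDemo {

    /** Returns true if registerObserver() had not returned 200 ms into a slow notification. */
    static boolean registerBlockedBySlowObserver() throws InterruptedException {
        LockedNotifyObservable observable = new LockedNotifyObservable();
        CountDownLatch insideCallback = new CountDownLatch(1);
        CountDownLatch releaseCallback = new CountDownLatch(1);

        // Hypothetical slow Observer: stays inside onObservableChanged() until released
        observable.registerObserver(() -> {
            insideCallback.countDown();
            try {
                releaseCallback.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread notifier = new Thread(observable::notifyObservers);
        notifier.start();
        insideCallback.await(); // from here on, the notifier holds MONITOR

        AtomicBoolean registered = new AtomicBoolean(false);
        Thread client = new Thread(() -> {
            observable.registerObserver(() -> { });
            registered.set(true);
        });
        client.start();
        client.join(200); // wait up to 200 ms for registerObserver() to return
        boolean blocked = !registered.get();

        releaseCallback.countDown(); // let the slow Observer finish
        notifier.join();
        client.join();
        return blocked;
    }
}
```

registerBlockedBySlowObserver() returns true. registerObserver(Observer) cannot return until the slow Observer finishes, however long that takes.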

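Additionally, Observers become exposed to deadlocks if they attempt to obtain a lock on their internal monitors during the onObservableChanged() call. The notifying thread holds MONITOR and waits for the Observer’s lock. Meanwhile, another thread may already hold the Observer’s lock, for example because the Observer unregisters itself from one of its own synchronized methods. That thread waits for MONITOR in unregisterObserver(Observer), and neither thread can proceed.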
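
The following sketch shows how that deadlock arises. It assumes an Observer that guards its internal state with its own monitor (observerLock, hypothetical), and a second thread that unregisters this Observer while holding that monitor. The notifying thread holds MONITOR and waits for observerLock inside onObservableChanged(). The other thread holds observerLock and waits for MONITOR inside unregisterObserver(Observer). The example detects the resulting cycle with ThreadMXBean.findDeadlockedThreads(). It marks both threads as daemons so that they don’t keep the JVM alive once they are stuck.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

class DeadlockDemo {

    interface Observer {
        void onObservableChanged();
    }

    // Condensed WRONG Observable: notifies while holding MONITOR
    static class LockedNotifyObservable {
        private final Object MONITOR = new Object();
        private final Set<Observer> mObservers = new HashSet<>();

        void registerObserver(Observer o) { synchronized (MONITOR) { mObservers.add(o); } }

        void unregisterObserver(Observer o) { synchronized (MONITOR) { mObservers.remove(o); } }

        void notifyObservers() {
            synchronized (MONITOR) {
                for (Observer o : mObservers) o.onObservableChanged();
            }
        }
    }

    /** Returns true if the JVM reports a monitor deadlock within 5 seconds. */
    static boolean reproduce() throws InterruptedException {
        LockedNotifyObservable observable = new LockedNotifyObservable();
        Object observerLock = new Object(); // hypothetical internal monitor of the Observer
        Observer observer = () -> {
            synchronized (observerLock) {
                // update the Observer's internal state
            }
        };
        observable.registerObserver(observer);
        observable.notifyObservers(); // warm-up call without any contention

        Thread notifier = new Thread(observable::notifyObservers, "notifier");
        Thread owner = new Thread(() -> {
            synchronized (observerLock) {
                notifier.start();
                // Nobody else holds MONITOR, so BLOCKED means: holds MONITOR, waits for observerLock
                while (notifier.getState() != Thread.State.BLOCKED) {
                    Thread.yield();
                }
                observable.unregisterObserver(observer); // waits for MONITOR forever
            }
        }, "observer-owner");
        notifier.setDaemon(true); // stuck threads must not keep the JVM alive
        owner.setDaemon(true);
        owner.start();

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (System.nanoTime() - deadline < 0) {
            if (threads.findDeadlockedThreads() != null) {
                return true;
            }
            Thread.sleep(20);
        }
        return false;
    }
}
```

Neither the Observable nor the Observer does anything unusual on its own. The deadlock comes purely from calling foreign code while holding a lock.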

But the worst part of the above issues is that it will be extremely difficult to find the root cause of such behavior, because it might not be reproducible.

This is a very serious issue, and under no circumstances should a programmer jeopardize the system they work on in such a way!

Advanced thread-safe implementation:

It turns out that it is not that difficult to resolve the issue described in the previous section. The following code contains the fix (the only affected method is notifyObservers()):

import java.util.HashSet;
import java.util.Set;

public class AdvancedThreadSafeObservableExample {

    public interface Observer {
        void onObservableChanged();
    }

    // this is the object we will be synchronizing on ("the monitor")
    private final Object MONITOR = new Object();
    
    private Set<Observer> mObservers; 
    
    /**
     * This method adds a new Observer - it will be notified when Observable changes
     */
    public void registerObserver(Observer observer) {
        if (observer == null) return;
        
        synchronized(MONITOR) {
            if (mObservers == null) {
                mObservers = new HashSet<>(1);
            }
            if (mObservers.add(observer) && mObservers.size() == 1) {
                performInit(); // some initialization when first observer added
            }
        }
    }
    
    /**
     * This method removes an Observer - it will no longer be notified when Observable changes
     */
    public void unregisterObserver(Observer observer) {
        if (observer == null) return;
        
        synchronized(MONITOR) {
            if (mObservers != null && mObservers.remove(observer) && mObservers.isEmpty()) {
                performCleanup(); // some cleanup when last observer removed
            }
        }
    }

    /**
     * This method notifies currently registered observers about Observable's change
     */
    private void notifyObservers() {
        Set<Observer> observersCopy;
        
        synchronized(MONITOR) {
            if (mObservers == null) return;
            observersCopy = new HashSet<>(mObservers);
        }
        
        for (Observer observer : observersCopy) { 
            observer.onObservableChanged();
        }
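    }

    // Hypothetical hooks: replace with the real initialization and cleanup logic
    private void performInit() {
        // e.g. start listening to an underlying data source
    }

    private void performCleanup() {
        // e.g. stop listening to the underlying data source
    }
}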

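As you can see, the fix is very simple. The thread that calls notifyObservers() obtains the lock on the monitor as before, but it no longer notifies the Observers while holding the lock. Instead, it copies the set of Observers to a local Set and releases the lock. Only then does it iterate over the local Set and notify the Observers. As with the basic implementation, an Observer that is unregistered after the copy has been taken may still receive the notification in progress, so Observers should be prepared to handle such notifications.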
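
A slow-Observer scenario can be used to check the effect of the fix: one thread parks inside onObservableChanged() while another calls registerObserver(Observer) in the meantime. The sketch below is a condensed copy of the fixed class that omits lazy initialization and the init/cleanup hooks. The latches and the timeout are illustrative only. MONITOR is released as soon as the copy is taken, so registerObserver(Observer) returns without waiting for the slow Observer.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Condensed copy of the fixed class: copy under the lock, notify outside of it
class CopyingObservable {

    interface Observer {
        void onObservableChanged();
    }

    private final Object MONITOR = new Object();
    private final Set<Observer> mObservers = new HashSet<>();

    void registerObserver(Observer observer) {
        synchronized (MONITOR) {
            mObservers.add(observer);
        }
    }

    void notifyObservers() {
        Set<Observer> observersCopy;
        synchronized (MONITOR) {
            observersCopy = new HashSet<>(mObservers);
        }
        for (Observer observer : observersCopy) { // MONITOR is no longer held here
            observer.onObservableChanged();
        }
    }
}

class NonBlockingDemo {

    /** Returns true if registerObserver() returned while a slow Observer was being notified. */
    static boolean registerCompletesDuringSlowNotification() throws InterruptedException {
        CopyingObservable observable = new CopyingObservable();
        CountDownLatch insideCallback = new CountDownLatch(1);
        CountDownLatch releaseCallback = new CountDownLatch(1);

        // Hypothetical slow Observer: stays inside onObservableChanged() until released
        observable.registerObserver(() -> {
            insideCallback.countDown();
            try {
                releaseCallback.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread notifier = new Thread(observable::notifyObservers);
        notifier.start();
        insideCallback.await(); // the notification is now in progress

        AtomicBoolean registered = new AtomicBoolean(false);
        Thread client = new Thread(() -> {
            observable.registerObserver(() -> { });
            registered.set(true);
        });
        client.start();
        client.join(5_000); // generous timeout; no lock contention is expected
        boolean completed = registered.get();

        releaseCallback.countDown(); // let the slow Observer finish
        notifier.join();
        client.join();
        return completed;
    }
}
```

No Observer code runs while MONITOR is held, which also rules out the lock-ordering deadlock between MONITOR and an Observer’s internal monitor.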

Conclusion:

In this post we discussed the implementation of the Observer design pattern in single-threaded and multi-threaded environments. Two solutions were provided for the multi-threaded case. The “basic” one covers situations where all you want is to add, remove and notify Observers, and it relies on thread-safe data structures. The “advanced” one covers more complex situations where the “basic” implementation no longer suffices, and it uses custom synchronization.

We also reviewed one very common mistake that programmers make when implementing a thread-safe Observer design pattern using custom synchronization.

Please leave your comments and questions below, and consider subscribing to our newsletter if you liked the post.

This article has 5 comments

  1. Minal

    A very well written article, good insight on the simplest and critical problem!!
    Thank you!!!

  2. Munish

    In Advanced thread-safe implementation:
    There is a possibility that performInit() will not be called.
    T1 gets suspended after calling mObservers.add(observer).
    T2 comes along, calls mObservers.add(observer) and finds that the size is 2.
    T1 also finds that the size is 2.

    • Vasiliy

      Hi. Please note that this section of code is “guarded” by synchronization. While T1 calls mObservers.add(observer), it must hold the lock on MONITOR. This ensures that no other thread can concurrently enter this (or any other) section synchronized on MONITOR until T1 releases the lock. In your example, even if T1 is suspended during execution of the synchronized section, no other thread will be able to enter this section. Therefore T2 will not interfere with T1, and performInit() will be called as required.

  3. Pod

    Have you thought about either of these:

    1. using a copyonwriteset instead of a set?
    2. using a ReentrantReadWriteLock + fair-mode rather than copying the set each time?

    • Vasiliy

      Hi Pod, thanks for your comment.

      1. I guess you referred to CopyOnWriteArraySet, right? It is totally possible to use it instead of a set created from ConcurrentHashMap. The main difference between these approaches lies in their performance characteristics. Therefore, in case of performance bottlenecks, it would be best to benchmark both of them and use the one that performs better. However, performance optimizations are outside the scope of this post. There was one mistake in the code, though: a comment (now removed) stated that there is no thread-safe implementation of Set in Java. This is wrong. Thanks for pointing it out.

      2. If we use ReentrantReadWriteLock and do not copy the set, then the thread that executes notification will need to hold a read lock while notifying. If some other thread will try to add/remove observer during this time, it won’t be able to acquire the write lock until the notifying thread is done. This brings us back to the undesired behavior of blocking on add/remove observers calls. Am I missing something here?

      However if we use CopyOnWriteArraySet, as you suggested, then we could remove the code that copies the set. This is possible because CopyOnWriteArraySet internally copies the set during each modification. But this is more an alternative to the approach I called “basic” than “advanced” because it relies on a thread-safe collection.

      And another bug I found in my code is that the “basic” case wasn’t really thread-safe. The reference to the set of observers wasn’t declared final, so there were no guarantees about consistent visibility among different threads (now fixed).

      Thank you for bringing these points up.
