Wednesday, January 14, 2015

Using RefCount To Create Automatic Disposal and Lazy Connections

This is quite a common scenario: I need to create a source data provider from some type of 'expensive' service, eg a stock market price ticker, which will automatically handle the job of connecting and disconnecting as the number of active observers changes.

By 'expensive' we could mean expensive resource-wise, ie the amount of memory or CPU time, or expensive financially... perhaps there's a charge levied per subscription.

As an example, let's say you have a trading application with the following requirements:
  • User can open any number of trade entry screens.  
  • The trade entry screen allows the user to enter a stock symbol, eg MSFT
  • Once a symbol has been entered the application should subscribe to a market data source, eg Reuters, for that symbol and display the current price
  • If multiple screens request the same symbol then only one underlying subscription should be utilised - don't go creating yet another subscription for the same symbol
  • When a screen is closed or another symbol is selected, we should be notified when there are no longer any active subscribers, thus allowing us to release the underlying source subscription
  • Subsequent requests to an already subscribed symbol should immediately send back the latest price received rather than wait until a new price arrives.


There are a few ways to break this down with the Reactive library.  For this demo I'll use v2.2.5 from NuGet.  Hopefully you're familiar with the NuGet package manager.

After running this in Package Manager Console:

Install-Package Rx-PlatformServices

My packages.config looks like this:

<packages>
  <package id="Rx-Core" version="2.2.5" targetFramework="net451" />
  <package id="Rx-Interfaces" version="2.2.5" targetFramework="net451" />
  <package id="Rx-PlatformServices" version="2.2.5" targetFramework="net451" />
</packages>

Steps

I'm breaking this problem down into a number of steps... if you're not interested in a step-by-step guide then jump to the bottom of this post for the full solution.




I've created a class called PriceSource which takes a single string constructor argument:
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
 
public class PriceSource
{
    private readonly string _symbol;
    private readonly ISubject<int> _subject = new Subject<int>();

    public PriceSource(string symbol)
    {
        _symbol = symbol;
    }
    public IObservable<int> Multicast { get; private set; }
}
The class-level variable _subject will be used shortly to tie a new observer to our source provider. The subject acts as both an observable sequence and an observer.
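
To make the "both an observable and an observer" point concrete, here is a minimal sketch, assuming the same Rx 2.2.5 packages installed above. The SubjectDemo class and its Run method are my own illustrative names, not part of PriceSource.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical demo class: shows the two roles a Subject<int> plays.
public static class SubjectDemo
{
    public static List<int> Run()
    {
        var received = new List<int>();
        var subject = new Subject<int>();

        // Observable role: something subscribes to the subject.
        using (subject.Subscribe(price => received.Add(price)))
        {
            // Observer role: values are pushed into the subject via OnNext.
            subject.OnNext(100);
            subject.OnNext(200);
        }

        // The subscription has been disposed, so this value is not recorded.
        subject.OnNext(300);

        return received; // contains 100, 200
    }
}
```

This is the same pairing PriceSource relies on: NotifyPriceChange plays the observer side (OnNext) while Subscribe plays the observable side.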

The Multicast property is the mechanism by which we expose access to an IObservable. For the sake of this demo we're providing access to an int which represents a ticking price.

To the body of the constructor, add this line: Multicast = CreateMulticast();

public PriceSource(string symbol)
{
    _symbol = symbol;
    Multicast = CreateMulticast();
}
Now add the body of CreateMulticast:
private IObservable<int> CreateMulticast()
{
    const int NumberOfTicksToReplay = 1;
 
    // Use the static Create method so that we can define our own
    // implementation for the return value of the Subscribe function
    return Observable.Create<int>(observer =>
    {
        Console.WriteLine("{0}: Create underlying observable...", _symbol);
        CreateUnderlyingSubscription();
 
        // subscribe the supplied observer to our source provider's subject
        var newSubscription = _subject.Subscribe(observer);
 
        // wrap in a dispose action, which will be called once the refcount gets to zero
        var unsubscriber = Disposable.Create(() =>
        {
            // Do release bit 
            Console.WriteLine("{0}: RefCount - all subscriptions disposed. calling Release...", _symbol);
            ReleaseUnderlyingSubscription();
        });
 
        return new CompositeDisposable(newSubscription, unsubscriber);
    })
    .Replay(NumberOfTicksToReplay)
    .RefCount();
}

There's quite a bit going on. Essentially we're using the Observable.Create static method to allow us to return an IDisposable from a specific Subscribe implementation.

We'll add CreateUnderlyingSubscription() soon.  This gets called each time a new source provider needs to be created. You might delegate that to a provider factory which knows how to subscribe to a service.

Here we're telling the source provider subject (_subject) that the new subscription should receive notifications:

var newSubscription = _subject.Subscribe(observer); 

Note that _subject is a plain Subject<int>.  We don't need to use a ReplaySubject or BehaviorSubject in our own implementation (see below).

Next we make use of Disposable.Create (from System.Reactive.Disposables). 

Creates a disposable object that invokes the specified action when disposed
Disposable.Create(Action dispose) 

This static method returns an object of type IDisposable and performs the action passed when the Dispose method is called.  As expected, no matter how many times the Dispose method is called, the dispose Action will be called just once - the very first time (see Idempotence).
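
A quick way to see the call-once behaviour is the sketch below, which counts how often the action runs. DisposeOnceDemo and Run are illustrative names of my own; only Disposable.Create comes from the library.

```csharp
using System;
using System.Reactive.Disposables;

// Hypothetical demo class: counts how many times the dispose action fires.
public static class DisposeOnceDemo
{
    public static int Run()
    {
        var calls = 0;
        IDisposable unsubscriber = Disposable.Create(() => calls++);

        // Dispose repeatedly; only the first call invokes the action.
        unsubscriber.Dispose();
        unsubscriber.Dispose();
        unsubscriber.Dispose();

        return calls; // 1
    }
}
```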

By combining this with the .RefCount extension we will be told when it's time to release the underlying provider's subscription; in our case we'll call ReleaseUnderlyingSubscription().

The body needs to return an IDisposable, so we make use of the CompositeDisposable. Any call to Dispose on the CompositeDisposable will result in Dispose() being called on both newSubscription and unsubscriber, which RefCount needs in order to track IObservable usage.
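
As a small sketch of that grouping behaviour, the snippet below records which members get disposed when the composite is disposed. CompositeDemo and Run are illustrative names, and the two strings simply stand in for the newSubscription and unsubscriber objects from CreateMulticast.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;

// Hypothetical demo class: one Dispose call fans out to every member.
public static class CompositeDemo
{
    public static List<string> Run()
    {
        var disposed = new List<string>();

        // Stand-ins for the two disposables created in CreateMulticast.
        var newSubscription = Disposable.Create(() => disposed.Add("newSubscription"));
        var unsubscriber = Disposable.Create(() => disposed.Add("unsubscriber"));

        var composite = new CompositeDisposable(newSubscription, unsubscriber);

        composite.Dispose();
        composite.Dispose(); // a second call has no further effect

        return disposed; // both names, each recorded once
    }
}
```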
 
The last two statements, .Replay() and .RefCount() are key to our requirements.

Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying all notifications
Observable.Replay

It's an extension method found in the System.Reactive.Linq namespace.  It returns an IConnectableObservable<T>, which extends IObservable<T> to include a Connect() method.  I won't go into much more detail here; calling Connect() subscribes the connectable observable to its source so that the data can be shared between observers.

The particular overload that we are using is passed an integer - that's the size of the replay buffer that we want replayed to any new observer.  Here we're passing 1 as we only want the last price to be replayed.  Under the hood Replay makes use of a ReplaySubject, which is why we did not need to use one as the type for _subject.
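
Here is a rough sketch of the replay buffer on its own, without RefCount, so Connect() is called by hand. ReplayDemo and Run are illustrative names of my own; the prices are made up for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class: a late subscriber receives the last buffered price.
public static class ReplayDemo
{
    public static List<int> Run()
    {
        var lateObserver = new List<int>();
        var source = new Subject<int>();

        // Buffer size 1: only the most recent value is kept for replay.
        IConnectableObservable<int> replayed = source.Replay(1);

        // Connect manually here; in PriceSource, RefCount does this for us.
        using (replayed.Connect())
        {
            source.OnNext(100);
            source.OnNext(200);

            // Subscribing late: 200 is replayed immediately, 100 is gone.
            using (replayed.Subscribe(price => lateObserver.Add(price)))
            {
                source.OnNext(300);
            }
        }

        return lateObserver; // contains 200, 300
    }
}
```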

Moving on to RefCount.  It is an IConnectableObservable<T> extension method that returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.

This does all the hard work of tracking calls to Dispose() on the objects that we created in CreateMulticast - it's as simple as that.
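
To see the connect and release points in isolation, the sketch below counts how many times the source is subscribed to and released as observers come and go. RefCountDemo and Run are illustrative names, and the source here never emits anything; it exists only to count connections.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical demo class: snapshots "connects/releases" after each step.
public static class RefCountDemo
{
    public static List<string> Run()
    {
        var connects = 0;
        var releases = 0;
        var snapshots = new List<string>();

        var shared = Observable.Create<int>(observer =>
        {
            connects++; // the underlying source is being subscribed to
            return Disposable.Create(() => releases++);
        })
        .Replay(1)
        .RefCount();

        var first = shared.Subscribe(_ => { });
        var second = shared.Subscribe(_ => { });
        snapshots.Add(connects + "/" + releases); // 1/0: two observers, one connection

        first.Dispose();
        snapshots.Add(connects + "/" + releases); // 1/0: one observer still active

        second.Dispose();
        snapshots.Add(connects + "/" + releases); // 1/1: last observer gone, source released

        var third = shared.Subscribe(_ => { });
        snapshots.Add(connects + "/" + releases); // 2/1: reconnected for the new observer
        third.Dispose();

        return snapshots;
    }
}
```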

All that remains is to add the two remaining methods - remember, these are just dummy methods in this demo with a bit of logging.


private void CreateUnderlyingSubscription()
{
    Console.WriteLine("{0}: Created underlying subscription", _symbol);
}
 
private void ReleaseUnderlyingSubscription()
{
    Console.WriteLine("{0}: Released underlying subscription", _symbol); 
} 




Completed Code 


The completed code should look like this.
Notice I've sneaked in a new method void NotifyPriceChange(int newPrice).
I've added this so I can simulate the underlying provider sending us a new price for this source for the unit tests...clearly you'd delegate that down to the market data service. 


using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
 
public class PriceSource
{
    private readonly ISubject<int> _subject = new Subject<int>();
    private readonly string _symbol;
 
    public PriceSource(string symbol)
    {
        _symbol = symbol;
        Multicast = CreateMulticast();
    }
 
    // Here for use with unit tests to simulate prices flowing in
    internal void NotifyPriceChange(int newPrice)
    {
        Console.WriteLine("{0}: price changed to {1}", _symbol, newPrice);
        _subject.OnNext(newPrice);
    }
 
    public IObservable<int> Multicast { get; private set; }
 
    private IObservable<int> CreateMulticast()
    {
        const int NumberOfTicksToReplay = 1;
 
        // Use the static Create method so that we can define our own
        // implementation for the return value of the Subscribe function
        return Observable.Create<int>(observer =>
        {
            Console.WriteLine("{0}: Create underlying observable...", _symbol);
            CreateUnderlyingSubscription();
 
            // subscribe the supplied observer to our source provider's subject
            var newSubscription = _subject.Subscribe(observer);
 
            // wrap in a dispose action, which will be called once the refcount gets to zero
            var unsubscriber = Disposable.Create(() =>
            {
                // Do release bit 
                Console.WriteLine("{0}: RefCount - all subscriptions disposed. calling Release...", _symbol);
                ReleaseUnderlyingSubscription();
            });
 
            return new CompositeDisposable(newSubscription, unsubscriber);
        })
        .Replay(NumberOfTicksToReplay)
        .RefCount();
    }
 
    private void CreateUnderlyingSubscription()
    {
        Console.WriteLine("{0}: Created underlying subscription", _symbol);
    }
 
    private void ReleaseUnderlyingSubscription()
    {
        Console.WriteLine("{0}: Released underlying subscription", _symbol);
    }
}
 
That's it.  How easy was that?  


Tests

How about some unit tests to see this in place?  I'm using the NUnit framework in conjunction with Resharper to easily run my unit tests in 

Install-Package NUnit

Purely for the sake of clarity, I've added a whole bunch of rather verbose Arrange-Act-Assert calls into one big test :(

using System;
using NUnit.Framework;
 
[TestFixture]
class PriceSourceTests
{
    [Test]
    public void MultipleSubscribes_AllActive_ReceiveSamePrice()
    {
        // ARRANGE
        var source = new PriceSource("MSFT");
 
        var subs1Price = 0;
        var subs1Ticks = 0;
        var subs1 = source.Multicast.Subscribe(price =>
        {
            Console.WriteLine("Observer 1 received {0}", price);
            subs1Ticks++;
            subs1Price = price;
        });
 
        var subs2Price = 0;
        var subs2Ticks = 0;
        var subs2 = source.Multicast.Subscribe(price =>
        {
            Console.WriteLine("Observer 2 received {0}", price);
            subs2Ticks++;
            subs2Price = price;
        });
 
        // ACT
        source.NotifyPriceChange(100);
 
        // ASSERT
        Assert.That(subs1Price, Is.EqualTo(100));
        Assert.That(subs1Ticks, Is.EqualTo(1));
        Assert.That(subs2Price, Is.EqualTo(100));
        Assert.That(subs2Ticks, Is.EqualTo(1));
 
        // ACT
        source.NotifyPriceChange(200);
 
        // ASSERT
        Assert.That(subs1Price, Is.EqualTo(200));
        Assert.That(subs1Ticks, Is.EqualTo(2));
        Assert.That(subs2Price, Is.EqualTo(200));
        Assert.That(subs2Ticks, Is.EqualTo(2));
 
        // ACT
        Console.WriteLine("Disposing subs1");
        subs1.Dispose();
        source.NotifyPriceChange(300);
 
        // ASSERT
        Assert.That(subs1Price, Is.EqualTo(200));
        Assert.That(subs1Ticks, Is.EqualTo(2));
        Assert.That(subs2Price, Is.EqualTo(300));
        Assert.That(subs2Ticks, Is.EqualTo(3));
 
        // ACT
        Console.WriteLine("Disposing subs2");
        subs2.Dispose();
 
        // ASSERT
        Assert.That(subs1Price, Is.EqualTo(200));
        Assert.That(subs1Ticks, Is.EqualTo(2));
        Assert.That(subs2Price, Is.EqualTo(300));
        Assert.That(subs2Ticks, Is.EqualTo(3));
 
        // ACT
        var subs3Price = 0;
        var subs3Ticks = 0;
        source.Multicast.Subscribe(price =>
        {
            Console.WriteLine("Observer 3 received {0}", price);
            subs3Ticks++;
            subs3Price = price;
        });
 
        // ASSERT
        Assert.That(subs3Price, Is.EqualTo(300));
        Assert.That(subs3Ticks, Is.EqualTo(1));
 
        // ACT
        source.NotifyPriceChange(400);
 
        // ASSERT
        Assert.That(subs3Price, Is.EqualTo(400));
        Assert.That(subs3Ticks, Is.EqualTo(2));
    }
}

Test Results

MSFT: Create underlying observable...
MSFT: Created underlying subscription
MSFT: price changed to 100
Observer 1 received 100
Observer 2 received 100
MSFT: price changed to 200
Observer 1 received 200
Observer 2 received 200
Disposing subs1
MSFT: price changed to 300
Observer 2 received 300
Disposing subs2
MSFT: RefCount - all subscriptions disposed. calling Release...
MSFT: Released underlying subscription
Observer 3 received 300
MSFT: Create underlying observable...
MSFT: Created underlying subscription
MSFT: price changed to 400
Observer 3 received 400

