By 'expensive' we could mean expensive in resources, i.e. the amount of memory or CPU time, or expensive financially: perhaps there's a charge levied per subscription.
As an example, let's say you have a trading application with the following requirements:
- The user can open any number of trade entry screens.
- The trade entry screen allows the user to enter a stock symbol, e.g. MSFT.
- Once a symbol has been entered, the application should subscribe to a market data source, e.g. Reuters, for that symbol and display the current price.
- If multiple screens request the same symbol then only one underlying subscription should be utilised - don't go creating yet another subscription for the same symbol.
- When a screen is closed or another symbol is selected, we should be notified once there are no longer any active subscribers, thus allowing us to release the underlying source subscription.
- Subsequent requests to an already subscribed symbol should immediately send back the latest price received rather than wait until a new price arrives.
There are a few ways to break this down with the Reactive Extensions (Rx) library. For this demo I'll use v2.2.5 from NuGet. Hopefully you're familiar with the NuGet package manager.
After running this in Package Manager Console:
Install-Package Rx-PlatformServices
My packages.config looks like this:
<packages>
  <package id="Rx-Core" version="2.2.5" targetFramework="net451" />
  <package id="Rx-Interfaces" version="2.2.5" targetFramework="net451" />
  <package id="Rx-PlatformServices" version="2.2.5" targetFramework="net451" />
</packages>
Steps
I'm breaking this problem down into a number of steps. If you're not interested in a step-by-step guide then jump to the bottom of this post for the full solution.
I've created a class called PriceSource which takes a single string constructor argument:
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class PriceSource
{
    private readonly string _symbol;
    private readonly ISubject<int> _subject = new Subject<int>();

    public PriceSource(string symbol)
    {
        _symbol = symbol;
    }

    public IObservable<int> Multicast { get; private set; }
}
The class-level variable _subject will be used shortly to tie a new observer to our source provider. A subject acts as both an observable sequence and an observer.
The Multicast property is the mechanism by which we expose access to an IObservable. For the sake of this demo we're providing access to an int which represents a ticking price.
To the body of the constructor, add this line: Multicast = CreateMulticast();
public PriceSource(string symbol)
{
    _symbol = symbol;
    Multicast = CreateMulticast();
}
Now add the body of CreateMulticast:
private IObservable<int> CreateMulticast()
{
    const int NumberOfTicksToReplay = 1;

    // Use the static Create method so that we can define our own
    // implementation for the return value of the Subscribe function
    return Observable.Create<int>(observer =>
    {
        Console.WriteLine("{0}: Create underlying observable...", _symbol);
        CreateUnderlyingSubscription();

        // subscribe the observer to our source provider subject
        var newSubscription = _subject.Subscribe(observer);

        // wrap in a dispose action, which will be called once the refcount gets to zero
        var unsubscriber = Disposable.Create(() =>
        {
            // Do release bit
            Console.WriteLine("{0}: RefCount - all subscriptions disposed. calling Release...", _symbol);
            ReleaseUnderlyingSubscription();
        });

        return new CompositeDisposable(newSubscription, unsubscriber);
    })
    .Replay(NumberOfTicksToReplay)
    .RefCount();
}
There's quite a bit going on. Essentially we're using the Observable.Create static method to allow us to return an IDisposable from our own Subscribe implementation.
We'll add CreateUnderlyingSubscription() soon. This gets called each time a new source provider needs to be created. You might delegate that to a provider factory which knows how to subscribe to a service.
Here we're telling the source provider subject (_subject) that the new subscription should receive notifications:
var newSubscription = _subject.Subscribe(observer);
Note that _subject is a plain Subject<int>. We don't need to use a ReplaySubject or BehaviorSubject in our own implementation (see below).
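To see what "plain" means here, this is a small sketch (the class name PlainSubjectDemo is just mine for illustration, it isn't part of PriceSource). It assumes the same Rx 2.2.5 packages as the rest of the post and shows that a Subject<int> on its own does not replay anything to an observer that subscribes late:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class PlainSubjectDemo
{
    // Returns the values seen by an early observer and by a late observer.
    public static Tuple<List<int>, List<int>> Run()
    {
        var subject = new Subject<int>();
        var early = new List<int>();
        var late = new List<int>();

        using (subject.Subscribe(v => early.Add(v)))
        {
            subject.OnNext(100);               // only the early observer is listening

            using (subject.Subscribe(v => late.Add(v)))
            {
                subject.OnNext(200);           // both observers are listening
            }
        }

        return Tuple.Create(early, late);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("early: {0}", string.Join(", ", result.Item1));
        Console.WriteLine("late: {0}", string.Join(", ", result.Item2));
    }
}
```

The late observer never sees 100, which is exactly the gap that the replay step further down is there to fill.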
Next we make use of Disposable.Create (from System.Reactive.Disposables).
Disposable.Create(Action dispose): "Creates a disposable object that invokes the specified action when disposed."
This static method returns an object of type IDisposable and performs the action passed when the Dispose method is called. As expected, no matter how many times the Dispose method is called, the dispose Action will be called just once - the very first time (see Idempotence).
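If you want to convince yourself of the call-once behaviour, a minimal sketch along these lines should do it (DisposeOnceDemo and releaseCount are names I've made up for the example):

```csharp
using System;
using System.Reactive.Disposables;

public static class DisposeOnceDemo
{
    // Disposes the same handle three times and reports how often the action ran.
    public static int Run()
    {
        var releaseCount = 0;
        IDisposable handle = Disposable.Create(() => releaseCount++);

        handle.Dispose();
        handle.Dispose();
        handle.Dispose();

        return releaseCount;
    }

    public static void Main()
    {
        Console.WriteLine("Release action ran {0} time(s)", Run());
    }
}
```

This matters for us because the release of an expensive subscription is not something we'd want to trigger twice by accident.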
By combining this with the .RefCount extension we are told when it's time to release the underlying provider's subscription. In our case we'll call ReleaseUnderlyingSubscription().
The body needs to return an IDisposable, so we make use of the CompositeDisposable. Any call to Dispose on the CompositeDisposable will result in Dispose() being called on both newSubscription and unsubscriber. RefCount disposes it for us once its count of subscribers drops to zero.
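As a rough illustration of that grouping behaviour, here is a sketch with two stand-in disposables (the names CompositeDemo, first and second are hypothetical and simply play the roles of newSubscription and unsubscriber):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;

public static class CompositeDemo
{
    // Disposes a composite twice and returns a log of the inner dispose actions.
    public static List<string> Run()
    {
        var log = new List<string>();
        var first = Disposable.Create(() => log.Add("first disposed"));
        var second = Disposable.Create(() => log.Add("second disposed"));

        var composite = new CompositeDisposable(first, second);
        composite.Dispose();   // disposes both members
        composite.Dispose();   // already disposed, nothing further is logged

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```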
The last two statements, .Replay() and .RefCount() are key to our requirements.
Observable.Replay: "Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying all notifications."
It's an extension method found in the System.Reactive.Linq namespace. It returns an IConnectableObservable<T>, which extends IObservable<T> to include a Connect() method. I won't go into much more detail here, other than to say that calling Connect() subscribes to the underlying source so that its notifications are shared among all observers.
The particular overload that we are using is passed an integer - that's the size of the replay buffer that we want replayed to any new observer. Here we're passing 1 as we only want the last price to be replayed. Under the hood Replay makes use of a ReplaySubject, which is the reason why we did not need to use that as our type for _subject.
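Here's a small sketch of Replay(1) on its own, with Connect() called by hand rather than via RefCount. ReplayDemo and the variable names are mine, and the prices are made up:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ReplayDemo
{
    // Returns what an observer sees when it subscribes after two prices have ticked.
    public static List<int> Run()
    {
        var source = new Subject<int>();
        IConnectableObservable<int> shared = source.Replay(1);
        var late = new List<int>();

        using (shared.Connect())               // start listening to the source
        {
            source.OnNext(100);
            source.OnNext(200);                // 200 is now the buffered "last price"

            using (shared.Subscribe(v => late.Add(v)))
            {
                source.OnNext(300);
            }
        }

        return late;
    }

    public static void Main()
    {
        Console.WriteLine("late observer saw: {0}", string.Join(", ", Run()));
    }
}
```

The late observer gets the buffered 200 as soon as it subscribes and then the live 300, but never the older 100, because the buffer size is 1.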
Moving on to RefCount. It is an IConnectableObservable<T> extension method that returns an observable sequence which stays connected to the source as long as there is at least one subscription to the observable sequence.
This does all the hard work of counting subscribers and, once the last one has been disposed, disposing the objects that we created in CreateMulticast - it's as simple as that.
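To watch the connect and disconnect points in isolation, here's a cut-down sketch of the same Create/Replay/RefCount chain that only logs (RefCountDemo and the log strings are hypothetical, and no prices flow in this one):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class RefCountDemo
{
    // Returns a log of when the underlying source is connected and disconnected.
    public static List<string> Run()
    {
        var log = new List<string>();

        var shared = Observable.Create<int>(observer =>
            {
                log.Add("connect");
                return Disposable.Create(() => log.Add("disconnect"));
            })
            .Replay(1)
            .RefCount();

        var a = shared.Subscribe(_ => { });    // first observer: connects
        var b = shared.Subscribe(_ => { });    // second observer: shares the connection
        a.Dispose();                           // one observer left, stay connected
        b.Dispose();                           // none left: disconnects

        var c = shared.Subscribe(_ => { });    // connects again
        c.Dispose();                           // disconnects again

        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Two observers share one connection, and a fresh connection is only made once the count has dropped to zero and somebody subscribes again.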
All that remains to be done is to add the two remaining methods - remember these are just dummy methods in this demo with a bit of logging.
private void CreateUnderlyingSubscription()
{
    Console.WriteLine("{0}: Created underlying subscription", _symbol);
}

private void ReleaseUnderlyingSubscription()
{
    Console.WriteLine("{0}: Released underlying subscription", _symbol);
}
Completed Code
The completed code should look like this.
Notice I've sneaked in a new method void NotifyPriceChange(int newPrice).
I've added this so I can simulate the underlying provider sending us a new price for this source for the unit tests...clearly you'd delegate that down to the market data service.
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class PriceSource
{
    private readonly ISubject<int> _subject = new Subject<int>();
    private readonly string _symbol;

    public PriceSource(string symbol)
    {
        _symbol = symbol;
        Multicast = CreateMulticast();
    }

    // Here for use with unit tests to simulate prices flowing in
    internal void NotifyPriceChange(int newPrice)
    {
        Console.WriteLine("{0}: price changed to {1}", _symbol, newPrice);
        _subject.OnNext(newPrice);
    }

    public IObservable<int> Multicast { get; private set; }

    private IObservable<int> CreateMulticast()
    {
        const int NumberOfTicksToReplay = 1;

        // Use the static Create method so that we can define our own
        // implementation for the return value of the Subscribe function
        return Observable.Create<int>(observer =>
        {
            Console.WriteLine("{0}: Create underlying observable...", _symbol);
            CreateUnderlyingSubscription();

            // subscribe the observer to our source provider subject
            var newSubscription = _subject.Subscribe(observer);

            // wrap in a dispose action, which will be called once the refcount gets to zero
            var unsubscriber = Disposable.Create(() =>
            {
                // Do release bit
                Console.WriteLine("{0}: RefCount - all subscriptions disposed. calling Release...", _symbol);
                ReleaseUnderlyingSubscription();
            });

            return new CompositeDisposable(newSubscription, unsubscriber);
        })
        .Replay(NumberOfTicksToReplay)
        .RefCount();
    }

    private void CreateUnderlyingSubscription()
    {
        Console.WriteLine("{0}: Created underlying subscription", _symbol);
    }

    private void ReleaseUnderlyingSubscription()
    {
        Console.WriteLine("{0}: Released underlying subscription", _symbol);
    }
}
That's it. How easy was that?
Tests
How about some unit tests to see this in place? I'm using the NUnit framework in conjunction with ReSharper to easily run my unit tests.
Install-Package NUnit
Purely for the sake of clarity, I've added a whole bunch of rather verbose Arrange-Act-Assert calls into one big test :(
using System;
using NUnit.Framework;

[TestFixture]
class PriceSourceTests
{
    [Test]
    public void MultipleSubscribes_AllActive_ReceiveSamePrice()
    {
        // ARRANGE
        var source = new PriceSource("MSFT");

        var subs1Price = 0;
        var subs1Ticks = 0;
        var subs1 = source.Multicast.Subscribe(price =>
        {
            Console.WriteLine("Observer 1 received {0}", price);
            subs1Ticks++;
            subs1Price = price;
        });

        var subs2Price = 0;
        var subs2Ticks = 0;
        var subs2 = source.Multicast.Subscribe(price =>
        {
            Console.WriteLine("Observer 2 received {0}", price);
            subs2Ticks++;
            subs2Price = price;
        });

        // ACT
        source.NotifyPriceChange(100);

        // ASSERT
        Assert.That(subs1Price, Is.EqualTo(100));
        Assert.That(subs1Ticks, Is.EqualTo(1));
        Assert.That(subs2Price, Is.EqualTo(100));
        Assert.That(subs2Ticks, Is.EqualTo(1));

        // ACT
        source.NotifyPriceChange(200);

        // ASSERT
        Assert.That(subs1Price, Is.EqualTo(200));
        Assert.That(subs1Ticks, Is.EqualTo(2));
        Assert.That(subs2Price, Is.EqualTo(200));
        Assert.That(subs2Ticks, Is.EqualTo(2));

        // ACT
        Console.WriteLine("Disposing subs1");
        subs1.Dispose();
        source.NotifyPriceChange(300);

        // ASSERT
        Assert.That(subs1Price, Is.EqualTo(200));
        Assert.That(subs1Ticks, Is.EqualTo(2));
        Assert.That(subs2Price, Is.EqualTo(300));
        Assert.That(subs2Ticks, Is.EqualTo(3));

        // ACT
        Console.WriteLine("Disposing subs2");
        subs2.Dispose();

        // ASSERT
        Assert.That(subs1Price, Is.EqualTo(200));
        Assert.That(subs1Ticks, Is.EqualTo(2));
        Assert.That(subs2Price, Is.EqualTo(300));
        Assert.That(subs2Ticks, Is.EqualTo(3));

        // ACT
        var subs3Price = 0;
        var subs3Ticks = 0;
        source.Multicast.Subscribe(price =>
        {
            Console.WriteLine("Observer 3 received {0}", price);
            subs3Ticks++;
            subs3Price = price;
        });

        // ASSERT
        Assert.That(subs3Price, Is.EqualTo(300));
        Assert.That(subs3Ticks, Is.EqualTo(1));

        // ACT
        source.NotifyPriceChange(400);

        // ASSERT
        Assert.That(subs3Price, Is.EqualTo(400));
        Assert.That(subs3Ticks, Is.EqualTo(2));
    }
}
Test Results
MSFT: Create underlying observable...
MSFT: Created underlying subscription
MSFT: price changed to 100
Observer 1 received 100
Observer 2 received 100
MSFT: price changed to 200
Observer 1 received 200
Observer 2 received 200
Disposing subs1
MSFT: price changed to 300
Observer 2 received 300
Disposing subs2
MSFT: RefCount - all subscriptions disposed. calling Release...
MSFT: Released underlying subscription
Observer 3 received 300
MSFT: Create underlying observable...
MSFT: Created underlying subscription
MSFT: price changed to 400
Observer 3 received 400