Using And, Then & When in Reactive Extensions

When I look over the list of Observable methods on MSDN, I can see that I use maybe 60% of them quite frequently.  I find that I’ll use Do or SelectMany almost every day, and more involved operators like Publish and Merge at least once a week.

Today I wanted to cover 3 operators that I find aren’t used frequently but can be used together to provide powerful synchronization techniques.  The operators are “And”, “Then” and “When”.

The “And” Operator

If we look on MSDN for the “And” operator, the documentation tells us: “Matches when both observable sequences have an available value.”  Well OK, that sounds pretty simple.  I reckon I can put some code together to show how this works.  I just need two streams which both produce a value.  Let’s have a try.
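A minimal sketch of such a first attempt, assuming the System.Reactive package and two `Observable.Interval` streams. The names `fast` and `slow` and the 1-second period are illustrative assumptions; only the 10-second period of the slower stream comes from this post.

```csharp
using System;
using System.Reactive.Joins;
using System.Reactive.Linq;

class AndExample
{
    static void Main()
    {
        // Two streams that both produce values (periods are illustrative).
        IObservable<long> fast = Observable.Interval(TimeSpan.FromSeconds(1));
        IObservable<long> slow = Observable.Interval(TimeSpan.FromSeconds(10));

        // "And" does not return an IObservable; it returns a Pattern<long, long>.
        Pattern<long, long> pattern = fast.And(slow);

        // pattern.Subscribe(...) would not compile: Pattern has no Subscribe method.
    }
}
```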

Hmmm, well firstly that reads well.  I can see quite easily from reading this code what its intent is, but what I expected as a result of the “And” operator was another IObservable which I could then subscribe to.  So it seems that on its own the “And” operator doesn’t work as I expected.  When I look at the return type for “And” I can see it returns something called a “Pattern”, and apart from chaining another “And” to bring in a further stream, the only operator I can apply to it seems to be “Then”.

The “Then” Operator

So it seems I have been led to the “Then” operator by some clever design decisions from the Rx designers.  They obviously want me to use the “Then” operator, as it’s really the only thing available to me.  Therefore I had better go back to MSDN and see what it tells me: “Observable.Then<TSource, TResult> Method: Matches when the observable sequence has an available value and projects the value.”  Well, this operator seems very familiar; it’s almost exactly the same as the “Select” operator.  The “Select” operator takes each value that is pushed to it and projects it into a value of a different type.  I can do that!!! Let’s implement “Then” and we are done, right?
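A sketch of that step, under the same assumptions as before (System.Reactive, illustrative `fast` and `slow` interval streams, and an illustrative projection to a string):

```csharp
using System;
using System.Reactive.Joins;
using System.Reactive.Linq;

class ThenExample
{
    static void Main()
    {
        IObservable<long> fast = Observable.Interval(TimeSpan.FromSeconds(1));
        IObservable<long> slow = Observable.Interval(TimeSpan.FromSeconds(10));

        // "Then" projects the matched pair, much like Select would,
        // but the result is a Plan<string>, not an IObservable<string>.
        Plan<string> plan = fast
            .And(slow)
            .Then((f, s) => string.Format("fast={0}, slow={1}", f, s));

        // plan.Subscribe(...) would not compile: Plan<T> exposes no operators.
    }
}
```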

Uh oh!! Dead end.  Well this isn’t good.  The one thing that gets me through with LINQ and Rx is that operators are discoverable, and now I’ve got nowhere to go.  Digging a little deeper, I can see that where I thought I would get an IObservable<T> back from “Then”, what I’m actually getting is a Plan<T>.  What the heck is a Plan? Why does it have no methods on it so I can do something with it? I’m going to have to do some research.

The “When” Operator

Back to MSDN, looking for Plan<T>, let’s see what it tells us: “Represents an execution plan for join patterns.”  Well, that makes sense, as that’s essentially what I have been trying to do.  But unfortunately it’s not giving me the answers for how I actually get values from my stream.  Therefore it’s time to stretch my Google-fu and see what I can find.  Turns out all the information I needed was actually on the same MSDN page I had been looking at for the other Observable operators.  When I searched the page for “Plan” I found that only one operator seems to be able to use this construct, and that’s the “When” operator.

This time I’m told that the “When” operator takes in a Plan<T> and returns an IObservable<T>: “Joins together the results from several patterns.”  Yes! This looks like the final piece of the puzzle.  Let’s finish the code.
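A sketch of the finished pipeline, again assuming System.Reactive and the illustrative `fast` (1 second) and `slow` (10 seconds) interval streams; the output format is an assumption too.

```csharp
using System;
using System.Reactive.Joins;
using System.Reactive.Linq;

class WhenExample
{
    static void Main()
    {
        IObservable<long> fast = Observable.Interval(TimeSpan.FromSeconds(1));
        IObservable<long> slow = Observable.Interval(TimeSpan.FromSeconds(10));

        // And -> Pattern, Then -> Plan.
        Plan<string> plan = fast
            .And(slow)
            .Then((f, s) => string.Format("fast={0}, slow={1}", f, s));

        // When turns the plan back into an IObservable<string> we can subscribe to.
        IObservable<string> joined = Observable.When(plan);

        using (joined.Subscribe(line => Console.WriteLine(line)))
        {
            // Keep the process alive while the timers fire; press Enter to stop.
            Console.ReadLine();
        }
    }
}
```

With a single plan like this, the join pairs values in arrival order, much as Zip does: values from the faster stream are queued until the slower stream produces its next value, so a long-running fast source keeps accumulating unmatched values.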

As you can see from the code above, I now pass my plan to the “When” operator so that I can actually gain access to the projected values, and then I write those values to the console window.  The slower stream is in essence the regulating stream in this scenario, so the combined stream only produces a value once every 10 seconds.

Summary

As you can see, the three operators “And”, “Then” and “When” work together to provide some powerful synchronization primitives that we can use in our code to solve a number of different scenarios.  Discovering these operators is a little trickier than discovering the basic operators, and I feel that a few extensions could have been added to make life easier (why no “When” extension on a Plan<T>, for example?).
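For illustration, such an extension could be sketched as below. `PlanExtensions` and its `When` method are hypothetical helpers, not part of Rx; they simply forward to the existing `Observable.When`, and the example assumes the System.Reactive package.

```csharp
using System;
using System.Reactive.Joins;
using System.Reactive.Linq;

// Hypothetical helper, not part of Rx: makes "When" discoverable on a Plan<T>.
public static class PlanExtensions
{
    public static IObservable<TResult> When<TResult>(this Plan<TResult> plan)
    {
        return Observable.When(plan);
    }
}

class ExtensionExample
{
    static void Main()
    {
        // Single-value streams keep the example short.
        Plan<string> plan = Observable.Return(1)
            .And(Observable.Return("a"))
            .Then((n, s) => n + s);

        // The whole chain now reads left to right: And, Then, When, Subscribe.
        plan.When().Subscribe(value => Console.WriteLine(value));
    }
}
```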
