DISCLAIMER: The code in this post was the result of a big hackfest with the single goal of getting a particular use case working. I am telling you right now that I paid no attention to potential concurrency problems and that if you try to reuse any of this stuff you are just asking for trouble.
In the previous post, I explained how Erik Meijer’s recent Lang.NET presentation on the LiveLabs Reactive Framework piqued my interest. Wondering how the framework works, I decided to have a go at emulating some of the basic functionality it appears to provide. This code demonstrates the functionality I’ve implemented so far:
Func<MouseEventArgs, int> slowOperation = args =>
{
    System.Threading.Thread.Sleep(3000);
    return args.X;
};
IObservable<int> observable = from md in button1.GetMouseDowns()
                              from x in slowOperation.AsAsyncObservable(md.EventArgs)
                              select x;
Action<string> textboxUpdater = s => textBox1.AppendText(s);
observable.Attach(x => textBox1.BeginInvoke(textboxUpdater, "Mouse down: " + x + "\n"));
Today I am going to work my way through the query comprehension you can see here and do my best to explain how I got this working. A good place to start is to convert the query comprehension into a method chain, in the same way the C# compiler does for us under the covers. ReSharper can do this automatically, which is convenient:
IObservable<int> observable = button1.GetMouseDowns().SelectMany(
    md => slowOperation.AsAsyncObservable(md.EventArgs),
    (md, x) => x
);
The first call here is to an extension method called GetMouseDowns:
public static IObservable<EventResult<Button, MouseEventArgs>> GetMouseDowns(this Button b)
{
    var wrapper = new EventWrapper<Button, MouseEventArgs>();
    b.MouseDown += wrapper.Handle;
    return wrapper;
}
So far, this is relatively straightforward. The GetMouseDowns extension method creates an EventWrapper, and wires that EventWrapper up to the MouseDown event on the button. You can see that the wrapper is being returned as an IObservable of an EventResult. Here is the code for these:
public interface IObservable<T>
{
    void Attach(Action<T> action);
}
public class EventResult<TSender, TArgs>
{
    public EventResult(TSender sender, TArgs args)
    {
        Sender = sender;
        EventArgs = args;
    }
    public TSender Sender { get; private set; }
    public TArgs EventArgs { get; private set; }
}
public class EventWrapper<TSender, TArgs> : IObservable<EventResult<TSender, TArgs>>
{
    private List<Action<EventResult<TSender, TArgs>>> _attached = new List<Action<EventResult<TSender, TArgs>>>();
    public void Handle(object sender, TArgs e)
    {
        foreach (var action in _attached)
            action(new EventResult<TSender, TArgs>((TSender)sender, e));
    }
    public void Attach(Action<EventResult<TSender, TArgs>> action)
    {
        _attached.Add(action);
    }
}
Now there is some really naive code here. What would happen if someone called Attach while we were in the middle of handling the event? It would crash: the list of actions would be modified while it was being iterated over, and the foreach would throw an InvalidOperationException. In general, the code for the EventWrapper feels bad. I am sure it can be done much better, but it works for now.
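One way to sidestep the modified-while-iterating problem is to copy the list under a lock before invoking anything. The block below is only a minimal sketch of that idea, not a drop-in replacement for EventWrapper: SnapshotObservable and Publish are hypothetical names introduced here (Publish stands in for Handle), and IObservable<T> is repeated from above only so that the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Repeated from the post so that this block compiles on its own.
public interface IObservable<T>
{
    void Attach(Action<T> action);
}

// Hypothetical variant of the wrapper: Attach and Publish synchronise on a
// private lock, and Publish iterates over a copy of the list.
public class SnapshotObservable<T> : IObservable<T>
{
    private readonly object _gate = new object();
    private readonly List<Action<T>> _attached = new List<Action<T>>();

    public void Attach(Action<T> action)
    {
        lock (_gate)
        {
            _attached.Add(action);
        }
    }

    // Plays the role of EventWrapper.Handle: pushes one value to every action
    // that was attached at the moment Publish was called.
    public void Publish(T value)
    {
        Action<T>[] snapshot;
        lock (_gate)
        {
            snapshot = _attached.ToArray();
        }

        // The lock is released before the callbacks run, so an action may
        // call Attach without touching the collection being iterated.
        foreach (var action in snapshot)
            action(value);
    }
}
```

With this shape, an action attached in the middle of a Publish call only starts receiving values from the next Publish onwards. Whether that is the right behaviour for late attachers is a design choice the sketch does not try to settle.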
So the result of the GetMouseDowns method is an EventWrapper that will call the attached methods when the mouse down event occurs, and this is exposed as an IObservable. SelectMany is then called on that IObservable. This is another extension method:
public static IObservable<TResult> SelectMany<TSource, TCollection, TResult>(
    this IObservable<TSource> source,
    Func<TSource, IObservable<TCollection>> collectionSelector,
    Func<TSource, TCollection, TResult> resultSelector)
{
    return new SelectManyObservable<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
}
So how on earth did I arrive at this method? It’s quite simple, actually: I stole its signature from the SelectMany extension method implemented on IEnumerable. The interesting thing about LINQ query comprehensions is that they do not actually depend on IEnumerable or IQueryable. Instead, they depend on the existence of particular methods with the appropriate signatures, such as Select, SelectMany, Where, GroupBy, etc. By writing these extension methods for IObservable, I can then write query comprehensions against IObservables.
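To illustrate that the compiler only looks for methods of the right shape, here is a minimal sketch of a Select extension over the same IObservable<T>. None of it is from the code above: ManualObservable, ObservableSelectSketch, SelectObservable and Demo are hypothetical names made up for this example, and the interface is repeated so that the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Repeated from the post so that this block compiles on its own.
public interface IObservable<T>
{
    void Attach(Action<T> action);
}

// Hypothetical source that is driven by hand through Publish.
public class ManualObservable<T> : IObservable<T>
{
    private readonly List<Action<T>> _attached = new List<Action<T>>();

    public void Attach(Action<T> action)
    {
        _attached.Add(action);
    }

    public void Publish(T value)
    {
        // Iterate over a copy so that an action may call Attach safely.
        foreach (var action in _attached.ToArray())
            action(value);
    }
}

public static class ObservableSelectSketch
{
    // Same shape as Enumerable.Select, but over the post's IObservable<T>.
    // The compiler only needs a method with this name and signature.
    public static IObservable<TResult> Select<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, TResult> selector)
    {
        return new SelectObservable<TSource, TResult>(source, selector);
    }

    private class SelectObservable<TSource, TResult> : IObservable<TResult>
    {
        private readonly IObservable<TSource> _source;
        private readonly Func<TSource, TResult> _selector;

        public SelectObservable(IObservable<TSource> source, Func<TSource, TResult> selector)
        {
            _source = source;
            _selector = selector;
        }

        public void Attach(Action<TResult> action)
        {
            // Forward each source value through the projection.
            _source.Attach(s => action(_selector(s)));
        }
    }

    public static List<string> Demo()
    {
        var numbers = new ManualObservable<int>();
        var seen = new List<string>();

        // The compiler rewrites this as numbers.Select(n => "value " + (n * 2)).
        IObservable<string> labels = from n in numbers
                                     select "value " + (n * 2);
        labels.Attach(s => seen.Add(s));

        numbers.Publish(21);
        return seen; // holds the single entry "value 42"
    }
}
```

Nothing in this block implements IEnumerable or references System.Linq, yet the query comprehension in Demo compiles, because the Select extension method is all the compiler needs for a from/select pair.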
Once it’s established that my SelectMany borrowed its signature from the appropriate extension method on IEnumerable, there isn’t much more to know about it. All it does is return a SelectManyObservable that wraps the passed arguments. What is the SelectManyObservable exactly? Well, here is the code:
public class SelectManyObservable<TSource, TCollection, TResult> : IObservable<TResult>
{
    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
    private readonly Func<TSource, TCollection, TResult> _resultSelector;
    public SelectManyObservable(
        IObservable<TSource> source,
        Func<TSource, IObservable<TCollection>> collectionSelector,
        Func<TSource, TCollection, TResult> resultSelector)
    {
        _source = source;
        _collectionSelector = collectionSelector;
        _resultSelector = resultSelector;
    }
    public void Attach(Action<TResult> action)
    {
        _source.Attach(
            s => _collectionSelector(s).Attach(
                c => action(_resultSelector(s, c))
            )
        );
    }
}
So this class simply wraps an underlying IObservable, and when an action is attached to the SelectManyObservable, a wrapper around that action is what actually gets attached to the underlying IObservable. The funny thing about this code is that it practically wrote itself. It was basically a matter of figuring out how the underlying observable, collectionSelector, resultSelector and attached action related to each other and then calling them accordingly. It felt like solving a four-piece jigsaw puzzle with only one solution.
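To watch the four pieces fit together without a button or a background thread, the same wiring can be traced with synchronous sources. This is only a sketch under assumptions: DelegateObservable, SelectManyTrace, FromValues and Demo are hypothetical helpers invented for the trace, and SelectMany is rewritten here as a single lambda rather than as the SelectManyObservable class.

```csharp
using System;
using System.Collections.Generic;

// Repeated from the post so that this block compiles on its own.
public interface IObservable<T>
{
    void Attach(Action<T> action);
}

// Hypothetical helper: an observable whose Attach behaviour is a delegate.
public class DelegateObservable<T> : IObservable<T>
{
    private readonly Action<Action<T>> _onAttach;

    public DelegateObservable(Action<Action<T>> onAttach)
    {
        _onAttach = onAttach;
    }

    public void Attach(Action<T> action)
    {
        _onAttach(action);
    }
}

public static class SelectManyTrace
{
    // Hypothetical source: pushes every value to an action as soon as it attaches.
    public static IObservable<T> FromValues<T>(params T[] values)
    {
        return new DelegateObservable<T>(action =>
        {
            foreach (var value in values)
                action(value);
        });
    }

    // Same wiring as SelectManyObservable.Attach, written as one lambda:
    // each source value s selects an inner observable, and each inner value c
    // is combined with s before being handed to the attached action.
    public static IObservable<TResult> SelectMany<TSource, TCollection, TResult>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TCollection>> collectionSelector,
        Func<TSource, TCollection, TResult> resultSelector)
    {
        return new DelegateObservable<TResult>(action =>
            source.Attach(s =>
                collectionSelector(s).Attach(c =>
                    action(resultSelector(s, c)))));
    }

    public static List<string> Demo()
    {
        var seen = new List<string>();

        IObservable<string> pairs =
            from a in FromValues(1, 2)
            from b in FromValues(a * 10, a * 100)
            select a + "->" + b;

        pairs.Attach(s => seen.Add(s));
        return seen; // "1->10", "1->100", "2->20", "2->200"
    }
}
```

Because every source here pushes its values synchronously on Attach, the results arrive in nested-loop order: all inner values for 1 first, then all inner values for 2. With asynchronous inner observables, such as the AsyncWrapper, the order would depend on when each one completes.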
Before the explanation of the query comprehension is complete, there is one more extension method that requires explanation:
public static IObservable<TResult> AsAsyncObservable<TInput, TResult>(this Func<TInput, TResult> funcToObserve, TInput input)
{
    return new AsyncWrapper<TInput, TResult>(funcToObserve, input);
}
This pattern should look familiar. It’s like the SelectMany in that it simply returns a wrapper object that takes all the method arguments as parameters. The signature, however, wasn’t borrowed from the framework. The point of this method is to allow the user to convert an anonymous delegate or lambda expression into an IObservable. Now I hope you packed the goggles I mentioned in my previous post, because AsyncWrapper is really nasty:
public class AsyncWrapper<TInput, TResult> : IObservable<TResult>
{
    private List<Action<TResult>> _attached = new List<Action<TResult>>();
    public AsyncWrapper(Func<TInput, TResult> funcToObserve, TInput input)
    {
        // Starts the function on a background thread immediately.
        funcToObserve.BeginInvoke(input, CompletedCallback, null);
    }
    public void Attach(Action<TResult> action)
    {
        _attached.Add(action);
    }
    private void CompletedCallback(IAsyncResult asyncResult)
    {
        // AsyncResult here is System.Runtime.Remoting.Messaging.AsyncResult.
        TResult calculatedValue = ((Func<TInput, TResult>) ((AsyncResult) asyncResult).AsyncDelegate).EndInvoke(
            asyncResult);
        foreach (var action in _attached)
            action(calculatedValue);
    }
}
It might not seem so bad until you look at what the constructor is doing. When the AsyncWrapper is created, it fires off the function to observe. When that function completes, the callback is invoked, and as a result all the attached actions are invoked too. The reason why this is all so heinous is that the actions have to be attached AFTER the function to observe is invoked, but before it completes. As a result, I am fairly sure that if we changed the slow asynchronous operation to be much faster, this code would stop working: the callback could run before anything was attached, and actions attached afterwards would never be called. Surely a better solution can be found, and I do hope to attend to this later.
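One possible way out of the attach-before-completion race is to remember the result and replay it to anyone who attaches late. The block below is a minimal sketch of that idea and not the fix this series ends up with: ReplayingAsyncWrapper is a hypothetical name, and it uses ThreadPool.QueueUserWorkItem instead of the delegate's BeginInvoke, which is not supported on .NET Core and later runtimes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Repeated from the post so that this block compiles on its own.
public interface IObservable<T>
{
    void Attach(Action<T> action);
}

// Hypothetical variant of AsyncWrapper that stores the result so that actions
// attached after completion still receive it.
public class ReplayingAsyncWrapper<TInput, TResult> : IObservable<TResult>
{
    private readonly object _gate = new object();
    private readonly List<Action<TResult>> _attached = new List<Action<TResult>>();
    private bool _completed;
    private TResult _result;

    public ReplayingAsyncWrapper(Func<TInput, TResult> funcToObserve, TInput input)
    {
        // Run the function on a thread pool thread. An exception thrown by
        // funcToObserve is not handled in this sketch.
        ThreadPool.QueueUserWorkItem(_ => Complete(funcToObserve(input)));
    }

    public void Attach(Action<TResult> action)
    {
        lock (_gate)
        {
            if (!_completed)
            {
                // Still running: queue the action for the completion step.
                _attached.Add(action);
                return;
            }
        }

        // Already finished: replay the stored result to the late attacher.
        action(_result);
    }

    private void Complete(TResult result)
    {
        Action<TResult>[] pending;
        lock (_gate)
        {
            _result = result;
            _completed = true;
            pending = _attached.ToArray();
            _attached.Clear();
        }

        // Invoke outside the lock so that callbacks can attach further actions.
        foreach (var action in pending)
            action(result);
    }
}
```

In this sketch an action attached before completion runs on the thread pool thread, while one attached afterwards runs synchronously on the caller's thread. Code that touches UI controls would still need to marshal back to the UI thread, as the BeginInvoke call on textBox1 does in the opening example.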
So that covers all the code that makes my example work. The nice thing about this approach is the composability. It is convenient to be able to work with enumerables using select, where, etc., and the same is true for observables. I am really looking forward to getting my hands on the Reactive Framework code. In the meantime, I’ll continue to experiment. In the next post I’ll extend this example, starting with adding support for ‘where’ operations.