In part one of this series, I introduced the Live Labs Reactive Framework and provided a simple example of what can be achieved when using LINQ for reactive programming. In part two I did my best to explain how my example worked. In today’s post I want to extend the example slightly. Just to reiterate, this series is not about using the Live Labs Reactive Framework as it has not yet been released. I’m exploring the idea of a reactive framework using my own implementation.
Here is my example from the previous posts:
Func<MouseEventArgs, int> slowOperation = args =>
{
    System.Threading.Thread.Sleep(3000);
    return args.X;
};

IObservable<int> observable =
    from md in button1.GetMouseDowns()
    from x in slowOperation.AsAsyncObservable(md.EventArgs)
    select x;
The idea behind this code is that the IObservable<int> represents a stream of integers that are the result of capturing the mouse down event on a button, doing some “slow processing” (really just a 3 second sleep) asynchronously, and returning the x coordinate from the mouse down event. You can then attach something to that observable like so:
Action<string> textboxUpdater = s => textBox1.AppendText(s);

observable.Attach(x =>
    textBox1.BeginInvoke(textboxUpdater, "Mouse down: " + x + "\n"));
The attached code updates the textbox as the values bubble up through the IObservable. Unfortunately it's a little more complicated than I would like, because the values arrive on a background thread and the update has to be marshalled onto the UI thread, hence the BeginInvoke call. Today I want to modify this example so that the textbox is only updated when the right mouse button was clicked. The MouseEventArgs can tell me which button was clicked, so really this is just a matter of implementing support for where:
IObservable<int> observable =
    from md in button1.GetMouseDowns()
    where md.EventArgs.Button == MouseButtons.Right
    from x in slowOperation.AsAsyncObservable(md.EventArgs)
    select x;
Well this is a very natural way to express what I want – and the implementation is not too hard. First I create the appropriate extension method:
public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate)
{
    return new WhereObservable<T>(source, predicate);
}
This extension method is just a wrapper around a class. In this case, it's WhereObservable<T>, which takes a source IObservable and a predicate to apply:
public class WhereObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly Func<T, bool> _predicate;

    public WhereObservable(IObservable<T> source, Func<T, bool> predicate)
    {
        _source = source;
        _predicate = predicate;
    }

    public void Attach(Action<T> action)
    {
        // Forward a value to the attached action only if it passes the predicate.
        _source.Attach(t => { if (_predicate(t)) action(t); });
    }
}
All I’m doing here is wrapping an inner observable so that I can check to see if the predicate applies before I let the observable event bubble up. That’s all there is to it. My example will now only update the textbox if the right mouse button was clicked.
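To see the filtering in isolation, here is a minimal, self-contained sketch that leaves WinForms out of the picture. It is not the code from my repository. ISketchObservable<T>, ManualSource<T>, WhereSketchObservable<T>, SketchExtensions and WhereSketch are all names made up for this illustration, and the interface is renamed only so it cannot be confused with any other IObservable<T>. ManualSource<T> stands in for the button: calling Push plays the role of a mouse down event.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the IObservable<T> used in this series.
public interface ISketchObservable<T>
{
    void Attach(Action<T> action);
}

// Hypothetical hand-driven source: Push() plays the role of an incoming event.
public class ManualSource<T> : ISketchObservable<T>
{
    private readonly List<Action<T>> _actions = new List<Action<T>>();

    public void Attach(Action<T> action)
    {
        _actions.Add(action);
    }

    public void Push(T value)
    {
        foreach (var action in _actions) action(value);
    }
}

// Same shape as WhereObservable<T> above: wrap a source, filter on the way up.
public class WhereSketchObservable<T> : ISketchObservable<T>
{
    private readonly ISketchObservable<T> _source;
    private readonly Func<T, bool> _predicate;

    public WhereSketchObservable(ISketchObservable<T> source, Func<T, bool> predicate)
    {
        _source = source;
        _predicate = predicate;
    }

    public void Attach(Action<T> action)
    {
        _source.Attach(t => { if (_predicate(t)) action(t); });
    }
}

public static class SketchExtensions
{
    public static ISketchObservable<T> Where<T>(
        this ISketchObservable<T> source, Func<T, bool> predicate)
    {
        return new WhereSketchObservable<T>(source, predicate);
    }
}

public static class WhereSketch
{
    public static List<int> Run()
    {
        var source = new ManualSource<int>();
        var seen = new List<int>();

        // The compiler rewrites this query into source.Where(n => n % 2 == 0),
        // which is why defining the Where extension method is enough.
        ISketchObservable<int> evens = from n in source
                                       where n % 2 == 0
                                       select n;

        evens.Attach(seen.Add);

        for (int i = 1; i <= 6; i++) source.Push(i);

        return seen; // contains 2, 4, 6
    }
}
```

Calling WhereSketch.Run() from a Main method pushes the numbers 1 to 6 through the filter and returns only the even ones. The query syntax works here for the same reason it works in my example: the C# compiler turns a where clause into a call to whatever Where method it can find on the source type, extension methods included.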
My work on this project is available on GitHub. To get the code as it was at the time this post was written, use this tag.
Hi Paul,
I was intrigued about the Reactive Framework as well. I found out about it after stumbling on Flapjax, which is a JavaScript library for functional reactive programming. You may find it interesting as well. They have a pretty good tutorial which introduces the concepts. In Flapjax, a Behavior is about the same as IObservable, as far as I can tell.
I'm still trying to learn about reactive programming concepts. The best I can understand it at this point is that it lets you describe a pipeline of computation (like a chain of functions), just like IEnumerable/LINQ. But IEnumerable has a pull model, where the computation occurs on the consumer thread, whereas IObservable has a push model, where the computation occurs on the producer thread.
In terms of differences, IEnumerable seems to make it easier to program pipelines where multiple sources contribute to one destination. If you have multiple destinations attached to one source and you are not careful, you could evaluate that source multiple times.
On the other hand, IObservable makes it easier to program pipelines where one source produces data in multiple destinations/consumers.
I can't wait for more stuff to be published on the Reactive Framework. That presentation from Erik Meijer is such a tease ;-)
Cheers,
Julien
Hi Julien, thanks for the comment. I have some posts on Flapjax starred in my Google Reader so I'll be sure to take a look!