Sunday, 19 July 2009

Reacting to the Reactive Framework: Part 4

This post is part 4 in a series.

About a week ago another video on the Reactive Framework went up, this time on Channel 9. It was an enjoyable video, though I found it tricky to follow in parts because my understanding of monad theory is virtually non-existent and the discussion gets quite abstract in places. It did, however, give me a much better understanding of the basic Reactive Framework interfaces (IObserver<T> and IObservable<T>), which meant I had some work to do to get my implementation up to scratch!

Side note: I think I started this series badly by getting too complicated too fast, so I’m going to use this as an opportunity to start again.

Here are the relevant interface definitions (I haven’t specified any covariance or contravariance for these types because I am still using C# 3):

public interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

public interface IObserver<T>
{
    void OnNext(T item);
    void OnDone();
    void OnError(Exception e);
}

Hopefully the expected interaction between these two interfaces is relatively self-explanatory. There are a few differences between this pair of interfaces and what I had in the previous posts:

  • Rather than passing a callback function to the Subscribe method (previously called ‘Attach’) an IObserver is passed. As you would expect, the OnNext(item) method is called on the subscribed observer when the IObservable has a new result.
  • The Subscribe method returns an IDisposable. When it is disposed, the IObserver is unsubscribed.
  • When the IObservable has finished (no more results), each of the IObservers is notified using OnDone().
  • If the IObservable encounters an error, the IObservers are notified using OnError(e).
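
To make the contract in the list above concrete, here is a minimal sketch of the interaction. NumberSource, RecordingObserver and ContractDemo are hypothetical names invented purely for illustration (they are not part of the Reactive Framework or of my earlier posts), and the two interfaces are repeated so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Repeated from the post so the snippet compiles alone. Being declared here,
// they take precedence over any same-named types pulled in by "using System".
public interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

public interface IObserver<T>
{
    void OnNext(T item);
    void OnDone();
    void OnError(Exception e);
}

// Hypothetical source that pushes ints to whoever is currently subscribed.
public class NumberSource : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(_observers, observer);
    }

    public void Publish(int value)
    {
        // Iterate over a copy so an observer may unsubscribe from inside OnNext.
        foreach (IObserver<int> o in _observers.ToArray()) o.OnNext(value);
    }

    public void Finish()
    {
        foreach (IObserver<int> o in _observers.ToArray()) o.OnDone();
    }

    // Disposing this token removes the observer from the list again.
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<int>> _list;
        private readonly IObserver<int> _observer;

        public Unsubscriber(List<IObserver<int>> list, IObserver<int> observer)
        {
            _list = list;
            _observer = observer;
        }

        public void Dispose() { _list.Remove(_observer); }
    }
}

// Hypothetical observer that records what it was told, tagged with its name.
public class RecordingObserver : IObserver<int>
{
    private readonly string _name;
    private readonly List<string> _log;

    public RecordingObserver(string name, List<string> log) { _name = name; _log = log; }

    public void OnNext(int item) { _log.Add(_name + " next " + item); }
    public void OnDone() { _log.Add(_name + " done"); }
    public void OnError(Exception e) { _log.Add(_name + " error " + e.Message); }
}

public static class ContractDemo
{
    public static List<string> Run()
    {
        List<string> log = new List<string>();
        NumberSource source = new NumberSource();

        IDisposable subA = source.Subscribe(new RecordingObserver("A", log));
        IDisposable subB = source.Subscribe(new RecordingObserver("B", log));

        source.Publish(1);   // both A and B are notified
        subB.Dispose();      // B unsubscribes
        source.Publish(2);   // only A is notified
        source.Finish();     // only A sees OnDone
        subA.Dispose();
        return log;
    }
}
```

Calling ContractDemo.Run() should give the log "A next 1", "B next 1", "A next 2", "A done": once B's subscription is disposed it hears nothing further, including the final OnDone.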

The task today is to capture the stream of MouseDown events on a button as an IObservable<MouseEventArgs>, and then convert that into an IObservable<string> representing messages to be written to a text box. When the MouseDown event occurs, the text box should be updated accordingly. We can start by obtaining an IObservable<MouseEventArgs> like so:

IObservable<MouseEventArgs> mouseEvents = button1.GetMouseDowns();

GetMouseDowns() is an extension method – its purpose is to wrap the MouseDown event as an IObservable. Its implementation is very simple:

public static IObservable<MouseEventArgs> GetMouseDowns(this Button button)
{
    var wrapper = new EventWrapper<MouseEventArgs>();
    button.MouseDown += wrapper.Handler;
    return wrapper;
}

The EventWrapper class is wired up to the MouseDown event. It has a collection of observers which it notifies when the MouseDown event occurs:

public class EventWrapper<T> : IObservable<T> where T : EventArgs
{
    private readonly ObserverCollection<T> _observers = new ObserverCollection<T>();

    public void Handler(object sender, T eventArgs)
    {
        _observers.NotifyNext(eventArgs);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _observers.Subscribe(observer);
    }
}

Let’s skip the implementation of the ObserverCollection for now. I’d prefer to move on to converting this IObservable<MouseEventArgs> to an IObservable<string>:

IObservable<MouseEventArgs> mouseEvents = button1.GetMouseDowns();
IObservable<string> messages = from md in mouseEvents
                               select "Mouse down at: " + md.X + "\n";

This works as long as the “Select” LINQ operator has been implemented for IObservable, because the C# compiler converts the above into:

IObservable<MouseEventArgs> mouseEvents = button1.GetMouseDowns();
IObservable<string> messages = mouseEvents.Select( (MouseEventArgs md) => "Mouse down at: " + md.X + "\n");

The following extension method has the appropriate signature:

public static IObservable<TResult> Select<T, TResult>(this IObservable<T> observable, Func<T, TResult> func)
{
    return new SelectObservable<T, TResult>(observable, func);
}

By calling this method, what we’re really saying is that we want a new IObservable that notifies its subscribers whenever the old IObservable does, but first uses the passed function “func” to convert from the old result type to the new result type. In this case, that function is the lambda expression:

(MouseEventArgs md) => "Mouse down at: " + md.X + "\n"

Let’s skip the implementation of the SelectObservable, and instead look at consuming the messages. I’ll subscribe an observer that will update the textbox:

messages.Subscribe(new TextBoxUpdater(textBox1));

Here is the implementation for the TextBoxUpdater:

public class TextBoxUpdater : IObserver<string>
{
    private readonly TextBox _textBox;

    public TextBoxUpdater(TextBox textBox)
    {
        _textBox = textBox;
    }

    private void AppendText(string text)
    {
        Action textboxUpdater = () => _textBox.AppendText(text);
        _textBox.BeginInvoke(textboxUpdater);
    }

    public void OnNext(string s)
    {
        AppendText(s);
    }

    public void OnDone()
    {
        AppendText("Done\n");
    }

    public void OnError(Exception e)
    {
        AppendText("Error: " + e.Message + "\n");
    }
}

As you can see, it implements the OnNext, OnDone and OnError methods by writing some appropriate text, though in this simple example I’m not really using OnDone or OnError (the EventWrapper only ever calls NotifyNext). If I run this program now and click the button a few times, I get this:

[image: screenshot of the running program]
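
Since I skipped both ObserverCollection and SelectObservable above, here is a rough sketch of how they could look. Treat it as one possible shape under stated assumptions rather than a definitive implementation: only Subscribe and NotifyNext appear in the post, so NotifyDone, NotifyError, the nested Subscription and Adapter classes, and the SketchDemo harness are names I have made up for illustration. Having ObserverCollection implement IObservable<T> and routing an exception thrown by the projection function to OnError are both design choices of this sketch.

```csharp
using System;
using System.Collections.Generic;

// The post's interfaces, repeated so the sketch compiles on its own.
public interface IObservable<T> { IDisposable Subscribe(IObserver<T> observer); }
public interface IObserver<T>
{
    void OnNext(T item);
    void OnDone();
    void OnError(Exception e);
}

// Only Subscribe and NotifyNext appear in the post; the rest is assumed.
public class ObserverCollection<T> : IObservable<T>
{
    private readonly object _gate = new object();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate) { _observers.Add(observer); }
        return new Subscription(() => { lock (_gate) { _observers.Remove(observer); } });
    }

    public void NotifyNext(T item) { foreach (IObserver<T> o in Snapshot()) o.OnNext(item); }
    public void NotifyDone() { foreach (IObserver<T> o in Snapshot()) o.OnDone(); }              // assumed name
    public void NotifyError(Exception e) { foreach (IObserver<T> o in Snapshot()) o.OnError(e); } // assumed name

    // Notify a copy of the list so observers can unsubscribe during a callback.
    private IObserver<T>[] Snapshot() { lock (_gate) { return _observers.ToArray(); } }

    private sealed class Subscription : IDisposable
    {
        private readonly Action _unsubscribe;
        public Subscription(Action unsubscribe) { _unsubscribe = unsubscribe; }
        public void Dispose() { _unsubscribe(); }   // removes the observer from the list
    }
}

// Wraps a source and projects each item with func before passing it on.
public class SelectObservable<T, TResult> : IObservable<TResult>
{
    private readonly IObservable<T> _source;
    private readonly Func<T, TResult> _func;

    public SelectObservable(IObservable<T> source, Func<T, TResult> func) { _source = source; _func = func; }

    public IDisposable Subscribe(IObserver<TResult> observer)
    {
        // Subscribing to the projection subscribes an adapter to the source.
        return _source.Subscribe(new Adapter(observer, _func));
    }

    private sealed class Adapter : IObserver<T>
    {
        private readonly IObserver<TResult> _target;
        private readonly Func<T, TResult> _func;
        public Adapter(IObserver<TResult> target, Func<T, TResult> func) { _target = target; _func = func; }

        public void OnNext(T item)
        {
            TResult result;
            try { result = _func(item); }
            catch (Exception e) { _target.OnError(e); return; }   // design choice of this sketch
            _target.OnNext(result);
        }
        public void OnDone() { _target.OnDone(); }
        public void OnError(Exception e) { _target.OnError(e); }
    }
}

public static class SketchDemo
{
    private sealed class ListObserver : IObserver<string>
    {
        private readonly List<string> _log;
        public ListObserver(List<string> log) { _log = log; }
        public void OnNext(string s) { _log.Add(s); }
        public void OnDone() { _log.Add("Done"); }
        public void OnError(Exception e) { _log.Add("Error: " + e.Message); }
    }

    public static List<string> Run()
    {
        List<string> log = new List<string>();
        ObserverCollection<int> clicks = new ObserverCollection<int>();   // stands in for the button
        IObservable<string> messages = new SelectObservable<int, string>(clicks, x => "Mouse down at: " + x);
        messages.Subscribe(new ListObserver(log));
        clicks.NotifyNext(12);
        clicks.NotifyDone();
        return log;
    }
}
```

SketchDemo.Run() should return "Mouse down at: 12" followed by "Done". Note that nothing here stops a source from calling NotifyNext after NotifyDone; a more careful version would probably clear the list once the stream has finished or failed.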

That wraps it up for today. The code as of this post is available here.

2 comments:

  1. Could you please provide the implementation that makes OnDone() or OnError() work? Thanks.

  2. I never got that far with this exercise, so I suggest you download the reactive extensions and use reflector if you are curious about the implementation.
