A beginner's reactive tutorial using Rx.Net

By Pankaj K | February 5, 2019

This beginner's reactive tutorial covers the basics of the reactive way of programming. I will explain the important terms and demonstrate the concepts with simple snippets of code. By the end of it you will have a better understanding of the subject. I will be using Rx.Net in this tutorial, which is the first in a series of posts I will be writing on this topic.

WHY

Streaming data. It can be a constant feed of stock prices or readings from your smart IoT devices. It is data that streams continuously, like running water: a stream with no end. You might want to scan the data continuously, looking for a particular value. Or you might want to filter it, for example to remove stocks below a given price or to remove noise from an IoT device stream. Or you might want to perform some operation on the data, like finding the average value. There are just so many things we can do with streaming data.

HOW

The best way to understand reactive is to imagine an assembly line, say one making cars. The chassis keeps moving and different robots keep working on it. The chassis never stops. One robot may fit the engine, another might weld the joints, and so on. Some robots remove stuff instead, like the metal shavings on the floor of the chassis. In short, every robot performs some operation (constructive or destructive) on the chassis as it rolls past.

In the reactive world the conveyor belt is time and the series of unfinished chassis on it is the data. The robots are the operators provided by reactive libraries, and they can do many things. With that massively oversimplified overview, let us go a level deeper. It will be useful at this stage to brush up on your event handling fundamentals.

From here on most of the terms I will be using are from Rx.Net. I am not sure what terminology other implementations like RxJava or RxJs use.
Two terms you will hear about a lot are Observables and Observers.


Observable: Represents a stream of values which can be observed.
Observer: Represents the client code which actually observes an Observable.

For a beginner's reactive tutorial, the stuff below is a bit technically involved. However, it is totally worth the read.

So how do Observable and Observer interact?
Ans: Via interfaces.

Observable implements an interface called IObservable<T>.
Observer implements an interface called IObserver<T>.

This is what the IObserver<T> interface looks like.

public interface IObserver<in T>
{
    void OnNext(T value);
    void OnError(Exception error);
    void OnCompleted();
}

An Observer implementing the IObserver<T> interface has to provide an implementation of these three methods.

The Observer needs a way to let the Observable know that it is interested in the data stream it represents. It does so via a method called Subscribe, which is declared in IObservable<T>. Subscribe returns an IDisposable object which the observer can keep. Disposing it later lets the observer unsubscribe and, for all intents and purposes, stop observing the Observable.

public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
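To make the role of the returned IDisposable concrete, here is a minimal sketch that uses only the interfaces from the System namespace. The names TickSource, Publish, RecordingObserver and UnsubscribeDemo are made up for this illustration and are not part of Rx.Net. The sketch is not thread-safe and ignores completion and errors; it only shows that disposing the handle stops further notifications.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable that keeps a list of observers (not an Rx.Net type).
public sealed class TickSource : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        // The returned handle removes this observer from the list when disposed.
        return new Unsubscriber(_observers, observer);
    }

    // Pushes one value to every currently subscribed observer.
    public void Publish(int value)
    {
        // Iterate over a copy so an observer may unsubscribe during a callback.
        foreach (var observer in _observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<int>> _observers;
        private readonly IObserver<int> _observer;

        public Unsubscriber(List<IObserver<int>> observers, IObserver<int> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose() => _observers.Remove(_observer);
    }
}

// Hypothetical observer that simply records every value it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public List<int> Received { get; } = new List<int>();
    public void OnNext(int value) => Received.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class UnsubscribeDemo
{
    public static List<int> Run()
    {
        var source = new TickSource();
        var observer = new RecordingObserver();

        IDisposable handle = source.Subscribe(observer);
        source.Publish(1);
        source.Publish(2);

        handle.Dispose();   // unsubscribe
        source.Publish(3);  // not delivered: the observer is no longer in the list

        return observer.Received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "1, 2"
    }
}
```

The observer sees 1 and 2 but not 3, because the handle was disposed before the third value was published.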

An Observable has a list of observers subscribed to it. When it has to push information to the observers, it does so via the three methods of the IObserver<T> interface. The names are self-explanatory. To push data to the registered observers, OnNext is called. If there is an error, a notification is sent using OnError, and no more data is sent after it. If the Observable is done emitting all its data, it calls OnCompleted to shut down gracefully. It also unsubscribes all the registered observers as part of the shutdown.

An Observable implementing IObservable<T> has to provide an implementation for the Subscribe method.

A simple naive implementation might look something like this:

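using System;
using System.Reactive.Disposables; // Disposable.Empty comes from the System.Reactive package

namespace LeObservables
{
    public class DemoObservable : IObservable<int>
    {
        public IDisposable Subscribe(IObserver<int> observer)
        {
            // Push six values to the observer, then signal completion.
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnNext(3);
            observer.OnNext(4);
            observer.OnNext(5);
            observer.OnNext(6);
            observer.OnCompleted();
            // Nothing is left to unsubscribe from, so return a no-op handle.
            return Disposable.Empty;
        }
    }
}

As you can see, this code sends the values 1, 2, 3, 4, 5 and 6 to the subscribed observer before calling it quits.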

Note that there are edge-case issues in the code above, but since this is a beginner's reactive tutorial I will ignore them. What is important is that you get the main idea.

Let us create an observer to see this in action.

using System;

namespace Demo
{
    public class DemoObserver : IObserver<int>
    {
        public void OnCompleted()
        {
            Console.WriteLine("Observable is done sending all the data.");
        }

        public void OnError(Exception error)
        {
            Console.WriteLine($"Observable failed with error: {error.Message}");
        }

        public void OnNext(int value)
        {
            Console.WriteLine($"Observable emitted : {value}");
        }
    }
}

Here is the client code.

using LeObservables;

namespace Demo
{
    public class Program
    {
        private static void Main(string[] args)
        {
            var observableInstance = new DemoObservable();
            var observerInstance = new DemoObserver();
            var subscriptionHandle = observableInstance.Subscribe(observerInstance);
        }
    }
}
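The demo above only exercises the happy path: a run of OnNext calls followed by OnCompleted. As a hedged sketch of the other ending, the example below uses a made-up ParsingObservable that stops with OnError as soon as it meets an item it cannot parse. ParsingObservable, LoggingObserver and NotificationOrderDemo are hypothetical names invented for this illustration, and the code relies only on the System interfaces, not on Rx.Net.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable: parses each text item and pushes it as an int.
public sealed class ParsingObservable : IObservable<int>
{
    private readonly string[] _items;

    public ParsingObservable(params string[] items) => _items = items;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        foreach (var item in _items)
        {
            if (!int.TryParse(item, out var number))
            {
                // Terminal notification: nothing may be sent after OnError.
                observer.OnError(new FormatException($"'{item}' is not a number"));
                return NoopDisposable.Instance;
            }
            observer.OnNext(number);
        }

        // Terminal notification for the successful case.
        observer.OnCompleted();
        return NoopDisposable.Instance;
    }

    // Stand-in for a do-nothing subscription handle.
    private sealed class NoopDisposable : IDisposable
    {
        public static readonly NoopDisposable Instance = new NoopDisposable();
        public void Dispose() { }
    }
}

// Hypothetical observer that logs the notifications in the order they arrive.
public sealed class LoggingObserver : IObserver<int>
{
    public List<string> Log { get; } = new List<string>();
    public void OnNext(int value) => Log.Add($"OnNext({value})");
    public void OnError(Exception error) => Log.Add($"OnError({error.Message})");
    public void OnCompleted() => Log.Add("OnCompleted");
}

public static class NotificationOrderDemo
{
    public static List<string> Run(params string[] items)
    {
        var observer = new LoggingObserver();
        new ParsingObservable(items).Subscribe(observer);
        return observer.Log;
    }

    public static void Main()
    {
        // prints "OnNext(1) OnNext(2) OnNext(3) OnCompleted"
        Console.WriteLine(string.Join(" ", Run("1", "2", "3")));
        // prints "OnNext(1) OnError('oops' is not a number)"
        Console.WriteLine(string.Join(" ", Run("1", "oops", "3")));
    }
}
```

In the second run the value 3 is never delivered and OnCompleted is never called, because OnError already ended the sequence.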


You might be wondering why the fuss if the same can be done using events. With my limited experience with Reactive, all I will say is that Reactive picks up where events stop. Use cases which you can solve with a swish of the sword of Reactive would be painfully complex, if not impossible, to implement using .NET events. Since this is a beginner's reactive tutorial, I will not go into the various use cases. But rest assured, if you need to do something, chances are high that there is an operator for that in Rx.Net.

Now, as a user you have to create your own observers. But with observables the story is different. You are not encouraged to create observables yourself. Rather, you are expected to use the helper methods which Rx.Net provides, because there are many tricky edge cases which can trip you up. In particular, the IDisposable handle returned by Subscribe is a bit tricky to code.

So, in a nutshell: don’t do it.

Then how do I create an Observable?

Using the Rx.Net provided operator Observable.Create().

You push the implementation of the Subscribe method into the Create method. Here is how our innocent example creating an Observable changes.

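// Requires: using System.Reactive.Linq; and using System.Reactive.Disposables;
public static IObservable<int> ObserveNumbers()
{
    // The lambda is the body of Subscribe; it runs for each observer that subscribes.
    return Observable.Create<int>(observer =>
    {
        observer.OnNext(1);
        observer.OnNext(2);
        observer.OnNext(3);
        observer.OnNext(4);
        observer.OnNext(5);
        observer.OnNext(6);
        observer.OnCompleted();
        return Disposable.Empty; // Empty is a property, not a method
    });
}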

See how the Subscribe code is pushed into the Create operator using a lambda. The output remains the same, but you don’t need a separate class file to define an observable implementing IObservable<T>.
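Here is a small sketch of how such an observable might be consumed. It assumes the System.Reactive NuGet package is referenced. Instead of a hand-written observer class it uses the Subscribe overload from Rx.Net that accepts three lambdas, something this post has not covered. CreateDemo and Run are hypothetical names for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class CreateDemo
{
    // Same idea as the snippet above: the lambda plays the role of Subscribe.
    public static IObservable<int> ObserveNumbers()
    {
        return Observable.Create<int>(observer =>
        {
            for (var number = 1; number <= 6; number++)
            {
                observer.OnNext(number);
            }
            observer.OnCompleted();
            return Disposable.Empty;
        });
    }

    // Subscribes with three lambdas (OnNext, OnError, OnCompleted) and logs each call.
    public static List<string> Run()
    {
        var log = new List<string>();

        IDisposable handle = ObserveNumbers().Subscribe(
            value => log.Add($"OnNext({value})"),
            error => log.Add($"OnError({error.Message})"),
            () => log.Add("OnCompleted"));

        handle.Dispose();
        return log;
    }

    public static void Main()
    {
        foreach (var entry in Run())
        {
            Console.WriteLine(entry);
        }
    }
}
```

Because the lambda passed to Create runs synchronously inside Subscribe, all six values and the completion notification are already in the log when Subscribe returns.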

In fact there are several more operators which allow you to create observables. For example, FromEventPattern helps you to create an observable out of .NET events. ToObservable allows you to create an observable out of an enumerable.
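A rough sketch of both operators follows, assuming the System.Reactive NuGet package is referenced. Thermometer, ReadingEventArgs, ReadingTaken and FactoryOperatorsDemo are hypothetical names invented for this example; only ToObservable, FromEventPattern, Select and Subscribe come from Rx.Net.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical event payload, invented for this sketch.
public sealed class ReadingEventArgs : EventArgs
{
    public ReadingEventArgs(int value) => Value = value;
    public int Value { get; }
}

// Hypothetical event source, invented for this sketch.
public sealed class Thermometer
{
    public event EventHandler<ReadingEventArgs> ReadingTaken;

    public void Report(int value) =>
        ReadingTaken?.Invoke(this, new ReadingEventArgs(value));
}

public static class FactoryOperatorsDemo
{
    // ToObservable: turn an in-memory sequence into an observable.
    public static List<int> FromEnumerable()
    {
        var received = new List<int>();
        IObservable<int> numbers = new[] { 1, 2, 3 }.ToObservable();

        IDisposable handle = numbers.Subscribe(value => received.Add(value));
        handle.Dispose();
        return received;
    }

    // FromEventPattern: turn a .NET event into an observable.
    public static List<int> FromEvent()
    {
        var thermometer = new Thermometer();
        var received = new List<int>();

        IObservable<int> readings = Observable
            .FromEventPattern<ReadingEventArgs>(
                handler => thermometer.ReadingTaken += handler,
                handler => thermometer.ReadingTaken -= handler)
            .Select(pattern => pattern.EventArgs.Value);

        IDisposable subscription = readings.Subscribe(value => received.Add(value));
        thermometer.Report(21);
        thermometer.Report(22);

        subscription.Dispose();  // detaches the event handler
        thermometer.Report(23);  // raised after unsubscribing, so not recorded

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", FromEnumerable()));
        Console.WriteLine(string.Join(", ", FromEvent()));
    }
}
```

In both cases the observer code stays the same; only the way the observable is produced changes.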

Wrapping up

We have covered the very basics of Reactive programming. Since this is a beginner's reactive tutorial, I will keep it simple and stop here. The coming posts will deal with more complicated stuff.
