Principles of reactive programming, illustrated with a simple RSS aggregator built on the ReactiveX library for Python

Thanks to Kseniya, a Python developer at Noveo, for a detailed description of the benefits of reactive programming.

In recent years, reactive programming in general, and the ReactiveX library in particular, have been growing in popularity among developers. Some already take full advantage of this approach, while others have only heard “something about it”. I will try to show how some concepts of reactive programming can change the way you look at things that seem familiar.

There are two different ways to organize a large system: around the objects stored in it and their states, or around the data flows passing through it. The reactive programming model makes it easy to express data flows and to propagate changes through them. For example, in imperative programming an assignment computes its result once and for all, while in reactive programming the value is recalculated whenever new data is received. A flow of values passes through a series of transformations needed to solve a particular task. Building a system around flows makes it extensible and asynchronous, and reacting correctly to errors makes it fault-tolerant.
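Here is a minimal, library-free sketch that shows the difference. The Cell class and the computed() helper are hypothetical illustrations and are not part of ReactiveX. A computed cell re-runs its function whenever one of its source cells changes, while a plain assignment keeps its first result.

```python
class Cell:
    """A value that notifies its listeners when it changes (toy, not RxPY)."""

    def __init__(self, value):
        self._value = value
        self._listeners = []

    @property
    def value(self):
        return self._value

    def set(self, value):
        self._value = value
        for listener in self._listeners:
            listener()

    def on_change(self, listener):
        self._listeners.append(listener)


def computed(func, *sources):
    """Return a Cell holding func(*source values), recomputed on every source change."""
    result = Cell(func(*(source.value for source in sources)))

    def recompute():
        result.set(func(*(source.value for source in sources)))

    for source in sources:
        source.on_change(recompute)
    return result


# Imperative: a is computed once; changing b later has no effect on it.
b, c = 1, 2
a = b + c
b = 10
print(a)  # 3

# Reactive: a_cell is recalculated when new data arrives in b_cell.
b_cell, c_cell = Cell(1), Cell(2)
a_cell = computed(lambda x, y: x + y, b_cell, c_cell)
b_cell.set(10)
print(a_cell.value)  # 12
```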

ReactiveX is a library for creating asynchronous and event-based programs using observable sequences. It extends the Observer pattern to support sequences of data and adds operators for composing those sequences declaratively. Those operators abstract away low-level concerns such as synchronization, thread safety, concurrent data structures, and non-blocking I/O.

One of the main differences between the ReactiveX library and functional reactive programming (FRP) is that ReactiveX does not operate on continuously changing values. It operates on discrete values emitted over time.

Here we should explain what an Observer, an Observable, and a Subject are. An Observable is a data source that lets you process streams of asynchronous events in the same way you process collections such as arrays. Observables are used instead of callbacks, which makes your code more readable and less error-prone.

In ReactiveX, an Observer subscribes to an Observable and then reacts to every element it emits. For each Observer subscribed to an Observable, the Observer.on_next() method is called for each element in the data flow. After that, either Observer.on_completed() or Observer.on_error() may be called, but not both. Observables are commonly built so that they emit no data until some Observer subscribes to them. This is so-called “lazy evaluation”: values are computed only when they are needed.
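The Observer contract and lazy subscription can be sketched without any library. ToyObservable and ToyObserver below are hypothetical teaching classes, not the rx API. The producer runs only inside subscribe(). Once on_completed() or on_error() has been delivered, any further notifications are dropped.

```python
class ToyObserver:
    """Wraps callbacks; allows on_next*, then at most one of on_error/on_completed."""

    def __init__(self, on_next, on_error=None, on_completed=None):
        self._on_next = on_next
        self._on_error = on_error or (lambda error: None)
        self._on_completed = on_completed or (lambda: None)
        self._stopped = False

    def on_next(self, value):
        if not self._stopped:
            self._on_next(value)

    def on_error(self, error):
        if not self._stopped:
            self._stopped = True
            self._on_error(error)

    def on_completed(self):
        if not self._stopped:
            self._stopped = True
            self._on_completed()


class ToyObservable:
    """Cold Observable: the producer function runs once per subscription."""

    def __init__(self, producer):
        self._producer = producer

    def subscribe(self, on_next, on_error=None, on_completed=None):
        self._producer(ToyObserver(on_next, on_error, on_completed))


log = []


def produce(observer):
    log.append("producing")  # records when the work actually happens
    for item in [1, 2, 3]:
        observer.on_next(item)
    observer.on_completed()
    observer.on_next(4)  # dropped: the sequence has already completed


numbers = ToyObservable(produce)
print(log)  # [] : creating the Observable did not run the producer

received = []
numbers.subscribe(received.append, on_completed=lambda: received.append("done"))
print(log, received)  # ['producing'] [1, 2, 3, 'done']
```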

Some tasks require an object that is both an Observer and an Observable, so that it can receive event messages and also notify its own subscribers. This is what a Subject is for. Besides the standard Subject, there are several other implementations:

  • ReplaySubject caches all incoming values. When a new subscriber appears, it replays them from the beginning and then continues in the usual way.
  • BehaviorSubject stores the latest value and, like ReplaySubject, passes it to each new subscriber. It is created with a default value, which a subscriber receives if nothing has been emitted yet.
  • AsyncSubject also stores the latest value, but it emits nothing until the source sequence completes. It then emits only that last value.
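The differences between the Subject variants are easiest to see when a second subscriber joins late. The classes below are simplified, hypothetical stand-ins for the rx subjects. Error handling is omitted, and only AsyncSubject reacts to completion. Each demo subscribes once, emits 1 and 2, subscribes a second time, emits 3, and then completes.

```python
class Subject:
    """Toy Subject: an Observer (on_next, on_completed) and an Observable (subscribe)."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def on_next(self, value):
        for callback in list(self._subscribers):
            callback(value)

    def on_completed(self):
        pass  # the plain Subject has nothing to flush on completion


class ReplaySubject(Subject):
    """Replays every value seen so far to each new subscriber."""

    def __init__(self):
        super().__init__()
        self._history = []

    def subscribe(self, callback):
        for value in self._history:
            callback(value)
        super().subscribe(callback)

    def on_next(self, value):
        self._history.append(value)
        super().on_next(value)


class BehaviorSubject(Subject):
    """Hands each new subscriber the latest value (or the default)."""

    def __init__(self, default):
        super().__init__()
        self._latest = default

    def subscribe(self, callback):
        callback(self._latest)
        super().subscribe(callback)

    def on_next(self, value):
        self._latest = value
        super().on_next(value)


class AsyncSubject(Subject):
    """Emits only the last value, and only once the sequence completes."""

    def __init__(self):
        super().__init__()
        self._last = None
        self._has_value = False
        self._done = False

    def subscribe(self, callback):
        if self._done and self._has_value:
            callback(self._last)  # late subscribers still get the final value
        else:
            super().subscribe(callback)

    def on_next(self, value):
        self._last, self._has_value = value, True  # remember it, emit nothing yet

    def on_completed(self):
        self._done = True
        if self._has_value:
            for callback in list(self._subscribers):
                callback(self._last)


def demo(subject):
    early, late = [], []
    subject.subscribe(early.append)
    subject.on_next(1)
    subject.on_next(2)
    subject.subscribe(late.append)  # joins after two values
    subject.on_next(3)
    subject.on_completed()
    return early, late


print(demo(Subject()))           # ([1, 2, 3], [3])
print(demo(ReplaySubject()))     # ([1, 2, 3], [1, 2, 3])
print(demo(BehaviorSubject(0)))  # ([0, 1, 2, 3], [2, 3])
print(demo(AsyncSubject()))      # ([3], [3])
```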

Observable and Observer are only the beginning of ReactiveX. Much of its power lies in operators, which transform, combine, and manipulate the sequences of elements emitted by Observables.

The ReactiveX documentation describes operators using marble diagrams. This is an example of how these diagrams represent Observables and their transformations.

Looking at the diagram below, it is easy to see that the map operator transforms the elements emitted by an Observable by applying a function to each of them.
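The same transformation can be written as a tiny sketch in which an Observable is just a subscribe function. from_list() and map_op() are hypothetical names chosen for this illustration. rx provides its own operators with richer behavior.

```python
def from_list(items):
    """Toy cold Observable built from a list: subscribe(on_next, on_completed)."""
    def subscribe(on_next, on_completed=lambda: None):
        for item in items:
            on_next(item)
        on_completed()
    return subscribe


def map_op(source, func):
    """Toy map: a new Observable emitting func(element) for each source element."""
    def subscribe(on_next, on_completed=lambda: None):
        source(lambda value: on_next(func(value)), on_completed)
    return subscribe


# Marble-diagram style:  source --1--2--3--|  map(x * 10)  result --10--20--30--|
result = []
map_op(from_list([1, 2, 3]), lambda x: x * 10)(result.append)
print(result)  # [10, 20, 30]
```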

An RSS aggregator app is a good illustration of what ReactiveX can do. It needs asynchronous data loading, filtering and transformation of data, and periodic updates to keep the information up to date.

In this article we demonstrate the main principles of ReactiveX with examples written using the rx library for Python (RxPY, 1.x API). This is what the abstract Observer interface looks like:

from abc import ABCMeta, abstractmethod


class Observer(metaclass=ABCMeta):
    @abstractmethod
    def on_next(self, value):
        return NotImplemented

    @abstractmethod
    def on_error(self, error):
        return NotImplemented

    @abstractmethod
    def on_completed(self):
        return NotImplemented
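A concrete Observer has to implement all three methods, otherwise Python refuses to instantiate it. The minimal sketch below uses a hypothetical CollectingObserver that simply records the notifications an Observable would deliver.

```python
from abc import ABCMeta, abstractmethod


class Observer(metaclass=ABCMeta):
    @abstractmethod
    def on_next(self, value):
        return NotImplemented

    @abstractmethod
    def on_error(self, error):
        return NotImplemented

    @abstractmethod
    def on_completed(self):
        return NotImplemented


class CollectingObserver(Observer):
    """Hypothetical concrete Observer that records every notification it receives."""

    def __init__(self):
        self.events = []

    def on_next(self, value):
        self.events.append(("next", value))

    def on_error(self, error):
        self.events.append(("error", str(error)))

    def on_completed(self):
        self.events.append(("completed",))


observer = CollectingObserver()
for value in ["first", "second"]:
    observer.on_next(value)  # what an Observable does for each element
observer.on_completed()      # ...and once when the sequence ends
print(observer.events)
# [('next', 'first'), ('next', 'second'), ('completed',)]
```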

Our application exchanges messages with the browser in real time over WebSockets, which Tornado makes easy to implement. The program starts when the server is launched, and when a browser connects to the server, a WebSocket is opened.

import json
import os

import feedparser
from rx import config, Observable
from rx.subjects import Subject
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient
from tornado.platform.asyncio import AsyncIOMainLoop
from tornado.web import Application, RequestHandler, StaticFileHandler, url
from tornado.websocket import WebSocketHandler

asyncio = config['asyncio']

class WSHandler(WebSocketHandler):
    urls = ['https://lenta.ru/rss/top7',
            'http://wsrss.bbc.co.uk/russian/index.xml']
    def open(self):
        print("WebSocket opened")
        # here will be the main logic of our app

    def on_message(self, message):
        obj = json_decode(message)
        # Sends a message that will be received by user_input
        self.subject.on_next(obj['term'])

    def on_close(self):
        # Unsubscribe from the Observable; dispose will be invoked for all the observables in the chain
        self.combine_latest_sbs.dispose()
        print("WebSocket closed")

class MainHandler(RequestHandler):
    def get(self):
        self.render("index.html")

def main():
    AsyncIOMainLoop().install()

    port = int(os.environ.get("PORT", 8080))
    app = Application([
        url(r"/", MainHandler),
        (r'/ws', WSHandler),
        (r'/static/(.*)', StaticFileHandler, {'path': "."})
    ])
    print("Starting server at port: %s" % port)
    app.listen(port)
    asyncio.get_event_loop().run_forever()


if __name__ == "__main__":
    main()

To process the user’s requests, the app creates a Subject. After subscription, the chain first emits a default value (in our case, an empty string). Then, at most once a second, it emits the text the user has entered, provided that its length is 0 or greater than 2 and that it differs from the previous value.

        # Subject is Observable and Observer at the same time
        self.subject = Subject()
        user_input = self.subject.throttle_last(
            1000  # Receive the last value at a given time interval
        ).start_with(
            ''  # Sends a default value just after subscription 
        ).filter(
            lambda text: len(text) == 0 or len(text) > 2
        ).distinct_until_changed()  # Only if the value has changed
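To see what this chain lets through, here is a synchronous, library-free approximation. It assumes the keystrokes are known in advance as (milliseconds, text) pairs. It models throttle_last as “keep the last value of each 1000 ms window”, whereas the real operator samples on a timer. The helper names are hypothetical.

```python
def throttle_last(events, period_ms):
    """Approximate throttle_last on a pre-recorded timeline.

    events: list of (timestamp_ms, value) pairs in time order.
    Keeps the last value of every period in which something was emitted.
    """
    last_per_window = {}
    for timestamp, value in events:
        last_per_window[timestamp // period_ms] = value  # later values overwrite earlier ones
    return [last_per_window[window] for window in sorted(last_per_window)]


def user_input(events):
    """Mirror throttle_last(1000) -> start_with('') -> filter -> distinct_until_changed."""
    values = [''] + throttle_last(events, 1000)  # start_with('')
    values = [text for text in values if len(text) == 0 or len(text) > 2]  # filter
    result = []
    for text in values:  # distinct_until_changed
        if not result or text != result[-1]:
            result.append(text)
    return result


# Hypothetical keystrokes: (ms since subscription, current text of the search bar)
keystrokes = [(100, 'p'), (250, 'py'), (400, 'pyt'), (1200, 'pyth'), (1300, 'python'),
              (2500, 'python'), (3100, 'py'), (4200, '')]
print(user_input(keystrokes))  # ['', 'pyt', 'python', '']
```

Here 'py' is dropped by the filter, and the second 'python' is dropped by distinct_until_changed. Clearing the search bar produces a new empty query, so the unfiltered news list comes back.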

For periodic updates there is an Observable that emits a value every 60 seconds. start_with(0) makes it also emit immediately on subscription.

        interval_obs = Observable.interval(
            60000  # Sends a value once in 60 sec (for periodical updates)
        ).start_with(0)
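A rough asyncio equivalent of this timer shows the emission order. The start value comes immediately, and the interval itself starts counting from 0 after the first period. interval_with_start() is a hypothetical helper, and the 10 ms period stands in for 60 seconds. In the aggregator the emitted numbers do not matter, because the combine_latest selector ignores them. Only the timing does.

```python
import asyncio


async def interval_with_start(period_s, start_value=0):
    """Rough stand-in for Observable.interval(period).start_with(start_value):
    yield start_value at once, then 0, 1, 2, ... after each period."""
    yield start_value
    tick = 0
    while True:
        await asyncio.sleep(period_s)
        yield tick
        tick += 1


async def first_values(count, period_s):
    """Collect the first `count` values emitted by the timer."""
    values = []
    async for value in interval_with_start(period_s):
        values.append(value)
        if len(values) == count:
            break
    return values


print(asyncio.run(first_values(4, 0.01)))  # [0, 0, 1, 2]
```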

The combine_latest operator combines these two flows, and flat_map inserts the Observable that fetches the news list into the chain. Then a subscription to the resulting Observable is created. Only at this moment does the whole chain start working.

        # combine_latest collects 2 flows from user requests and time intervals,
        # works for each message from each flow
        self.combine_latest_sbs = user_input.combine_latest(
            interval_obs, lambda input_val, i: input_val
        ).do_action(  # Works for each sent element 
            # Sends a message to clear the list to frontend 
            lambda x: send_response('clear')
        ).flat_map(
            # Observable is built into the chain to receive the list
            self.get_data
        ).subscribe(send_response, on_error)
        # The subscription is created, all the chain starts working only at this moment
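The behavior of combine_latest can be approximated by replaying a merged timeline of events. In this hypothetical sketch, source 'a' plays the role of user_input and 'b' plays the role of interval_obs. Once both have emitted, every new event from either side produces selector(latest_a, latest_b). That is why each timer tick re-triggers a search with the current query.

```python
def combine_latest(events, selector):
    """Toy combine_latest over a merged, time-ordered event list.

    events: list of (source_name, value) pairs, source_name being 'a' or 'b'.
    Emits selector(latest_a, latest_b) for every event once both sources have emitted.
    """
    latest = {}
    output = []
    for source, value in events:
        latest[source] = value
        if 'a' in latest and 'b' in latest:
            output.append(selector(latest['a'], latest['b']))
    return output


# Hypothetical timeline: 'a' = user_input values, 'b' = interval_obs ticks
timeline = [('a', ''), ('b', 0), ('a', 'python'), ('b', 0), ('b', 1), ('a', 'rust')]
queries = combine_latest(timeline, lambda input_val, i: input_val)
print(queries)  # ['', 'python', 'python', 'python', 'rust']
```

In the real chain, each of these queries first triggers do_action (a “clear” message) and then a fresh get_data() call via flat_map.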

Let’s take a closer look at this “Observable for receiving a news list”. From the list of news feed urls we create a data flow. Each element is passed to a function in which the Tornado HTTP client, AsyncHTTPClient, downloads that url asynchronously. The responses form another data flow, which is filtered by the user’s query. We take 5 news items from each feed and convert them into the format the frontend expects.

    def get_rss(self, rss_url):
        http_client = AsyncHTTPClient()
        return http_client.fetch(rss_url, method='GET')

    def get_data(self, query):
        # Observable is created from the url list
        return Observable.from_list(
            self.urls
        ).flat_map(
            # For each url there is an Observable, that downloads data
            lambda url: Observable.from_future(self.get_rss(url))
        ).flat_map(
            # The received data are parsed and form an Observable
            lambda x: Observable.from_list(
                feedparser.parse(x.body)['entries']
            ).filter(
                # Filters by the request in title or body of a piece of news
                lambda val, i: query in val.title or query in val.summary
            ).take(5)  # We take only 5 pieces of news from each url
        ).map(
            # Converts data for frontend
            lambda x: {'title': x.title, 'link': x.link,
                       'published': x.published, 'summary': x.summary})
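The filtering part of get_data can be checked without network access. This sketch assumes the feeds are already downloaded and parsed into plain dictionaries. The FEEDS data is made up and only mimics the shape of feedparser entries. The sketch applies the same filter, take(5) and map steps. In the real chain, flat_map emits items in whatever order the downloads finish; this sketch instead processes the feeds one after another.

```python
from itertools import islice

# Made-up, already-parsed feeds; each entry mimics a feedparser entry as a plain dict.
FEEDS = {
    'feed-1': [{'title': f'Python news {n}', 'link': f'https://example.com/1/{n}',
                'published': '2017-01-01', 'summary': 'About Python'}
               for n in range(8)],
    'feed-2': [{'title': 'Weather', 'link': 'https://example.com/2/0',
                'published': '2017-01-02', 'summary': 'Rain expected'}],
}

FIELDS = ('title', 'link', 'published', 'summary')


def get_data_offline(query, feeds=FEEDS):
    """Hypothetical offline version of get_data(): filter, take(5) per feed, then map."""
    for entries in feeds.values():  # one inner flow per feed, like flat_map over the urls
        matching = (entry for entry in entries
                    if query in entry['title'] or query in entry['summary'])  # filter
        for entry in islice(matching, 5):  # take(5) from each feed
            yield {field: entry[field] for field in FIELDS}  # map to the frontend format


results = list(get_data_offline('Python'))
print(len(results), results[0]['title'])  # 5 Python news 0
print(len(list(get_data_offline(''))))    # 6: an empty query matches every entry
```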

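Once the output data flow is formed, its subscriber receives the data element by element. The send_response function sends each response to the frontend, which adds the news item to the list. Both helper functions are local to open() and must be defined before the subscribe() call, because subscribe(send_response, on_error) looks up these names at the moment it is called.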

        def send_response(x):
            self.write_message(json.dumps(x))

        def on_error(ex):
            print(ex)
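The messages written to the socket are plain JSON strings, so the “clear” command and the news items travel over the same channel. Here is a small sketch that uses a hypothetical sent list in place of write_message.

```python
import json

sent = []  # hypothetical stand-in for WebSocketHandler.write_message


def send_response(x):
    sent.append(json.dumps(x))


# One refresh cycle: a 'clear' command, then one message per news item.
send_response('clear')
send_response({'title': 'Example', 'link': 'https://example.com/a'})
print(sent)
# ['"clear"', '{"title": "Example", "link": "https://example.com/a"}']
print(json.loads(sent[0]) == 'clear')  # True
```

The frontend’s value === "clear" check relies on this: after JSON.parse, the command is the bare string clear, while news items arrive as objects.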

In the feeder.js file, the frontend handles incoming messages:

        ws.onmessage = function(msg) {
            var value = JSON.parse(msg.data);
            if (value === "clear") {$results.empty(); return;}

            // Append the results
            $('<li><a tabindex="-1" href="' + value.link +
                '">' + value.title +'</a> <p>' + value.published +
                '</p><p>' + value.summary + '</p></li>'
            ).appendTo($results);
            $results.show();
        }

So here we have push technology: the server pushes data to the frontend, and the frontend only sends requests for news based on the user’s input.

Finally, imagine what the implementation would look like with a conventional callback-based approach instead of Observables. There would be no easy way to combine data flows or to send data to the frontend as soon as it arrives, and we would have to track changes in the search bar ourselves. The technology is not yet popular among Python developers. Nevertheless, I see several ways to use it in our current projects, for example the one we have already described in our blog.

You can find an example of using ReactiveX for Python in our GitHub repository with the demo RSS aggregator project: https://github.com/noveogroup/rxpy-example