Crawling the Web with Akka Streams

Konstantin Tsykulenko
Universal Language
Published in
5 min readMar 31, 2016

--

Introduction

In our previous article, Solving distributed recursive problems with Akka, we ‘ve discussed how we can use Akka’s actor model and clustering to solve resource-heavy recursive problems. This topic is going to be a bit more practical: we’re going to look into one of the Akka’s newer features, reactive streams, and how we can use them to solve some practical tasks, like web crawling.

Why streams?

I have been using actors for a while in various projects. Initially, they were a bit hard to get into, but once you get a grip of the model, it becomes easier for you to think in it and you start to ignore the limitations you initially had to deal with. However, there are quite a few limitations.
First of all, actor code is rather verbose. Consider the following example: we need to fetch a web page and parse the links from it. We can write code like this:

We send urls to the fetcher and the fetched page contents are passed to the parser. This is quite a lot of boilerplate for something this simple. There are a few other problems as well:

  • Our logic is spread out across several classes, which makes it hard to reason about the data flow
  • We don’t have type safety (We could use typed actors potentially, but these are not ideal either)
  • We do not have control over the data flow. If, for instance, we fetch way more pages than we can parse, the parser’s mailbox will start growing in an uncontrollable manner.

This is where the new Akka Streams specification comes in. It is a part of Reactive Streams initiative, which was specifically designed to handle problems like backpressure and data flow composition. Let’s quickly cover the core concepts:

Source

Source is where the input data for the stream comes from. Source has output, but does not have input. Some source examples can include an actor, a messaging queue listener, or simply a collection (although that’s mostly useful for testing purposes).

Sink

Sink is an endpoint which processes the results produced by the stream. It will usually store the produced data or send it somewhere else for further processing.

Flow

Flow is something that comes in between the Source and the Sink and can apply some data transformations.

Let’s take a look at how our code would look like if we used streams:

Here’s what’s happening in this example:

  1. We create a Source from a list of seed url we want to fetch. We could just as easily use an Actor as our source if using ActorPublisher trait. We’ll show that later.
  2. We map our initial urls to Futures containing the HttpResponse we get from fetching that url and then map the results to (originalUrl, response) tuples
  3. We then parse the urls from the http responses
  4. Only now do we actually get the results of our futures, by using the foreach Sink that would call onComplete on each stream result and print it when it’s available. We also could easily use an Actor as a Sink by implementing an ActorSubscriber trait

Now, this is for sure much more concise than our previous example. We also explicitly define the whole flow in one place, where it’s easy to decipher and reason about. All transformations are type safe and we know exactly what data to expect for each step of our flow. Note that all processing is still done in parallel by actors on top of an actor system, which we can tune as normal if needed.

The implementation above has one problem, though. The result of the stream above is a single future that contains a list of parsed urls as a result. What we would ideally want is a stream of parsed urls. Let’s modify our flow to achieve that.

  1. mapAsync is a function that accepts a value from the stream and returns a Future, passing the result of the future downstream. Since we already have a Future we just want to unwrap, we call mapAsync on identity.
  2. mapConcat is a function that accepts a value from the stream and returns an Iterable which is then split into separate stream values. AYet again, since we already have a list of urls after parsing, we can just call mapConcat(identity).

Now let’s make a producer actor to be a source for our flow:

Here, we’re just sending the initial url to ourselves, but it can come from anywhere potentially, like another akka cluster instance.

Adding buffering

We’ve previously mentioned that producing too many work units too fast can overflow the consumer, which is a problem that akka actors do not solve on their own, and you’ll need to work on a custom implementation if you need to mitigate these scenarios. Fortunately, one of the main features of Reactive Streams specification (and thus Akka Streams) is the ability to tackle situations like this. Let’s demonstrate by adding some buffering to our example.

OverflowStrategy.backpressure means that our stream will backpressure the source until we have less than 100 elements left in the queue for processing. Note that if we use an actor as a source we’ll have to handle this ourselves.

For instance, we can rewrite our producer like this:

If we tried to call onNext when totalDemand is 0 we’d get an IllegalStateException. There’s a number of alternative strategies we could use for buffering, like failing the entire stream if the buffer is ever filled or dropping excessive elements. We can also rewrite our Sink as a consumer actor to have more control over the backpressure strategy:

Here, we define our own request strategy, that will accept elements from stream in batches of 10 (or less, if the producer requested a smaller number, which is represented by remainingRequested). requestDemand method here returns the number of elements the consumer is ready to process. There’s a number of standard strategies already implemented.

Putting everything together and making computation recursive

For this, we just need to pass the results from Sink to Source and add some condition to exit recursion. We’ll start with defining a Url class:

where depth is the depth of recursion we’ll use for fetching pages. We’ll also keep track of the pages we’ve already visited within our producer:

Next, we’ll connect our consumer so it sends its results back to the producer

Our final flow will then look like this:

And that’s it! We have a parallel web crawler with buffering and end to end backpressure with just a few lines of code.

Limitations

Akka streams do not currently work with akka clustering. However, you can combine the two by running separate streams inside different cluster nodes and using conventional actor mechanisms for communication. What you can not do, however, is have a single stream spanning several cluster instances with backpressure working across these instances (because the stream can get deadlocked in case of lost messages).

References

Akka streams
Reactive streams
Full demo sources on Github

Originally published at tech.smartling.com on March 31, 2016.

--

--