Why Streams?
In software development, we sometimes need to handle potentially large amounts of data. Loading all of it at once can cause problems such as `out of memory` errors, so the data should be divided into chunks that are handled independently.
Akka Streams comes to the rescue here, letting us do this in a more predictable and less chaotic manner.
Introduction
Akka Streams has 3 major components – Source, Flow and Sink. Any non-cyclical stream consists of at least 2 of them, a Source and a Sink, plus any number of Flow elements in between. Conceptually, Source and Sink can be seen as special cases of Flow: a Source is a Flow with no input, and a Sink is a Flow with no output.
- Source – this is the origin of the data. It has exactly one output. We can think of a Source as a Publisher.
- Sink – this is the receiver of the data. It has exactly one input. We can think of a Sink as a Subscriber.
- Flow – this is a transformation applied to the data coming from the Source. It has exactly one input and one output.
A Flow sits between the Source and the Sink and transforms the elements passing through it.
Conveniently, we can combine these elements to obtain another one, e.g. combine a Source and a Flow to obtain another Source.
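The combinations mentioned above can be sketched as follows. This is a minimal illustration, assuming the Akka Streams 2.5.x Scala DSL used later in this article; the object name `CombiningStages` and the value names are made up for the example.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Future

// Hypothetical example object, not part of the original article
object CombiningStages {
  val numbers: Source[Int, NotUsed] = Source(1 to 5)
  val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
  val asText: Flow[Int, String, NotUsed] = Flow[Int].map(n => s"value: $n")
  val printer: Sink[String, Future[Done]] = Sink.foreach[String](println)

  // Source + Flow gives a Source
  val doubled: Source[Int, NotUsed] = numbers.via(doubler)

  // Flow + Flow gives a Flow
  val doubleThenText: Flow[Int, String, NotUsed] = doubler.via(asText)

  // Flow + Sink gives a Sink; Keep.right keeps the Future[Done] of the printer
  val textSink: Sink[Int, Future[Done]] = asText.toMat(printer)(Keep.right)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("combining-stages")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Run the combined Source into the combined Sink, then shut down
    doubled.runWith(textSink).onComplete(_ => system.terminate())
  }
}
```

None of the combined values does any work until it is run, so the same `doubled` or `textSink` can be reused in several streams.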
Akka Streams is an implementation of Reactive Streams, and its defining feature is built-in backpressure handling.
What are Reactive Streams?
Applications built on streams can run into problems if the Source generates data faster than the Sink can handle it. The Sink then has to buffer the incoming data, and if the volume is large enough the buffer keeps growing and can lead to memory issues.
To handle this, the Sink needs to communicate with the Source and ask it to slow down data generation until the Sink has finished handling the current data. This communication between Publisher and Subscriber is called backpressure handling, and streams that support this mechanism are called Reactive Streams.
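To make the idea concrete, here is a small sketch of a fast producer feeding a deliberately slow consumer. It assumes the Akka Streams 2.5.x Scala DSL; the object name `BackpressureSketch`, the buffer size of 8 and the rate of 10 elements per second are arbitrary choices for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._

// Hypothetical example object, not part of the original article
object BackpressureSketch {

  // A source that could emit as fast as possible, followed by a bounded
  // buffer and a throttle that simulates a slow consumer.
  def pipeline(count: Int): Source[Int, NotUsed] =
    Source(1 to count)
      .map { n => println(s"produced $n"); n }
      // When the buffer is full, no more demand is signalled upstream,
      // so the source waits instead of the buffer growing without bound.
      .buffer(8, OverflowStrategy.backpressure)
      // Let at most 1 element through every 100 milliseconds
      .throttle(1, 100.millis, 1, ThrottleMode.Shaping)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("backpressure-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    pipeline(50)
      .runWith(Sink.foreach[Int](n => println(s"consumed $n")))
      .onComplete(_ => system.terminate())
  }
}
```

No explicit signalling code is needed here: the demand travels upstream as part of the stream machinery. Other `OverflowStrategy` values (for example dropping elements) trade completeness for keeping the producer running at full speed.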
Example using Akka Stream:
In this example, let’s find the prime numbers between 1 and 10000 using Akka Streams. The Akka Streams version used is 2.5.11.
package example.akka

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Future

object AkkaStreamExample {

  // Trial division: true only for integers greater than 1 with no divisor in [2, i)
  def isPrime(i: Int): Boolean = {
    if (i <= 1) false
    else if (i == 2) true
    else !(2 until i).exists(x => i % x == 0)
  }

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("actor-system")
    implicit val materializer = ActorMaterializer()
    // Execution context for the completion callback at the end
    import system.dispatcher

    val numbers = 1 to 10000

    // Source that will iterate over the number sequence
    val numberSource: Source[Int, NotUsed] = Source.fromIterator(() => numbers.iterator)

    // Flow for prime number detection
    val isPrimeFlow: Flow[Int, Int, NotUsed] = Flow[Int].filter(num => isPrime(num))

    // Source from the original Source with the Flow applied
    val primeNumbersSource: Source[Int, NotUsed] = numberSource.via(isPrimeFlow)

    // Sink to print the numbers
    val consoleSink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)

    // Connect the Source with the Sink and run it using the materializer,
    // then shut down the actor system so that the program can exit
    primeNumbersSource.runWith(consoleSink).onComplete(_ => system.terminate())
  }
}
The example above, illustrated as a diagram:
- `Source` – based on the number iterator
A `Source`, as explained already, is the starting point of a stream. Source takes two type parameters. The first one is the type of the elements it emits, and the second one is the type of the auxiliary (materialized) value it can produce when run. If it doesn’t produce any, we use the NotUsed type provided by Akka.
Some of the factory methods for creating a Source are:
- fromIterator – emits elements until the iterator is exhausted
- fromPublisher – creates a Source from a Reactive Streams Publisher
- fromFuture – creates a Source that emits the value of the given Future once it completes
- fromGraph – creates a Source from a Graph with a Source shape (a Source is itself such a Graph).
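The factory methods listed above could be used roughly as follows. This is a sketch against the 2.5.x API used in this article (later Akka versions may prefer different method names); the object name `SourceFactories` and all value names are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.Publisher
import scala.concurrent.Future

// Hypothetical example object, not part of the original article
object SourceFactories {
  // fromIterator takes a function, so each run of the stream gets a fresh iterator
  val fromIter: Source[Int, NotUsed] =
    Source.fromIterator(() => Iterator.from(1).take(3))

  // fromFuture emits the single value of the Future once it has completed
  val fromFut: Source[Int, NotUsed] =
    Source.fromFuture(Future.successful(42))

  // A Source is itself a Graph with a Source shape, so fromGraph accepts it
  val fromGr: Source[Int, NotUsed] = Source.fromGraph(fromIter)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("source-factories")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Obtain a Reactive Streams Publisher by running another stream,
    // then wrap it back into a Source with fromPublisher
    val publisher: Publisher[Int] =
      Source(10 to 12).runWith(Sink.asPublisher[Int](fanout = false))
    val fromPub: Source[Int, NotUsed] = Source.fromPublisher(publisher)

    // Emit the elements of all four sources one after another
    fromIter.concat(fromFut).concat(fromPub).concat(fromGr)
      .runWith(Sink.foreach[Int](println))
      .onComplete(_ => system.terminate())
  }
}
```

In practice `fromPublisher` is mostly useful for interoperating with other Reactive Streams libraries, which is why the publisher here is produced artificially.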
- `Flow` – keeps only the prime numbers
Basically, a `Flow` is an ordered sequence of transformations applied to its input. It takes 3 type parameters – the input type, the output type and the auxiliary (materialized) value type.
We can create a new Source by combining an existing one with a Flow, as done in the code:
val primeNumbersSource: Source[Int, NotUsed] = numberSource.via(isPrimeFlow)
- `Sink` – prints numbers to the console
It is the subscriber of the data and the last step of the stream.
A Sink is essentially a Flow without an output: it uses a function such as foreach or fold to run a procedure over its input elements and exposes the result as its auxiliary value.
As with Source and Flow, the companion object provides methods for creating instances of it. The main ones are:
- foreach – runs the given function for each received element
- foreachParallel – same as foreach, except that the function runs in parallel for several elements
- fold – runs the given function for each received element, passing the resulting value on to the next iteration.
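As a small illustration of how a folding Sink exposes its result, the sketch below sums the numbers 1 to 10. It assumes the 2.5.x Scala DSL; the names `SinkFoldSketch` and `sumSink` are invented for this example.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

// Hypothetical example object, not part of the original article
object SinkFoldSketch {
  // Starts from 0 and adds every received element to the running total.
  // The final total is exposed as the auxiliary value: a Future[Int].
  val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sink-fold-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val total: Future[Int] = Source(1 to 10).runWith(sumSink)

    total.onComplete { result =>
      println(result) // prints Success(55)
      system.terminate()
    }
  }
}
```

Unlike `Sink.foreach`, whose auxiliary value only signals completion, `Sink.fold` hands back the accumulated result itself.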
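The runWith method connects the Source to the given Sink, runs the stream and returns the auxiliary value of the Sink. Here that is a Future that completes when the Source is exhausted and the Sink has finished processing the elements. If processing fails, the Future completes with a Failure.
We can also build a RunnableGraph explicitly using toMat (or viaMat for Flows), which let us choose which auxiliary value to keep, and then run it manually by calling run().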
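A RunnableGraph built by hand might look like the sketch below, which also shows how the success and failure cases of the resulting Future can be handled. It assumes the 2.5.x Scala DSL; `RunnableGraphSketch` and `sumOfEvens` are hypothetical names.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.Future
import scala.util.{Failure, Success}

// Hypothetical example object, not part of the original article
object RunnableGraphSketch {
  // A complete blueprint: Source -> Flow -> Sink.
  // Keep.right selects the auxiliary value of the Sink (the Future[Int]).
  val sumOfEvens: RunnableGraph[Future[Int]] =
    Source(1 to 10)
      .via(Flow[Int].filter(_ % 2 == 0))
      .toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("runnable-graph-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Nothing happens until run() materializes the blueprint
    sumOfEvens.run().onComplete { result =>
      result match {
        case Success(total) => println(s"sum of evens: $total") // sum of evens: 30
        case Failure(error) => println(s"stream failed: ${error.getMessage}")
      }
      system.terminate()
    }
  }
}
```

Because the RunnableGraph is only a description, calling `run()` several times starts several independent streams, each with its own Future.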
- `ActorSystem` and `ActorMaterializer` are needed because Akka Streams runs on top of the Akka actor model.
The `ActorMaterializer` instance is needed to materialize a Flow into a Processor. A Processor represents a processing stage and is a construct from the Reactive Streams standard, which Akka Streams implements.
In fact, Akka Streams employs back-pressure as described in the Reactive Streams standard mentioned above. Source, Flow and Sink are eventually transformed into low-level Reactive Streams constructs via the process of materialization.