Brown Bag Session 27/11/13
Gael Martin

What is AKKA?

"Akka is a toolkit and runtime for building highly concurrent, distributed, and fault tolerant event-driven applications on the JVM" (www.akka.io)

Also:
Akka is the name of a beautiful mountain in Laponia, in the northern part of Sweden. The mountain is sometimes called "The Queen of Laponia".

Akka is available for Java and Scala

Actor Model

Introduced by Carl Hewitt at MIT in 1973, the Actor model is a good way to implement systems that involve many simultaneous communications.

The actor model in computer science is a mathematical model of concurrent computation that treats "actors" as the universal primitives of concurrent digital computation.

In response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.

I/O Sync - vs - I/O ASYNC

SYNC: One thread is dedicated to each request, and that thread does all the processing for the request until a response is sent. Any I/O, such as a call to a remote service, is typically synchronous, which means the thread will become blocked and sit around idle until the I/O completes.

ASYNC: Typically uses only a single thread per CPU core. The idea is to ensure that these scarce threads are never blocked: all I/O is asynchronous, so instead of waiting, the thread can process other requests and only come back to the current one when the response from the I/O call is ready.
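
A minimal sketch of the difference, in plain Java 8+ with no Akka involved. The name remoteCall is hypothetical and only simulates a slow service with a sleep. Note the simplification: the blocking call is merely moved onto a separate pool so that the caller is not blocked, whereas real asynchronous I/O would use non-blocking channels.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example: remoteCall stands in for any slow I/O operation.
class SyncVsAsyncSketch {

    // Simulates a remote service that takes a while to answer.
    static String remoteCall(String request) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "response to " + request;
    }

    // SYNC: the calling thread is blocked until the response arrives.
    static String handleSync(String request) {
        return remoteCall(request).toUpperCase();
    }

    // ASYNC: the calling thread returns immediately; the continuation
    // (toUpperCase) runs later, once the result is available.
    static CompletableFuture<String> handleAsync(String request, ExecutorService ioPool) {
        return CompletableFuture
                .supplyAsync(() -> remoteCall(request), ioPool)
                .thenApply(String::toUpperCase);
    }

    public static void main(String[] args) {
        ExecutorService ioPool = Executors.newFixedThreadPool(2);
        System.out.println(handleSync("a"));
        CompletableFuture<String> pending = handleAsync("b", ioPool);
        System.out.println("caller is free to do other work");
        System.out.println(pending.join());
        ioPool.shutdown();
    }
}
```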

I/O SYNC Issues!

"sizing the thread pool"
  • Not enough threads -> no more requests can be processed, even though the existing threads are idle, waiting.
  • Too many threads -> memory usage and overhead become costly

"deadlock, livelock"
  • Threads waiting for each other to complete

"race conditions" AKA "it works on my machine"
  • Sharing mutable data between threads
  • Synchronising threads incurs more waiting/blocking


I/O ASYNC WORLD

"Less latency"
 no more waiting for all actions to complete sequentially

"Fewer threads"
 no more blocked threads sitting idle

"More throughput"
 the same number of threads can handle more requests

"More scalable"
 more cores -> more threads -> more requests

WHAT is an ACTOR?


  • An active entity with an Inbox = Queue
  • A State  =  Storage
  • State is not shared, only accessible through...
  • ...message passing
    • one message is executed at a time
    • messages are processed sequentially 
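
The bullets above can be sketched in plain Java. This is an illustration of the idea only, not how Akka implements actors; the class name CounterActorSketch and the message strings are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch of the idea only; this is NOT how Akka implements actors.
class CounterActorSketch {
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>(); // inbox = queue
    private final CountDownLatch stopped = new CountDownLatch(1);
    private int count = 0; // state = storage, touched only by the actor's own thread

    CounterActorSketch() {
        Thread worker = new Thread(this::run);
        worker.setDaemon(true);
        worker.start();
    }

    // Callers only enqueue a message; they never touch 'count' directly.
    void tell(String message) {
        inbox.add(message);
    }

    private void run() {
        try {
            while (true) {
                String message = inbox.take(); // one message at a time, in arrival order
                if (message.equals("increment")) {
                    count++;
                } else if (message.equals("stop")) {
                    stopped.countDown();
                    return;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Demo helper: waits until the "stop" message has been processed.
    int awaitFinalCount() {
        try {
            stopped.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return count;
    }

    public static void main(String[] args) {
        CounterActorSketch actor = new CounterActorSketch();
        for (int i = 0; i < 3; i++) {
            actor.tell("increment");
        }
        actor.tell("stop");
        System.out.println(actor.awaitFinalCount());
    }
}
```

Any number of threads may call tell concurrently without locking, because the state is only ever modified by the single thread that drains the inbox.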

WHAT CAN I USE ACTORS FOR?

Actors can be an alternative to:

  • A thread
  • An object or Component
  • A callback or listener
  • A singleton or service
  • A router / load balancer / pool
  • A Java EE session bean
  • An out-of-process service
  • A Finite State Machine



ACTOR OPERATIONS


  1. CREATE
  2. SEND
  3. BECOME
  4. SUPERVISE

CREATE


  • Extremely lightweight (2.7 million per GB of RAM)

  • Very strong encapsulation - encapsulates:
    1. state
    2. behaviour
    3. inbox

  • State & behaviour are indistinguishable from each other

  • The only way to observe state is by sending the actor a message

DEFINE

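import java.io.Serializable;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

// Immutable message: safe to pass between actors
public class Greeting implements Serializable {
  public final String who;
  public Greeting(String who) {
    this.who = who;
  }
}

public class GreetingActor extends UntypedActor {
  LoggingAdapter log = Logging.getLogger(getContext().system(), this);

  public void onReceive(Object message) throws Exception {
    if (message instanceof Greeting) {
      log.info("Hello {}", ((Greeting) message).who);
    } else {
      unhandled(message);
    }
  }
}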

CREATE

ActorSystem system = ActorSystem.create("MySystem");
ActorRef greeter = system.actorOf(Props.create(GreetingActor.class), "greeter");

ACTOR HIERARCHY

Actors can form hierarchies

Name resolution - like a file-system

SEND

  • Sends a message to an Actor

  • Asynchronous and non-blocking - fire-and-forget

  • Everything happens Reactively

  • An Actor is passive until a message is sent to it, which triggers behaviour within the Actor

  • Everything is asynchronous and lockless

SEND


ActorRef greeter = system.actorOf(Props.create(GreetingActor.class), "greeter");
// Fire-and-forget; null means no sender, because we are outside an actor here
greeter.tell(new Greeting("Charlie Parker"), null);

REPLY

public class GreetingActor extends UntypedActor {
  LoggingAdapter log = Logging.getLogger(getContext().system(), this);
  public void onReceive(Object message) throws Exception {
    if (message instanceof Greeting) {
      log.info("Hello {}", ((Greeting) message).who);
      // Reply to whoever sent the greeting
      getSender().tell("Thanks", getSelf());
    } else {
      unhandled(message);
    }
  }
}

BECOME

  • Dynamically redefines Actor’s behaviour

  • Triggered reactively by receipt of a message

  • In a type system analogy it is as if the object changed type - changed interface, protocol & implementation

  • Will now react differently to the messages it receives

  • Behaviours are stacked & can be pushed and popped
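
The stacking idea can be sketched in plain Java. The names below are hypothetical and are not Akka API; the sketch only shows that the behaviour on top of the stack handles each message, and that popping returns to the previous behaviour.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

// Plain-Java sketch of stacked behaviours; names are hypothetical, not Akka API.
class BehaviourStackSketch {
    private final Deque<Function<String, String>> behaviours = new ArrayDeque<>();

    BehaviourStackSketch(Function<String, String> initial) {
        behaviours.push(initial);
    }

    // Push a new behaviour on top; it handles messages from now on.
    void become(Function<String, String> next) {
        behaviours.push(next);
    }

    // Pop the current behaviour and return to the previous one.
    // The initial behaviour is never removed.
    void unbecome() {
        if (behaviours.size() > 1) {
            behaviours.pop();
        }
    }

    // Handle a message with whatever behaviour is currently on top.
    String receive(String message) {
        return behaviours.peek().apply(message);
    }

    public static void main(String[] args) {
        BehaviourStackSketch actor = new BehaviourStackSketch(m -> "normal: " + m);
        System.out.println(actor.receive("hi")); // normal: hi
        actor.become(m -> "degraded: " + m);
        System.out.println(actor.receive("hi")); // degraded: hi
        actor.unbecome();
        System.out.println(actor.receive("hi")); // normal: hi
    }
}
```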

WHY BECOME?

  • Implement an FSM (Finite State Machine)

  • Spawn a generic Actor that can become whatever the Master currently needs

  • Let a highly contended Actor adaptively transform itself into an Actor Pool or a Router

  • Implement graceful degradation

BECOME - BEHAVIOURS

 
Procedure<Object> angry = new Procedure<Object>() {
  @Override
  public void apply(Object message) {
    if (message.equals("bar")) {
      getSender().tell("I am already angry?", getSelf());
    } else if (message.equals("foo")) {
      getContext().become(happy);
    }
  }
};

Procedure<Object> happy = new Procedure<Object>() {
  @Override
  public void apply(Object message) {
    if (message.equals("bar")) {
      getSender().tell("I am already happy :-)", getSelf());
    } else if (message.equals("foo")) {
      getContext().become(angry);
    }
  }
};
 

BECOME - RECV Message


public void onReceive(Object message) {
  if (message.equals("bar")) {
    getContext().become(angry);
  } else if (message.equals("foo")) {
    getContext().become(happy);
  } else {
    unhandled(message);
  }
}

SUPERVISE

Traditional failure management:

• You are given a SINGLE thread of control

• Need to do all explicit error handling WITHIN this single thread

• Errors do not propagate between threads

• Leads to DEFENSIVE programming with error handling TANGLED with business logic

SUPERVISE

"Let it crash!"

• Error handling in actors is handled by letting Actors monitor (supervise) each other for failure

• If an Actor crashes, a notification is sent to its supervisor, which can react to the failure (Restart, Resume, Stop, Escalate)

• Provides clean separation of processing and error handling

SUPERVISE - Strategy

// resume(), restart(), stop() and escalate() are static methods
// of akka.actor.SupervisorStrategy (static imports assumed).
// Allow at most 10 restarts per minute for each failing child.
private static SupervisorStrategy strategy =
  new OneForOneStrategy(10, Duration.create("1 minute"),
    new Function<Throwable, Directive>() {
      @Override
      public Directive apply(Throwable t) {
        if (t instanceof ArithmeticException) {
          return resume();
        } else if (t instanceof NullPointerException) {
          return restart();
        } else if (t instanceof IllegalArgumentException) {
          return stop();
        } else {
          return escalate();
        }
      }
    });

@Override
public SupervisorStrategy supervisorStrategy() {
  return strategy;
}

SUPERVISE - Hooks

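// The bodies shown here are the defaults provided by UntypedActor.
public void preStart() {
}

// Called on the failing instance before it is replaced: stop all children
public void preRestart(Throwable reason, scala.Option<Object> message) {
  for (ActorRef each : getContext().getChildren()) {
    getContext().unwatch(each);
    getContext().stop(each);
  }
  postStop();
}

// Called on the fresh instance after the restart
public void postRestart(Throwable reason) {
  preStart();
}

public void postStop() {
}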

ROUTERS - LOAD BALANCERS

ActorRef roundRobinRouter = getContext().actorOf(
  Props.create(PrintlnActor.class).withRouter(new RoundRobinRouter(5)),
  "router");

for (int i = 1; i <= 10; i++) {
  roundRobinRouter.tell(i, getSelf());
}
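
What round-robin selection means can be sketched in plain Java. This is an illustration under simple assumptions, not Akka's RoundRobinRouter; the class and method names are hypothetical. With 5 routees and 10 messages, as in the snippet above, each routee would be picked twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Plain-Java sketch of round-robin routing; not Akka's RoundRobinRouter.
class RoundRobinSketch<T> {
    private final List<T> routees;
    private final AtomicLong next = new AtomicLong();

    RoundRobinSketch(List<T> routees) {
        if (routees.isEmpty()) {
            throw new IllegalArgumentException("at least one routee is required");
        }
        this.routees = new ArrayList<>(routees);
    }

    // Pick routees in turn: 0, 1, ..., n-1, then start again at 0.
    T nextRoutee() {
        int index = (int) (next.getAndIncrement() % routees.size());
        return routees.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch<String> router =
                new RoundRobinSketch<>(Arrays.asList("r0", "r1", "r2", "r3", "r4"));
        for (int i = 1; i <= 10; i++) {
            System.out.println("message " + i + " goes to " + router.nextRoutee());
        }
    }
}
```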

THROTTLING

// A simple actor that prints whatever it receives
ActorRef printer = system.actorOf(Props.create(Printer.class));
// The throttler for this example, setting the rate
ActorRef throttler = system.actorOf(Props.create(TimerBasedThrottler.class,
  new Throttler.Rate(3, Duration.create(1, TimeUnit.SECONDS))));
// Set the target
throttler.tell(new Throttler.SetTarget(printer), null);
// These three messages will be sent to the target immediately
throttler.tell("1", null);
throttler.tell("2", null);
throttler.tell("3", null);
// These two will wait until a second has passed
throttler.tell("4", null);
throttler.tell("5", null);

// A simple actor that prints whatever it receives
public class Printer extends UntypedActor {
  @Override
  public void onReceive(Object msg) {
    System.out.println(msg);
  }
}

CIRCUIT BREAKERS

public class DangerousJavaActor extends UntypedActor {

  private final CircuitBreaker breaker;
  private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

  public DangerousJavaActor() {
    // Open after 5 failures; calls time out after 10 s; reset after 1 minute
    this.breaker = new CircuitBreaker(
      getContext().dispatcher(), getContext().system().scheduler(),
      5, Duration.create(10, "s"), Duration.create(1, "m"))
      .onOpen(new Runnable() {
        public void run() {
          notifyMeOnOpen();
        }
      });
  }

  public void notifyMeOnOpen() {
    log.warning("My CircuitBreaker is now open, and will not close for one minute");
  }

  // Message handling is not shown on the slide; this placeholder
  // only makes the class complete.
  @Override
  public void onReceive(Object message) {
    unhandled(message);
  }
}
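
The states a circuit breaker moves through (closed, open, half-open) can be sketched in plain Java. This is an illustration only, not Akka's CircuitBreaker: all names are hypothetical, the call timeout is not modelled, and the clock is injected so that a demo can control time. Holding the lock while the protected call runs is a simplification that serialises calls.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Plain-Java sketch of the circuit breaker state machine; not Akka's CircuitBreaker.
class CircuitBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int maxFailures;
    private final long resetTimeoutMillis;
    private final LongSupplier clock; // injected so a demo or test controls time
    private State state = State.CLOSED;
    private int failures = 0;
    private long openedAt = 0;

    CircuitBreakerSketch(int maxFailures, long resetTimeoutMillis, LongSupplier clock) {
        this.maxFailures = maxFailures;
        this.resetTimeoutMillis = resetTimeoutMillis;
        this.clock = clock;
    }

    // Once the reset timeout has elapsed, an open breaker becomes half-open
    // and lets a trial call through.
    synchronized State state() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= resetTimeoutMillis) {
            state = State.HALF_OPEN;
        }
        return state;
    }

    synchronized <T> T call(Supplier<T> body) {
        if (state() == State.OPEN) {
            // Fail fast without invoking the protected call
            throw new IllegalStateException("circuit breaker is open");
        }
        try {
            T result = body.get();
            failures = 0;
            state = State.CLOSED; // a success closes the breaker again
            return result;
        } catch (RuntimeException e) {
            failures++;
            // A failed trial call, or too many failures, opens the breaker
            if (state == State.HALF_OPEN || failures >= maxFailures) {
                state = State.OPEN;
                openedAt = clock.getAsLong();
            }
            throw e;
        }
    }
}
```

With the values from the slide, the sketch would be constructed with maxFailures = 5 and a reset timeout of 60000 ms.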
