Actor API

Actor trait 只定义了一个抽象方法,就是上面提到的 receive, 用来实现actor的行为。 如果当前 actor 的行为与收到的消息不匹配,则会调用 unhandled, 它的缺省实现是向actor系统的事件流中发布一条 akka.actor.UnhandledMessage(message, sender, recipient)。 另外,它还包括:

  • self 代表本actor的 ActorRef
  • sender 代表最近收到的消息的发送actor
  • supervisorStrategy 用户可重写它来定义对子actor的监管策略
  • context 暴露actor和当前消息的上下文信息,如:
    • 用于创建子actor的工厂方法 (actorOf)
    • actor所属的系统
    • 父监管者
    • 所监管的子actor
    • 生命周期监控
    • hotswap行为栈

你可以import context 的成员来避免总是要加上 context.前缀

class ActorOneSon extends Actor {
import context._
val myActorSon = actorOf(Props[ActorOne], name = “actoroneson”)
def receive = {
case x ⇒ myActorSon ! x
}
}

使用DeathWatch进行生命周期监控

为了在其它actor结束时收到通知, actor可以将自己注册为其它actor在终止时所发布的 Terminated 消息 的接收者 . 这个服务是由actor系统的 DeathWatch 组件提供的。 注册一个监控器很简单:

class WatchActor extends Actor {
import context._
val system = ActorSystem(“MySystem”)
val child = actorOf(Props.empty, “child”)
context.watch(child) // <-- 这是注册所需要的唯一调用
var lastSender = system.deadLetters
def receive = {
case “kill” ⇒
stop(child); lastSender = sender
case Terminated(`child`) ⇒ lastSender ! “finished”
}
}

要注意 Terminated 消息的产生与注册和终止行为所发生的顺序无关。多次注册并不表示会有多个消息产生,也不保证有且只有一个这样的消息被接收到:如果被监控的actor已经生成了消息并且已经进入了队列, 在这个消息被处理之前又发生了另一次注册,则会有第二个消息进入队列,因为一个已经终止的actor注册监控器会立刻导致Terminated 消息的发生。 可以使用 context.unwatch(target)来停止对另一个actor的生存状态的监控, 但很明显这不能保证不会接收到Terminated 消息因为该消息可能已经进入了队列。

启动 Hook

actor启动后,它的 preStart 会被立即执行。

重启 Hook

所有的Actor都是被监管的, i.e. 以某种失败处理策略与另一个actor链接在一起。 如果在处理一个消息的时候抛出的异常,Actor将被重启。这个重启过程包括上面提到的Hook:

  1. 要被重启的actor的 preRestart 被调用,携带着导致重启的异常以及触发异常的消息; 如果重启并不是因为消息的处理而发生的,所携带的消息为 None , 例如,当一个监管者没有处理某个异常继而被它自己的监管者重启时。 这个方法是用来完成清理、准备移交给新的actor实例的最佳位置。 它的缺省实现是终止所有的子actor并调用 postStop.
  2. 最初 actorOf 调用的工厂方法将被用来创建新的实例。
  3. 新的actor的 postRestart 方法被调用,携带着导致重启的异常信息。 By default the preStart is called, just as in the normal start-up case.

actor的重启会替换掉原来的actor对象; 重启不影响邮箱的内容, 所以对消息的处理将在 postRestart hook 返回后继续. 触发异常的消息不会被重新接收。在actor重启过程中所有发送到该actor的消息将象平常一样被放进邮箱队列中。

终止 Hook

一个Actor终止后,它的 postStop hook将被调用, 这可以用来取消该actor在其它服务中的注册. 这个hook保证在该actor的消息队列被禁止后才运行, i.e. 之后发给该actor的消息将被重定向到 ActorSystem的deadLetters 中。

标识 Actor

每个actor拥有一个唯一的逻辑路径, 此路径是由从actor系统的根开始的父子链构成;它还拥有一个物理路径,如果监管链包含有远程监管者,此路径可能会与逻辑路径不同。这些路径用来在系统中查找actor,例如,当收到一个远程消息时查找收件者, 但是它们的更直接的用处在于:actor可以通过指定绝对或相对路径(逻辑的或物理的)来查找其它的actor并随结果获取一个 ActorRef

context.actorFor(“/user/serviceA/aggregator”) // 查找绝对路径
context.actorFor(“…/joe”) // 查找同一父监管者下的兄弟

其中指定的路径被解释为一个 java.net.URI, 它以 / 分隔成路径段. 如果路径以 /开始, 表示一个绝对路径,从根监管者 ( "/user"的父亲)开始查找; 否则是从当前actor开始。如果某一个路径段为 …, 会找到当前所遍历到的actor的上一级, 否则则会向下一级寻找具有该名字的子actor。 必须注意的是 actor路径中的… 总是表示逻辑结构,也就是其监管者。 如果要查找的路径不存在,会返回一个特殊的actor引用,它的行为与actor系统的死信队列类似,但是保留其身份(i.e. 查找路径)。 如果开启了远程调用,则远程actor地址也可以被查找。:

context.actorFor(“akka://app@otherhost:1234/user/serviceB”)

这些查找动作立即返回一个(可能是远程的)actor引用, 所以你必须向它发送一个消息并等待其响应,来确认serviceB 是真正可访问和运行的。