Akka是一个用于构建高并发、分布式和弹性消息驱动Java或Scala应用的框架。它基于Scala开发,但也可在Java中使用。Akka采用actor模型,比线程更细粒度且易于开发并发程序。它提供容错机制,支持单机和分布式部署,并具备位置透明的Actor定位服务。Actor模型通过消息传递驱动任务执行,具有线程安全和轻量级特点。Akka中的actor有层级关系,每个actor都属于某个parent,通过特定方法创建和管理。Actor还有生命周期管理API,如preStart和postStop,用于在actor启动和停止时执行特定操作。
Akka构建高并发程序
Akka是一个用于构建高并发,分布式的,弹性的消息驱动的Java或Scala应用。
简介
Akka是使用Scala语言开发的,因为Scala也是运行在JVM之上的语言,所以我们也可以在Java中使用Akka。
Akka是使用actor模型,其粒度比线程更小,但是却更容易去开发并发的程序。除此之外,Akka还提供了一套容错机制,允许在Actor出现异常的时候进行一些回复或者重置操作,akka除了能在单机上构建高并发程序之外,还能在网络中构建分布式程序,并提供位置透明的Actor定位服务。
听起来是不是很强大,那么一起开启Actor之旅吧。
Actor模型
在Akka中,消息在actor之间进行传递和处理,以此驱动任务的执行,不同于常见的OOP需要调用某个对象的方法才能做某事。
Actor的方式更像是问答的:
比如老师问同学1+1等于?然后同学听到了,回答说是2. actor之间的通信方式就如同这样。
Actor也拥有线程安全和轻量级的特点:
线程安全: actor运行于线程池之上,单个actor总是线程安全的,其内部的邮箱保证只有一个消息处理完之后,才会发送下一个消息。并且本身在处理接收到的消息时是串行的
轻量级: 大型应用中,可能同时运行着成千上万个Actor,在Akka中,每个Actor只占用300字节左右。即使单机内存不够用了,也可以方便的切换成分布式模式。
Actor的层级
Akka中的actor一直都会属于某个Parent,一般我们使用如下方式创建Actor:
getContext().actorOf()
这种方式会在已经存在的actor下面创建一个子actor,当前的actor就是新创建actor的父亲。
那么谁是第一个actor呢?
一般所有的actor都有一个共同的父亲,新的actor也可以通过如下方式创建新的actor实例
system.actorOf()
如果我们创建了一个名叫:“someActor”的actor,那么它的引用路径就是:/user/someActor
实际上在我们创建actor之前,akka就已经默认的创建了3个actors,他们用于监管接下来新创建的子actor。
Root guardian:整个ActorSystem的根,它是所有actor的父亲,并且在系统终止的时候,它也是最后一个被停止的。
/user 这个是我们最常见到的,所有的通过Actorsystem.actorOf()方法创建的Actor都属于该分支下,这个是我们能手动创建的最高级别Actor。 其他通过ActorContext.actorOf() 方法创建的Actor都是其子级。
/system 系统层面创建的,主要与系统的整体行为有关,在开发阶段不需要对其有过多的关注。
另外需要注意的是尽量保证每个应用程序内部只需要一个ActorSystem对象
通过一个demo来演示actor的层级关系:
import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; public class ActorHierarchyDemo { public static void main(String[] args) throws java.io.IOException { ActorSystem system = ActorSystem.create("testSystem"); ActorRef firstRef = system.actorOf(PrintMyActorRefActor.props(), "first-actor"); System.out.println("First: " + firstRef); firstRef.tell("printit", ActorRef.noSender()); System.out.println(">>> Press ENTER to exit <<<"); try { System.in.read(); } finally { system.terminate(); } } static class PrintMyActorRefActor extends AbstractActor { static Props props() { return Props.create(PrintMyActorRefActor.class, PrintMyActorRefActor::new); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals( "printit", p -> { ActorRef secondRef = getContext().actorOf(Props.empty(), "second-actor"); System.out.println("Second: " + secondRef); }) .build(); } } }
结果:
akka是协议的前缀,两个路径都是akka的协议
testSystem是我们创建的ActorSystem名称,我们也可以去指定其他的名称
因为second-actor是通过first-actor创建的,所以在路径上是它的孩子
最后的那些数字标识符不重要,一般我们可以忽略掉
Actor的生命周期
每个actor都会经历“生老病死”的阶段, 在特定的阶段,我们要做一些正确的事情。Actor暴露了一些生命周期管理的API,但是最常用的是preStart与postStop
preStart():在actor启动但是还没有处理第一条消息之前运行
postStop():在actor停止之前,这个时候已经没有任何的消息可以处理了
想要去停止一个actor,最好在actor内部去使用
getContext().stop(getSelf())
而不要在另外一个actor去停止其他的actor:getContext().stop(actorRef),这样的做法可能会导致一定的风险,一般我们是给要停止的actor发送一个停止消息。
来演示下:
import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; public class ActorLifecycleDemo { static class StartStopActor1 extends AbstractActor { static Props props() { return Props.create(StartStopActor1.class, StartStopActor1::new); } @Override public void preStart() { System.out.println("first started"); getContext().actorOf(StartStopActor2.props(), "second"); } @Override public void postStop() { System.out.println("first stopped"); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals( "stop", s -> { getContext().stop(getSelf()); }) .build(); } } static class StartStopActor2 extends AbstractActor { static Props props() { return Props.create(StartStopActor2.class, StartStopActor2::new); } @Override public void preStart() { System.out.println("second started"); } @Override public void postStop() { System.out.println("second stopped"); } // Actor.emptyBehavior is a useful placeholder when we don't // want to handle any messages in the actor. @Override public Receive createReceive() { return receiveBuilder().build(); } } public static void main(String[] args) { ActorSystem system = ActorSystem.create("testSystem"); ActorRef first = system.actorOf(StartStopActor1.props(), "first"); first.tell("stop", ActorRef.noSender()); } }
当我们想要停止first actor的时候,它会首先去停止它的子actor。
错误处理
父子actor通过它们的生命周期连接在一起,当某个actor出现错误的时候,它会被临时的挂起,当失败传递到其父亲actor的时候,父亲actor会判断要怎么处理这个错误信息。从这个角度来看得话,父亲actor是作为子actor的监管,默认的监管策略是停止然后重启子Actor,当然我们也可以自定义自己的监管策略。
证明下默认的监管策略吧:
import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; public class SupervisorDemo { // 监管的 static class SupervisingActor extends AbstractActor { static Props props() { return Props.create(SupervisingActor.class, SupervisingActor::new); } ActorRef child = getContext().actorOf(SupervisedActor.props(), "supervised-actor"); @Override public Receive createReceive() { return receiveBuilder() .matchEquals( "failChild", f -> { child.tell("fail", getSelf()); }) .build(); } } // 被监管的 static class SupervisedActor extends AbstractActor { static Props props() { return Props.create(SupervisedActor.class, SupervisedActor::new); } @Override public void preStart() { System.out.println("supervised actor started"); } @Override public void postStop() { System.out.println("supervised actor stopped"); } @Override public Receive createReceive() { return receiveBuilder() .matchEquals( "fail", f -> { System.out.println("supervised actor fails now"); throw new Exception("I failed!"); }) .build(); } } public static void main(String[] args) { ActorSystem system = ActorSystem.create("testSystem"); ActorRef supervisingActor = system.actorOf(SupervisingActor.props(), "supervising-actor"); supervisingActor.tell("failChild", ActorRef.noSender()); } }
可以看到在supervised失败抛出异常之后,又重新的启动了起来。
最后
这里我们只是聊了下Akka的特点,包括其核心的Actor特点,然后给了几个案例来证明它的actor特点。希望能帮到大家
参考
注:
本文独家发布自金蝶云社区