Akka构建高并发程序之-Actor原创
金蝶云社区-艾贺521
艾贺521
2人赞赏了该文章 1,019次浏览 未经作者许可,禁止转载编辑于2019年05月21日 18:01:08
summary-icon摘要由AI智能服务提供

Akka是一个用于构建高并发、分布式和弹性消息驱动Java或Scala应用的框架。它基于Scala开发,但也可在Java中使用。Akka采用actor模型,比线程更细粒度且易于开发并发程序。它提供容错机制,支持单机和分布式部署,并具备位置透明的Actor定位服务。Actor模型通过消息传递驱动任务执行,具有线程安全和轻量级特点。Akka中的actor有层级关系,每个actor都属于某个parent,通过特定方法创建和管理。Actor还有生命周期管理API,如preStart和postStop,用于在actor启动和停止时执行特定操作。

Akka构建高并发程序

Akka是一个用于构建高并发,分布式的,弹性的消息驱动的Java或Scala应用。

1558424302668.png


简介

Akka是使用Scala语言开发的,因为Scala也是运行在JVM之上的语言,所以我们也可以在Java中使用Akka。

Akka是使用actor模型,其粒度比线程更小,但是却更容易去开发并发的程序。除此之外,Akka还提供了一套容错机制,允许在Actor出现异常的时候进行一些回复或者重置操作,akka除了能在单机上构建高并发程序之外,还能在网络中构建分布式程序,并提供位置透明的Actor定位服务。


听起来是不是很强大,那么一起开启Actor之旅吧。



Actor模型

在Akka中,消息在actor之间进行传递和处理,以此驱动任务的执行,不同于常见的OOP需要调用某个对象的方法才能做某事。

Actor的方式更像是问答的:

比如老师问同学1+1等于?然后同学听到了,回答说是2. actor之间的通信方式就如同这样。


Actor也拥有线程安全和轻量级的特点:

  • 线程安全: actor运行于线程池之上,单个actor总是线程安全的,其内部的邮箱保证只有一个消息处理完之后,才会发送下一个消息。并且本身在处理接收到的消息时是串行的

  • 轻量级: 大型应用中,可能同时运行着成千上万个Actor,在Akka中,每个Actor只占用300字节左右。即使单机内存不够用了,也可以方便的切换成分布式模式。


Actor的层级

Akka中的actor一直都会属于某个Parent,一般我们使用如下方式创建Actor:

getContext().actorOf()

这种方式会在已经存在的actor下面创建一个子actor,当前的actor就是新创建actor的父亲。


那么谁是第一个actor呢?

一般所有的actor都有一个共同的父亲,新的actor也可以通过如下方式创建新的actor实例

system.actorOf()

如果我们创建了一个名叫:“someActor”的actor,那么它的引用路径就是:/user/someActor

1558430406108.png

实际上在我们创建actor之前,akka就已经默认的创建了3个actors,他们用于监管接下来新创建的子actor。

  • Root guardian:整个ActorSystem的根,它是所有actor的父亲,并且在系统终止的时候,它也是最后一个被停止的。

  • /user 这个是我们最常见到的,所有的通过Actorsystem.actorOf()方法创建的Actor都属于该分支下,这个是我们能手动创建的最高级别Actor。 其他通过ActorContext.actorOf() 方法创建的Actor都是其子级。

  • /system 系统层面创建的,主要与系统的整体行为有关,在开发阶段不需要对其有过多的关注。


另外需要注意的是尽量保证每个应用程序内部只需要一个ActorSystem对象


通过一个demo来演示actor的层级关系:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class ActorHierarchyDemo {
    public static void main(String[] args) throws java.io.IOException {
        ActorSystem system = ActorSystem.create("testSystem");

        ActorRef firstRef = system.actorOf(PrintMyActorRefActor.props(), "first-actor");
        System.out.println("First: " + firstRef);
        firstRef.tell("printit", ActorRef.noSender());

        System.out.println(">>> Press ENTER to exit <<<");
        try {
            System.in.read();
        } finally {
            system.terminate();
        }
    }

    static class PrintMyActorRefActor extends AbstractActor {
        static Props props() {
            return Props.create(PrintMyActorRefActor.class, PrintMyActorRefActor::new);
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .matchEquals(
                            "printit",
                            p -> {
                                ActorRef secondRef = getContext().actorOf(Props.empty(), "second-actor");
                                System.out.println("Second: " + secondRef);
                            })
                    .build();
        }
    }
}

结果:

1558431156817.png

  • akka是协议的前缀,两个路径都是akka的协议

  • testSystem是我们创建的ActorSystem名称,我们也可以去指定其他的名称

  • 因为second-actor是通过first-actor创建的,所以在路径上是它的孩子

  • 最后的那些数字标识符不重要,一般我们可以忽略掉


Actor的生命周期

每个actor都会经历“生老病死”的阶段, 在特定的阶段,我们要做一些正确的事情。Actor暴露了一些生命周期管理的API,但是最常用的是preStart与postStop


  • preStart():在actor启动但是还没有处理第一条消息之前运行

  • postStop():在actor停止之前,这个时候已经没有任何的消息可以处理了


想要去停止一个actor,最好在actor内部去使用

getContext().stop(getSelf())

而不要在另外一个actor去停止其他的actor:getContext().stop(actorRef),这样的做法可能会导致一定的风险,一般我们是给要停止的actor发送一个停止消息。


来演示下:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class ActorLifecycleDemo {
    static class StartStopActor1 extends AbstractActor {
        static Props props() {
            return Props.create(StartStopActor1.class, StartStopActor1::new);
        }

        @Override
        public void preStart() {
            System.out.println("first started");
            getContext().actorOf(StartStopActor2.props(), "second");
        }

        @Override
        public void postStop() {
            System.out.println("first stopped");
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .matchEquals(
                            "stop",
                            s -> {
                                getContext().stop(getSelf());
                            })
                    .build();
        }
    }

    static class StartStopActor2 extends AbstractActor {

        static Props props() {
            return Props.create(StartStopActor2.class, StartStopActor2::new);
        }

        @Override
        public void preStart() {
            System.out.println("second started");
        }

        @Override
        public void postStop() {
            System.out.println("second stopped");
        }

        // Actor.emptyBehavior is a useful placeholder when we don't
        // want to handle any messages in the actor.
        @Override
        public Receive createReceive() {
            return receiveBuilder().build();
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("testSystem");
        ActorRef first = system.actorOf(StartStopActor1.props(), "first");
        first.tell("stop", ActorRef.noSender());
    }
}


1558431817660.png


当我们想要停止first actor的时候,它会首先去停止它的子actor。


错误处理

父子actor通过它们的生命周期连接在一起,当某个actor出现错误的时候,它会被临时的挂起,当失败传递到其父亲actor的时候,父亲actor会判断要怎么处理这个错误信息。从这个角度来看得话,父亲actor是作为子actor的监管,默认的监管策略是停止然后重启子Actor,当然我们也可以自定义自己的监管策略。


证明下默认的监管策略吧:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class SupervisorDemo {

    // 监管的
    static class SupervisingActor extends AbstractActor {
        static Props props() {
            return Props.create(SupervisingActor.class, SupervisingActor::new);
        }

        ActorRef child = getContext().actorOf(SupervisedActor.props(), "supervised-actor");

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .matchEquals(
                            "failChild",
                            f -> {
                                child.tell("fail", getSelf());
                            })
                    .build();
        }
    }

    // 被监管的
    static class SupervisedActor extends AbstractActor {
        static Props props() {
            return Props.create(SupervisedActor.class, SupervisedActor::new);
        }

        @Override
        public void preStart() {
            System.out.println("supervised actor started");
        }

        @Override
        public void postStop() {
            System.out.println("supervised actor stopped");
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .matchEquals(
                            "fail",
                            f -> {
                                System.out.println("supervised actor fails now");
                                throw new Exception("I failed!");
                            })
                    .build();
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("testSystem");
        ActorRef supervisingActor = system.actorOf(SupervisingActor.props(), "supervising-actor");
        supervisingActor.tell("failChild", ActorRef.noSender());
    }
}

1558432367814.png


可以看到在supervised失败抛出异常之后,又重新的启动了起来。


最后

这里我们只是聊了下Akka的特点,包括其核心的Actor特点,然后给了几个案例来证明它的actor特点。希望能帮到大家


参考


注:

本文独家发布自金蝶云社区


赞 2