Akka并发编程-Akka特性,并实现两个进程之间的通信(含完整Demo)

Akka介绍

Akka是一个用于构建高并发、分布式和可扩展的基于事件驱动的应用的工具包。Akka是使用scala开发的库,同时可以使用scala和Java语言来开发基于Akka的应用程序。

Akka特性

  • 提供基于异步非阻塞、高性能的事件驱动编程模型
  • 内置容错机制,允许Actor在出错时进行恢复或者重置操作
  • 超级轻量级的事件处理(每GB堆内存几百万Actor)
  • 使用Akka可以在单机上构建高并发程序,也可以在网络中构建分布式程序。

Akka通信过程

以下图片说明了Akka Actor的并发编程模型的基本流程:

  1. 学生创建一个ActorSystem
  2. 通过ActorSystem来创建一个ActorRef(老师的引用),并将消息发送给ActorRef
  3. ActorRef将消息发送给Message Dispatcher(消息分发器)
  4. Message Dispatcher将消息按照顺序保存到目标Actor的MailBox中
  5. Message Dispatcher将MailBox放到一个线程中
  6. MailBox按照顺序取出消息,最终将它递给TeacherActor接受的方法中

在这里插入图片描述

入门Demo

基于Akka创建两个Actor,Actor之间可以互相发送消息。
在这里插入图片描述

实现步骤

  1. 创建Maven模块
  2. 创建并加载Actor
  3. 发送/接收消息

1. 创建Maven模块

使用Akka需要导入Akka库,我们这里使用Maven来管理项目

  1. 创建Maven模块
  2. 打开pom.xml文件,导入akka Maven依赖和插件

2. 创建并加载Actor

创建两个Actor

  • SenderActor:用来发送消息
  • ReceiveActor:用来接收,回复消息

创建Actor

  1. 创建ActorSystem
  2. 创建自定义Actor
  3. ActorSystem加载Actor

3. 发送/接收消息

  • 使用样例类封装消息
  • SubmitTaskMessage——提交任务消息
  • SuccessSubmitTaskMessage——任务提交成功消息
  • 使用类似于之前学习的Actor方式,使用!发送异步消息

完整代码:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.akka</groupId>
    <artifactId>akkatest2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>2.3.14</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.11</artifactId>
            <version>2.3.14</version>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
package com.xu

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object ScalaTest {

  case class SubmitTaskMessage(msg:String)
  case class SuccessSubmitTaskMessage(msg:String)
  object SenderActor extends Actor {
    println("d")
    override def preStart(): Unit = println("执行发送,开始啦!!!")
    println("e")
    override def receive: Receive = {
      case "start" =>
        val receiveActor = this.context.actorSelection("/user/receiverActor")
        receiveActor ! SubmitTaskMessage("请完成#001任务!")
      case SuccessSubmitTaskMessage(msg) =>
        println(s"接收到来自${sender.path}的d消息: $msg")
    }
  }

  object ReceiverActor extends Actor {
    override def preStart(): Unit = println("开始接受消息啦!!")
    override def receive: Receive = {
      case SubmitTaskMessage(msg) =>
        println(s"接收到来自${sender.path}的发送者的消息: $msg")
        sender ! SuccessSubmitTaskMessage("完成提交")
      case _ => println("未匹配的消息类型")
    }
  }

  def main(args: Array[String]): Unit = {
    println("a")
    val actorSystem = ActorSystem("SimpleAkkaDemo", ConfigFactory.load())
    println("b")
    val senderActor: ActorRef = actorSystem.actorOf(Props(SenderActor), "senderActor")
    println("c")
    val receiverActor: ActorRef = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
    println("f")
    senderActor ! "start"
    println("15")
  }

}

Akka定时任务

Akka中,提供一个scheduler对象来实现定时调度功能。使用ActorSystem.scheduler.schedule方法,可以启动一个定时任务。
第一种:发送消息

def schedule(
    initialDelay: FiniteDuration,		// 延迟多久后启动定时任务
    interval: FiniteDuration,			// 每隔多久执行一次
    receiver: ActorRef,					// 给哪个Actor发送消息
    message: Any)						// 要发送的消息
(implicit executor: ExecutionContext)	// 隐式参数:需要手动导入

第二种:自定义实现

def schedule(
    initialDelay: FiniteDuration,			// 延迟多久后启动定时任务
    interval: FiniteDuration				// 每隔多久执行一次
)(f: ⇒ Unit)								// 定期要执行的函数,可以将逻辑写在这里
(implicit executor: ExecutionContext)		// 隐式参数:需要手动导入

定义一个Actor,每1秒发送一个消息给Actor,Actor收到后打印消息 使用发送消息方式实现

package com.xu
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object ScalaTest {
  object RecevieActor extends Actor {
    override def receive: Receive = {
      case x => println(x)
    }
  }
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    val receiveActor = actorSystem.actorOf(Props(RecevieActor))
    // 3. 启动scheduler,定期发送消息给Actor
    // 导入一个隐式转换
    import scala.concurrent.duration._
    // 导入隐式参数
    import actorSystem.dispatcher
    actorSystem.scheduler.schedule(0 seconds,
      1 seconds,
      receiveActor, "hello")
  }
}

自定义实现:

package com.xu
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object ScalaTest {
  object RecevieActor extends Actor {
    override def receive: Receive = {
      case x => println(x)
    }
  }
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("SimpleAkkaDemo", ConfigFactory.load())
    val senderActor: ActorRef = actorSystem.actorOf(Props(RecevieActor), "recevieActor")
    import actorSystem.dispatcher
    import scala.concurrent.duration._
    actorSystem.scheduler.schedule(0 seconds, 1 seconds) {
      senderActor ! "timer"
    }
  }
}

实现两个进程之间的通信

在这里插入图片描述

worker内容:

在这里插入图片描述

WorkerActor

package com.xu
import akka.actor.Actor
object WorkerActor extends Actor {override def receive: Receive = {
    case "setup" => {
      println("WorkerActor:接收到消息setup")
      // 发送消息给Master
      // 1. 获取到MasterActor的引用
      // Master的引用路径:akka.tcp://actorSystem@127.0.0.1:8888/user/masterActor
      val masterActor = context.actorSelection("akka.tcp://actorSystem@127.0.0.1:8888/user/masterActor")

      // 2. 再发送消息给MasterActor
      masterActor ! "connect"
    }
    case "success" => {
      println("WorkerActor:收到success消息")
    }
  }
}

import com.typesafe.config.ConfigFactory
import akka.actor.{Actor, ActorSystem, Props}
import com.xu.WorkerActor
object WorkerTest {
  def main(args: Array[String]): Unit = {
    // 1. 创建一个ActorSystem
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())

    // 2. 加载Actor
    val workerActor = actorSystem.actorOf(Props(WorkerActor), "workerActor")

    // 3. 发送消息给Actor
    workerActor ! "setup"
  }
}

application.conf

akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "9999"

在这里插入图片描述

MasterActor

package com.xu
import akka.actor.Actor
object MasterActor extends Actor {
  override def receive: Receive = {
    case "connect" => {
      println("MasterActor: 接收到的connect的消息")
      //获取发送者Actor的引用
      sender !"success"
    }
  }
}

package com.xu
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object ScalaTest {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    //加载actor
    val masterActor = actorSystem.actorOf(Props(MasterActor), "masterActor")
  }
}

application.conf

akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "8888"

pom.xml是两个项目共有的:

<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>2.3.14</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.11</artifactId>
            <version>2.3.14</version>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

先启动master,再启动worker:

结果:
在这里插入图片描述

在这里插入图片描述