Akka并发编程-Akka特性,并实现两个进程之间的通信(含完整Demo)
Akka介绍
Akka是一个用于构建高并发、分布式和可扩展的基于事件驱动的应用的工具包。Akka是使用scala开发的库,同时可以使用scala和Java语言来开发基于Akka的应用程序。
Akka特性
- 提供基于异步非阻塞、高性能的事件驱动编程模型
- 内置容错机制,允许Actor在出错时进行恢复或者重置操作
- 超级轻量级的事件处理(每GB堆内存几百万Actor)
- 使用Akka可以在单机上构建高并发程序,也可以在网络中构建分布式程序。
Akka通信过程
以下图片说明了Akka Actor的并发编程模型的基本流程:
- 学生创建一个ActorSystem
- 通过ActorSystem来创建一个ActorRef(老师的引用),并将消息发送给ActorRef
- ActorRef将消息发送给Message Dispatcher(消息分发器)
- Message Dispatcher将消息按照顺序保存到目标Actor的MailBox中
- Message Dispatcher将MailBox放到一个线程中
- MailBox按照顺序取出消息,最终将它递给TeacherActor接受的方法中
入门Demo
基于Akka创建两个Actor,Actor之间可以互相发送消息。
实现步骤
- 创建Maven模块
- 创建并加载Actor
- 发送/接收消息
1. 创建Maven模块
使用Akka需要导入Akka库,我们这里使用Maven来管理项目
- 创建Maven模块
- 打开pom.xml文件,导入akka Maven依赖和插件
2. 创建并加载Actor
创建两个Actor
- SenderActor:用来发送消息
- ReceiveActor:用来接收,回复消息
创建Actor
- 创建ActorSystem
- 创建自定义Actor
- ActorSystem加载Actor
3. 发送/接收消息
- 使用样例类封装消息
- SubmitTaskMessage——提交任务消息
- SuccessSubmitTaskMessage——任务提交成功消息
- 使用类似于之前学习的Actor方式,使用
!
发送异步消息
完整代码:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.akka</groupId>
<artifactId>akkatest2</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.3.14</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.3.14</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.xu
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object ScalaTest {
case class SubmitTaskMessage(msg:String)
case class SuccessSubmitTaskMessage(msg:String)
object SenderActor extends Actor {
println("d")
override def preStart(): Unit = println("执行发送,开始啦!!!")
println("e")
override def receive: Receive = {
case "start" =>
val receiveActor = this.context.actorSelection("/user/receiverActor")
receiveActor ! SubmitTaskMessage("请完成#001任务!")
case SuccessSubmitTaskMessage(msg) =>
println(s"接收到来自${sender.path}的d消息: $msg")
}
}
object ReceiverActor extends Actor {
override def preStart(): Unit = println("开始接受消息啦!!")
override def receive: Receive = {
case SubmitTaskMessage(msg) =>
println(s"接收到来自${sender.path}的发送者的消息: $msg")
sender ! SuccessSubmitTaskMessage("完成提交")
case _ => println("未匹配的消息类型")
}
}
def main(args: Array[String]): Unit = {
println("a")
val actorSystem = ActorSystem("SimpleAkkaDemo", ConfigFactory.load())
println("b")
val senderActor: ActorRef = actorSystem.actorOf(Props(SenderActor), "senderActor")
println("c")
val receiverActor: ActorRef = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
println("f")
senderActor ! "start"
println("15")
}
}
Akka定时任务
Akka中,提供一个scheduler对象来实现定时调度功能。使用ActorSystem.scheduler.schedule方法,可以启动一个定时任务。
第一种:发送消息
def schedule(
initialDelay: FiniteDuration, // 延迟多久后启动定时任务
interval: FiniteDuration, // 每隔多久执行一次
receiver: ActorRef, // 给哪个Actor发送消息
message: Any) // 要发送的消息
(implicit executor: ExecutionContext) // 隐式参数:需要手动导入
第二种:自定义实现
def schedule(
initialDelay: FiniteDuration, // 延迟多久后启动定时任务
interval: FiniteDuration // 每隔多久执行一次
)(f: ⇒ Unit) // 定期要执行的函数,可以将逻辑写在这里
(implicit executor: ExecutionContext) // 隐式参数:需要手动导入
定义一个Actor,每1秒发送一个消息给Actor,Actor收到后打印消息 使用发送消息方式实现
package com.xu
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object ScalaTest {
object RecevieActor extends Actor {
override def receive: Receive = {
case x => println(x)
}
}
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
val receiveActor = actorSystem.actorOf(Props(RecevieActor))
// 3. 启动scheduler,定期发送消息给Actor
// 导入一个隐式转换
import scala.concurrent.duration._
// 导入隐式参数
import actorSystem.dispatcher
actorSystem.scheduler.schedule(0 seconds,
1 seconds,
receiveActor, "hello")
}
}
自定义实现:
package com.xu
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object ScalaTest {
object RecevieActor extends Actor {
override def receive: Receive = {
case x => println(x)
}
}
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystem("SimpleAkkaDemo", ConfigFactory.load())
val senderActor: ActorRef = actorSystem.actorOf(Props(RecevieActor), "recevieActor")
import actorSystem.dispatcher
import scala.concurrent.duration._
actorSystem.scheduler.schedule(0 seconds, 1 seconds) {
senderActor ! "timer"
}
}
}
实现两个进程之间的通信
worker内容:
WorkerActor
package com.xu
import akka.actor.Actor
object WorkerActor extends Actor {override def receive: Receive = {
case "setup" => {
println("WorkerActor:接收到消息setup")
// 发送消息给Master
// 1. 获取到MasterActor的引用
// Master的引用路径:akka.tcp://actorSystem@127.0.0.1:8888/user/masterActor
val masterActor = context.actorSelection("akka.tcp://actorSystem@127.0.0.1:8888/user/masterActor")
// 2. 再发送消息给MasterActor
masterActor ! "connect"
}
case "success" => {
println("WorkerActor:收到success消息")
}
}
}
import com.typesafe.config.ConfigFactory
import akka.actor.{Actor, ActorSystem, Props}
import com.xu.WorkerActor
object WorkerTest {
def main(args: Array[String]): Unit = {
// 1. 创建一个ActorSystem
val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
// 2. 加载Actor
val workerActor = actorSystem.actorOf(Props(WorkerActor), "workerActor")
// 3. 发送消息给Actor
workerActor ! "setup"
}
}
application.conf
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "9999"
MasterActor
package com.xu
import akka.actor.Actor
object MasterActor extends Actor {
override def receive: Receive = {
case "connect" => {
println("MasterActor: 接收到的connect的消息")
//获取发送者Actor的引用
sender !"success"
}
}
}
package com.xu
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object ScalaTest {
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
//加载actor
val masterActor = actorSystem.actorOf(Props(MasterActor), "masterActor")
}
}
application.conf
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "8888"
pom.xml是两个项目共有的:
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.3.14</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.3.14</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
先启动master,再启动worker:
结果: