Akka 学习(四)Remote Actor
目录
参考文章
- Gitter Chat,Akka 在线交流平台
- Akka Forums,Akka 论坛
- Akka in GitHub,Akka 开源项目仓库
- Akka Official Website,Akka 官网
- Akka Java API,Akka 应用程序编程接口
- 《Akka入门与实践》 [加]Jason Goodwin(贾森·古德温)
文章系列
- Akka 学习(一)Actor 初步认识与环境搭建 已完成
- Akka 学习(二)第一个入门程序 已完成
- Akka 学习(三)Actor的基本使用 已完成
- Akka 学习(四)Remote Actor 已完成
- Akka 学习(五)消息传递的方式 已完成
- Akka 学习(六)Actor的监督机制 已完成
- Akka 学习(七)Actor的生命周期 已完成
- Akka 学习(八)路由与Dispatcher 已完成
- Akka 学习(九)Akka Cluster 已完成
Akka 基础篇就此结束了,Akka基础篇主要介绍Akka的基本概念与一些基本术语,使用方式
代码:https://github.com/Eason-shu/Akka
一 介绍
1.1 Remote Actor
虽然Akka在单机上可以运行上百万的Actor,但出于容错、负载均衡、灰度发布、提高并行度等等原因,我们仍然需要能在多个不同的服务器上运行Actor。所以Akka提供了akka-remoting的扩展包,屏蔽底层网络传输的细节,让上层以及其简单的方式使用远程的Actor调度。
Akka Remoting 是一个以点对点方式连接 actor 系统的通信模块,它是 Akka 集群的基础。远程处理的设计由两个(相关的)设计决策驱动:
- 相关系统之间的通信是对称的:如果系统 A 可以连接到系统 B,那么系统 B 也必须能够独立连接到系统 A。
- 通信系统的角色在连接模式方面是对称的:没有只接受连接的系统,也没有只发起连接的系统。
这些决定的结果是不可能安全地创建具有预定义角色的纯客户端-服务器设置(违反假设 2)。对于客户端-服务器设置,最好使用 HTTP 或 Akka I/O。
重要提示:使用涉及网络地址转换、负载平衡器或 Docker 容器的设置违反了假设 1,除非在网络配置中采取额外步骤以允许相关系统之间的对称通信。在这种情况下,Akka 可以配置为绑定到与用于在 Akka 节点之间建立连接的地址不同的网络地址。请参阅NAT 后面或 Docker 容器中的 Akka。
1.2 适用场景
- remoting的存在其实是为akka cluster做底层支持的,通常并不会直接去使用remoting的包。但为了了解cluster的底层原理,还是有必要看下remoting。
- 同时,remoting被设计为Peer-to-Peer而非Client-Server,所以不适用于基于后者的系统开发,比如我们无法在一个provider为local的Actor里去查找一个remote actor发送消息,必须两者均为remote actor,才满足对等。
1.3 踩坑点
- Akka版本需要与Scala版本匹配
maven仓库地址:
注意版本匹配,不然会疯狂报错,运行不起来
我的版本:
scala:2.13.0
Akka版本:2.13
- 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hc</groupId>
<artifactId>ActorDemo03</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>ActorDemo03</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<scala.version>2.11.7</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.13</artifactId>
<version>2.5.23</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.13</artifactId>
<version>2.5.23</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.13</artifactId>
<version>2.5.23</version>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-java8-compat_2.11</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
二 实战
2.1 需求
- 我们还将创建一个数据库客户端,用于展示如何请求服务器,以及如何从远程Actor中获取Future。
- 服务器端的服务接收到客户端的请求后将返回Future。
- 这样我们就已经编写了一个可以使用的键值存储数据库(和redis很类似)以及一个可以使用该数据库的远程客户端。
2.2 Java 版本
2.2.1 效果图
- 服务端启动前
- 客服端启动
- 服务端收到请求后
2.2.2 实体类
- SetRequest
package pojo;
import java.io.Serializable;
/**
* @description: 设置消息
* @author: shu
* @createDate: 2022/11/28 11:51
* @version: 1.0
*/
public class SetRequest implements Serializable {
public final String key;
public final Object value;
public SetRequest(String key, Object value) {
this.key = key;
this.value = value;
}
}
- GetRequest
package pojo;
import java.io.Serializable;
/**
* @description: 获取消息
* @author: shu
* @createDate: 2022/11/28 11:52
* @version: 1.0
*/
public class GetRequest implements Serializable {
public final String key;
public GetRequest(String key) {
this.key = key;
}
}
- 错误消息
package pojo;
import java.io.Serializable;
/**
* @description:
* @author: shu
* @createDate: 2022/11/28 11:52
* @version: 1.0
*/
public class KeyNotFoundException extends Exception implements
Serializable {
public final String key;
public KeyNotFoundException(String key) {
this.key = key;
}
}
2.2.3 服务端Actor 处理
import akka.actor.*;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import pojo.GetRequest;
import pojo.KeyNotFoundException;
import pojo.SetRequest;
import java.util.HashMap;
import java.util.Map;
/**
* @description:
* @author: shu
* @createDate: 2022/11/28 12:31
* @version: 1.0
*/
public class RequestActor extends AbstractActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
protected final Map<String, Object> map = new HashMap<>();
@Override
public void preStart() throws Exception {
log.info("ToFindRemoteActor is starting");
}
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
// 设置消息
.match(SetRequest.class, message -> {
// 打印消息
log.info("Received Set request: {}", message.key);
// 缓存消息
map.put(message.key, message.value);
// 回应消息
sender().tell(new Status.Success(message.key), self());
})
// 得到消息
.match(GetRequest.class, message -> {
// 打印日志
log.info("Received Get request: {}", message.key);
// 获取消息
Object value = (Object) map.get(message.key);
Object response = (value!= null)
? value
: new Status.Failure(new KeyNotFoundException(message.key));
// 响应消息
sender().tell(response, self());
})
// 未找到消息
.matchAny(o ->
sender().tell(new Status.Failure(new ClassNotFoundException()), self())
)
.build();
}
/**
* 测试
* @param args
*/
public static void main(String[] args) {
// Config config = ConfigFactory.parseString(
// "akka.remote.netty.tcp.port=" + 2551)
// .withFallback(ConfigFactory.load("application.conf"));
// Create an Akka system
ActorSystem system = ActorSystem.create("akkademy");
// Create an actor
ActorRef ref = system.actorOf(Props.create(RequestActor.class), "akkademy-db");
System.out.println(ref);
}
}
2.2.4 服务端配置文件
akka {
stdout-loglevel = "DEBUG"
loglevel = "DEBUG"
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
}
}
log-sent-messages = on
log-received-messages = on
}
2.2.5 客服端Actor处理
package client;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.pattern.AskableActorSelection;
import akka.util.Timeout;
import pojo.GetRequest;
import pojo.SetRequest;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static scala.compat.java8.FutureConverters.toJava;
/**
* @description:
* @author: shu
* @createDate: 2022/11/28 16:10
* @version: 1.0
*/
public class JClient {
private final ActorSystem system = ActorSystem.create("LocalSystem");
private final ActorSelection remoteDb;
public JClient(String remoteAddress) {
remoteDb = system.actorSelection("akka.tcp://akkademy@" +
remoteAddress + "/user/akkademy-db");
}
/**
* 缓存消息
* @param key
* @param value
* @return
*/
public CompletionStage set(String key, Object value) {
return toJava(new AskableActorSelection(remoteDb).ask(new SetRequest(key, value), Timeout.apply(5000, TimeUnit.SECONDS)));
}
/**
* 获取缓存消息
* @param key
* @return
*/
public CompletionStage get(String key){
return toJava(new AskableActorSelection(remoteDb).ask(new GetRequest(key), Timeout.apply(5000, TimeUnit.SECONDS)));
}
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import client.JClient;
import org.junit.Test;
import pojo.SetRequest;
import java.util.concurrent.CompletableFuture;
public class AkkademyDbTest {
/**
* 测试注意需要放在不同的两个项目进行测试,不然会Caused by: java.net.BindException: Address already in use: bind
* @throws Exception
*/
@Test
public void itShouldSetRecord() throws Exception {
JClient client = new JClient("127.0.0.1:2552");
client.set("123", 123);
Integer result = (Integer) ((CompletableFuture) client.
get("123")).get();
System.out.println("获取的结果:"+result);
assert(result == 123);
}
}
2.2.6 客服端配置文件
akka {
stdout-loglevel = "DEBUG"
loglevel = "DEBUG"
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
log-sent-messages = on
log-received-messages = on
}
}
2.2.7 测试
注意:需要放在两个不一样的配置项目中
- 服务端项目启动,等待请求的到来,注意配置文件
- 客服端项目启动,发送AKKA请求给服务端
- 服务端收到客服端的请求,缓存请求数据,把缓存结果返回给客服端
2.3 Scala 版本
2.3.1 效果
- 启动服务前
- 客户端启动
- 服务端收到请求
2.2.3 服务端Actor处理
import akka.actor.{Actor, Status}
import akka.event.{Logging, LoggingAdapter}
import scala.collection.convert.ImplicitConversions.`map AsJavaMap`
import scala.collection.mutable
/**
* @description:
* @author: shu
* @createDate: 2022/11/28 12:41
* @version: 1.0
*/
class ScalaRequest extends Actor {
protected val log: LoggingAdapter = Logging.getLogger(context.system, this)
val map: mutable.Map[String, Object] = new mutable.HashMap[String, Object]
override def receive = {
case SetRequest(key, value) =>
log.info("received SetRequest - key: {} value: {}", key, value)
map.put(key, value)
sender() ! Status.Success
case GetRequest(key) =>
log.info("received GetRequest - key: {}", key)
val response: Option[Object] = map.get(key)
response match{
case Some(x) => sender() ! x
case None => sender() ! Status.Failure(new KeyNotFoundException(key))
}
case o => Status.Failure(new ClassNotFoundException)
}
}
case class SetRequest(key: String, value: Object)
case class GetRequest(key: String)
case class KeyNotFoundException(key: String) extends Exception
配置文件跟Java一样
2.3.4 客户端Actor处理
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
/**
* @description:
* @author: shu
* @createDate: 2022/12/1 11:43
* @version: 1.0
*/
class SClient(remoteAddress: String) {
private implicit val timeout = Timeout(2 seconds)
private implicit val system = ActorSystem("LocalSystem")
private val remoteDb = system.actorSelection(
s"akka.tcp://akkademy@$remoteAddress/user/akkademy-db")
def set(key: String, value: Object) = {
remoteDb ? SetRequest(key, value)
}
def get(key: String) = {
remoteDb ? GetRequest(key)
}
}
case class SetRequest(key: String, value: Object)
case class GetRequest(key: String)
case class KeyNotFoundException(key: String) extends Exception
import scala.concurrent.duration.DurationInt
import scala.concurrent.Await
import scala.language.postfixOps
/**
* @description:
* @author: shu
* @createDate: 2022/12/1 11:44
* @version: 1.0
*/
object Main extends App {
val client = new SClient("127.0.0.1:2552")
client.set("123", new Integer(123))
val futureResult = client.get("123")
val result = Await.result(futureResult, 10 seconds)
}
2.3.5 测试
注意:需要放在两个不一样的配置项目中
- 服务端项目启动,等待请求的到来,注意配置文件
- 客服端项目启动,发送AKKA请求给服务端
- 服务端收到客服端的请求,缓存请求数据,把缓存结果返回给客服端