Akka 学习(四)Remote Actor


参考文章

文章系列

Akka 基础篇就此结束了,Akka基础篇主要介绍Akka的基本概念与一些基本术语,使用方式
代码:https://github.com/Eason-shu/Akka

一 介绍

1.1 Remote Actor

虽然Akka在单机上可以运行上百万的Actor,但出于容错、负载均衡、灰度发布、提高并行度等等原因,我们仍然需要能在多个不同的服务器上运行Actor。所以Akka提供了akka-remoting的扩展包,屏蔽底层网络传输的细节,让上层以及其简单的方式使用远程的Actor调度。
Akka Remoting 是一个以点对点方式连接 actor 系统的通信模块,它是 Akka 集群的基础。远程处理的设计由两个(相关的)设计决策驱动:

  1. 相关系统之间的通信是对称的:如果系统 A 可以连接到系统 B,那么系统 B 也必须能够独立连接到系统 A。
  2. 通信系统的角色在连接模式方面是对称的:没有只接受连接的系统,也没有只发起连接的系统。

这些决定的结果是不可能安全地创建具有预定义角色的纯客户端-服务器设置(违反假设 2)。对于客户端-服务器设置,最好使用 HTTP 或 Akka I/O。
重要提示:使用涉及网络地址转换、负载平衡器或 Docker 容器的设置违反了假设 1,除非在网络配置中采取额外步骤以允许相关系统之间的对称通信。在这种情况下,Akka 可以配置为绑定到与用于在 Akka 节点之间建立连接的地址不同的网络地址。请参阅NAT 后面或 Docker 容器中的 Akka

1.2 适用场景

  • remoting的存在其实是为akka cluster做底层支持的,通常并不会直接去使用remoting的包。但为了了解cluster的底层原理,还是有必要看下remoting。
  • 同时,remoting被设计为Peer-to-Peer而非Client-Server,所以不适用于基于后者的系统开发,比如我们无法在一个provider为local的Actor里去查找一个remote actor发送消息,必须两者均为remote actor,才满足对等。

1.3 踩坑点

  • Akka版本需要与Scala版本匹配

maven仓库地址:
image.png
注意版本匹配,不然会疯狂报错,运行不起来
我的版本:
scala:2.13.0
Akka版本:2.13

  • 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.hc</groupId>
  <artifactId>ActorDemo03</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <name>ActorDemo03</name>
  <description>Demo project for Spring Boot</description>

  <properties>
    <java.version>1.8</java.version>
    <scala.version>2.11.7</scala.version>
  </properties>



  <dependencies>

    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor -->
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-actor_2.13</artifactId>
      <version>2.5.23</version>
    </dependency>


    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-remote_2.13</artifactId>
      <version>2.5.23</version>
    </dependency>

    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-testkit_2.13</artifactId>
      <version>2.5.23</version>
    </dependency>

    <dependency>
      <groupId>org.scala-lang.modules</groupId>
      <artifactId>scala-java8-compat_2.11</artifactId>
      <version>1.0.2</version>
    </dependency>







    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13.2</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.testng</groupId>
      <artifactId>testng</artifactId>
      <version>RELEASE</version>
      <scope>compile</scope>
    </dependency>


  </dependencies>



  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>8</source>
          <target>8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>



</project>

二 实战

2.1 需求

epub_22651331_22.jpg

  • 我们还将创建一个数据库客户端,用于展示如何请求服务器,以及如何从远程Actor中获取Future。
  • 服务器端的服务接收到客户端的请求后将返回Future。
  • 这样我们就已经编写了一个可以使用的键值存储数据库(和redis很类似)以及一个可以使用该数据库的远程客户端。

2.2 Java 版本

2.2.1 效果图

  • 服务端启动前

image.png

  • 客服端启动

image.png

  • 服务端收到请求后

image.png

2.2.2 实体类

  • SetRequest
package pojo;

import java.io.Serializable;

/**
 * @description: 设置消息
 * @author: shu
 * @createDate: 2022/11/28 11:51
 * @version: 1.0
 */
public class SetRequest implements Serializable {
    public final String key;
    public final Object value;
    public SetRequest(String key, Object value) {
        this.key = key;
        this.value = value;
    }

}

  • GetRequest
package pojo;

import java.io.Serializable;

/**
 * @description: 获取消息
 * @author: shu
 * @createDate: 2022/11/28 11:52
 * @version: 1.0
 */
public class GetRequest implements Serializable {
    public final String key;
    public GetRequest(String key) {
        this.key = key;
    }
}
  • 错误消息
package pojo;

import java.io.Serializable;

/**
 * @description:
 * @author: shu
 * @createDate: 2022/11/28 11:52
 * @version: 1.0
 */
public class KeyNotFoundException extends Exception implements
        Serializable {
    public final String key;
    public KeyNotFoundException(String key) {
        this.key = key;
    }
}

2.2.3 服务端Actor 处理


import akka.actor.*;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import pojo.GetRequest;
import pojo.KeyNotFoundException;
import pojo.SetRequest;

import java.util.HashMap;
import java.util.Map;

/**
 * @description:
 * @author: shu
 * @createDate: 2022/11/28 12:31
 * @version: 1.0
 */
public class RequestActor extends AbstractActor {

    LoggingAdapter log = Logging.getLogger(getContext().system(), this);


    protected final Map<String, Object> map = new HashMap<>();

    @Override
    public void preStart() throws Exception {
        log.info("ToFindRemoteActor is starting");
    }

    @Override
    public Receive createReceive() {
        return  ReceiveBuilder.create()
                // 设置消息
                .match(SetRequest.class, message -> {
                    // 打印消息
                    log.info("Received Set request: {}", message.key);
                    // 缓存消息
                    map.put(message.key, message.value);
                    // 回应消息
                    sender().tell(new Status.Success(message.key), self());
                })
                // 得到消息
                .match(GetRequest.class, message -> {
                    // 打印日志
                    log.info("Received Get request: {}", message.key);
                    // 获取消息
                    Object value = (Object) map.get(message.key);
                    Object response = (value!= null)
                            ? value
                            : new Status.Failure(new KeyNotFoundException(message.key));
                    // 响应消息
                    sender().tell(response, self());
                })
                // 未找到消息
                .matchAny(o ->
                        sender().tell(new Status.Failure(new ClassNotFoundException()), self())
                )
                .build();

    }


    /**
     * 测试
     * @param args
     */
    public static void main(String[] args) {

//        Config config = ConfigFactory.parseString(
//                        "akka.remote.netty.tcp.port=" + 2551)
//                .withFallback(ConfigFactory.load("application.conf"));

        // Create an Akka system
        ActorSystem system = ActorSystem.create("akkademy");

        // Create an actor
        ActorRef ref = system.actorOf(Props.create(RequestActor.class), "akkademy-db");

        System.out.println(ref);

    }



}

2.2.4 服务端配置文件

image.png

akka {
  stdout-loglevel = "DEBUG"
  loglevel = "DEBUG"
  actor {
  provider = "akka.remote.RemoteActorRefProvider"
}
remote {
  enabled-transports = ["akka.remote.netty.tcp"]
  netty.tcp {
  hostname = "127.0.0.1"
}
}
log-sent-messages = on
log-received-messages = on
}

2.2.5 客服端Actor处理

package client;



import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.pattern.AskableActorSelection;
import akka.util.Timeout;
import pojo.GetRequest;
import pojo.SetRequest;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import static scala.compat.java8.FutureConverters.toJava;


/**
* @description:
* @author: shu
* @createDate: 2022/11/28 16:10
* @version: 1.0
*/
public class JClient {
    private final ActorSystem system = ActorSystem.create("LocalSystem");
    private final ActorSelection remoteDb;

    public JClient(String remoteAddress) {
        remoteDb = system.actorSelection("akka.tcp://akkademy@" +
                                         remoteAddress + "/user/akkademy-db");
    }

    /**
* 缓存消息
* @param key
* @param value
* @return
*/
    public CompletionStage set(String key, Object value) {
        return toJava(new AskableActorSelection(remoteDb).ask(new SetRequest(key, value), Timeout.apply(5000, TimeUnit.SECONDS)));
    }

    /**
* 获取缓存消息
* @param key
* @return
*/
    public CompletionStage get(String key){
        return   toJava(new AskableActorSelection(remoteDb).ask(new GetRequest(key), Timeout.apply(5000, TimeUnit.SECONDS)));
    }

}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import client.JClient;
import org.junit.Test;
import pojo.SetRequest;

import java.util.concurrent.CompletableFuture;
public class AkkademyDbTest {

    /**
     * 测试注意需要放在不同的两个项目进行测试,不然会Caused by: java.net.BindException: Address already in use: bind
     * @throws Exception
     */
    @Test
    public void itShouldSetRecord() throws Exception {
            JClient client = new JClient("127.0.0.1:2552");
                client.set("123", 123);
                Integer result = (Integer) ((CompletableFuture) client.
                        get("123")).get();
                System.out.println("获取的结果:"+result);
                assert(result == 123);

        }


}

2.2.6 客服端配置文件

akka {
  stdout-loglevel = "DEBUG"
  loglevel = "DEBUG"
  actor {
  provider = "akka.remote.RemoteActorRefProvider"
}
remote {
  enabled-transports = ["akka.remote.netty.tcp"]
  netty.tcp {
  hostname = "127.0.0.1"
  port = 0
}
log-sent-messages = on
log-received-messages = on
}
}

2.2.7 测试

注意:需要放在两个不一样的配置项目中

  1. 服务端项目启动,等待请求的到来,注意配置文件
  2. 客服端项目启动,发送AKKA请求给服务端
  3. 服务端收到客服端的请求,缓存请求数据,把缓存结果返回给客服端

2.3 Scala 版本

2.3.1 效果

  • 启动服务前

image.png

  • 客户端启动

image.png

  • 服务端收到请求

image.png

2.2.3 服务端Actor处理

import akka.actor.{Actor, Status}
import akka.event.{Logging, LoggingAdapter}

import scala.collection.convert.ImplicitConversions.`map AsJavaMap`
import scala.collection.mutable

/**
 * @description:
 * @author: shu
 * @createDate: 2022/11/28 12:41
 * @version: 1.0
 */
class ScalaRequest extends Actor {
  protected val log: LoggingAdapter = Logging.getLogger(context.system, this)
   val map: mutable.Map[String, Object] = new mutable.HashMap[String, Object]

  override def receive = {

    case SetRequest(key, value) =>
      log.info("received SetRequest - key: {} value: {}", key, value)
      map.put(key, value)
      sender() ! Status.Success

    case GetRequest(key) =>
      log.info("received GetRequest - key: {}", key)
      val response: Option[Object] = map.get(key)

      response match{
        case Some(x) => sender() ! x
        case None => sender() ! Status.Failure(new KeyNotFoundException(key))
      }

    case o => Status.Failure(new ClassNotFoundException)
  }

}


case class SetRequest(key: String, value: Object)
case class GetRequest(key: String)
case class KeyNotFoundException(key: String) extends Exception

配置文件跟Java一样

2.3.4 客户端Actor处理

import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.duration.DurationInt
import scala.language.postfixOps

/**
 * @description:
 * @author: shu
 * @createDate: 2022/12/1 11:43
 * @version: 1.0
 */
class SClient(remoteAddress: String) {
  private implicit val timeout = Timeout(2 seconds)
  private implicit val system = ActorSystem("LocalSystem")
  private val remoteDb = system.actorSelection(
    s"akka.tcp://akkademy@$remoteAddress/user/akkademy-db")
  def set(key: String, value: Object) = {
    remoteDb ? SetRequest(key, value)
  }
  def get(key: String) = {
    remoteDb ? GetRequest(key)
  }
}



case class SetRequest(key: String, value: Object)
case class GetRequest(key: String)
case class KeyNotFoundException(key: String) extends Exception
import scala.concurrent.duration.DurationInt
import scala.concurrent.Await
import scala.language.postfixOps

/**
 * @description:
 * @author: shu
 * @createDate: 2022/12/1 11:44
 * @version: 1.0
 */

object Main extends App {
  val client = new SClient("127.0.0.1:2552")
  client.set("123", new Integer(123))
  val futureResult = client.get("123")
  val result = Await.result(futureResult, 10  seconds)

}

2.3.5 测试

注意:需要放在两个不一样的配置项目中

  1. 服务端项目启动,等待请求的到来,注意配置文件
  2. 客服端项目启动,发送AKKA请求给服务端
  3. 服务端收到客服端的请求,缓存请求数据,把缓存结果返回给客服端