精选文章 akka 集群分片

akka 集群分片

作者:weixin_30888027 时间: 2021-02-05 10:08:34
weixin_30888027 2021-02-05 10:08:34
【摘要】akka 集群 Sharding分片 
  分片上下级结构 
  集群(多台节点机) —> 每台节点机(1个片区) —> 每个片区(多个分片) —> 每个分片(多个实体) 
  
   
 实体: 分片管理的 ActorShards :分片是统一管理的一组实体ShardRegion : 片区,部署在每个集群节点上,对分片进行管理ShardCoordinator : cluster-singl...


akka 集群 Sharding分片

分片上下级结构

集群(多台节点机) —> 每台节点机(1个片区) —> 每个片区(多个分片) —> 每个分片(多个实体)

akka 集群分片1

 

实体: 分片管理的 Actor
Shards :分片是统一管理的一组实体
ShardRegion : 片区,部署在每个集群节点上,对分片进行管理
ShardCoordinator : cluster-singleton 集群单例, 决定分片属于哪个片区


工作原理

ShardRegion 在节点上启动

带实体ID的消息--> 片区ShardRegion ,请求分片位置-->ShardCoordinator-->决定哪个ShardRegion将拥有Shard-->

ShardRegion 确认请求并创建 Shard supervisor 做为子actor -->shard actor 创建 entity -->ShardRegion和Shard 定位entity

 

分区的分片规则

所有片区组成分布式分片管理层,带实体ID的消息直接发给本机片区,分片管理层路由消息, ShardRegion创建需要提供基于ShardRegion.MessageExtractor的实现 ,必须提供从消息抽取分片和实体ID的函数

 

示例实现

package shard

import akka.actor.AbstractActor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardRegion
import akka.japi.Option
import akka.japi.pf.ReceiveBuilder
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory
import java.io.Serializable
import java.time.Clock.system
import java.time.Clock.system
import java.util.*
import java.time.Clock.system


/**
 * Created by: tankx
 * Date: 2019/7/16
 * Description: 集群分片示例
 */


//分布到集群环境中
class DogActor : AbstractActor() { var log = LoggerFactory.getLogger(DogActor::class.java)
 override fun createReceive(): Receive { return ReceiveBuilder.create().matchAny(this::receive).build() } fun receive(obj: Any) { log.info("收到消息: $obj") if (obj is DogMsg) { log.info("${obj.id} ${obj.msg}") } }

}

//定义消息(必须带有实体ID进行分片)
data class DogMsg(var id: Int, var msg: String) : Serializable

//分片规则
class ShardExtractor : ShardRegion.MessageExtractor { //提取实体ID,实体对应的actor override fun entityId(message: Any?): String { if (message is DogMsg) { return message.id.toString() } else { throw RuntimeException("无法识别消息类型 $message") } } //根据实体ID,计算出对应分片ID override fun shardId(message: Any?): String { //var numberOfShards: Int = 10 //简单的分区数取模 return message.id%numberOfShards if (message is DogMsg) { //return (message.id % 10).toString() return message.id.toString() } else { throw RuntimeException("无法识别消息类型 $message") } } //对消息可进行拆封操作
 override fun entityMessage(message: Any): Any { return message }

}
//分区停止时会派发的消息类型
object handOffStopMessage

fun createActorSystem(port: Int): ActorSystem { val config = ConfigFactory.parseString( "akka.remote.artery.canonical.port=$port" ).withFallback( ConfigFactory.load() ) var actorSystem = ActorSystem.create("custerA", config); return actorSystem

}

fun startShardRegion(port: Int) { var actorSystem = createActorSystem(port) val settings = ClusterShardingSettings.create(actorSystem)//.withRole("ClusterShardRole")
 val shardReg = ClusterSharding.get(actorSystem).start( "dogShard", Props.create(DogActor::class.java), settings, ShardExtractor(), ShardCoordinator.LeastShardAllocationStrategy(10, 1), handOffStopMessage ) for (i in 1..10) { shardReg.tell(DogMsg(i, " wang"), ActorRef.noSender()) Thread.sleep(3000) }

}


fun shardRegProxy() { var actorSystem = createActorSystem(2663) //startProxy 代理模式,即它不会承载任何实体本身,但知道如何将消息委托到正确的位置
 ClusterSharding.get(actorSystem) .startProxy("dogShard", Optional.empty(), ShardExtractor()) .let { println("  shard proxy $it started.") } Thread.sleep(3000) var shardReg = ClusterSharding.get(actorSystem).shardRegion("dogShard") for (i in 1..100) { shardReg.tell(DogMsg(i, "C wang"), ActorRef.noSender()) Thread.sleep(1500) }
}

 

再分别启动入口

fun main() { startShardRegion(2661)
}
fun main() { startShardRegion(2662)
}
fun main() { shardRegProxy()

}

 

配置文件:

akka {
  actor { provider = "cluster"
  } # For the sample, just bind to loopback and do not allow access from the network
  # the port is overridden by the logic in main class
  remote.artery { enabled = on transport = tcp canonical.port = 0 canonical.hostname = 127.0.0.1
  } cluster { seed-nodes = [ "akka://custerA@127.0.0.1:2661", "akka://custerA@127.0.0.1:2662"] # auto downing is NOT safe for production deployments. # you may want to use it during development, read more about it in the docs. auto-down-unreachable-after = 10s
  }
}

 

注意

以上示例需在同属一个集群,消息才能正常中转,所以一定要保障 ActorSystem.create(name,config) name 为一致!(调了半天消息一直没有发送成功,原来是这里的问题,KAO!)

同一个集群同一个ActorSystem name!

不然会报异常:

No coordinator found to register. Probably, no seed-nodes configured and manual cluster join not performed

 

转载于:https://www.cnblogs.com/tankaixiong/p/11204329.html

勿删,copyright占位
分享文章到微博
分享文章到朋友圈

上一篇:Vue-cli+axios: 跨域

下一篇:docker swarm basic usage

您可能感兴趣

  • WEB+windows集群

    建立windows server 2003群集后,就可配置Windows Server 2003 IIS的群集服务器。  1.分别依次启动DC、Node1、Node2,在Node1和Node2上分别安装好的Internet 信息服务(IIS),在Node1上打开群集管理器,确认群集当前所有者是Node1。 如果不是请先通过“移动组”将当前所有者改为Node1。   2.然后在共享磁盘E上创建...

  • 单实例/多实例(N+1)(N+M)集群

    独特的集群配置   SQL Server集群具有如下4种独特的配置:   · 单实例——正式名称为活动/被动式——意思是,客户应用程序作为一个单个的虚拟服务器,这个节点是在线的,而另外一个则是处于备用的状态。在线的节点(叫做主节点)拥有所有的资源,而备用的节点(叫做次节点)是离线的,不拥有任何资源。   · 多实例——正式名称为活动/活动式——意思是,多个节点上的多个虚拟服务器可以同时在线,...

  • IP分片抓包实验!

    我们都知道,在以太网中,如果源主机向目标主机发送的数据包大于网关MTU,则该数据包在传输过程中会被IP协议分片传输,具体的分片过程是怎样的呢?我们通过协议分析软件抓包来进行详细的查看(抓包软件使用科来网络分析系统)。 因为以太网默认的MTU值为1500Byte,所以,为了达到分片的效果,我们应该传输大于1500Byte字节的数据包,才能使该数据包分段传输。 我们从本机(192.168.6.1...

  • 智能DNS解析+JBOSS集群(一)

    智能DNS+JBOSS集群 最近公司准备新上一个系统,领导要求自己做智能DNS服务器进行域名智能解析以解决南北互联的问题,同时还要考虑大并发,以下是前期规划的网络拓扑图(操作系统为Centos 5.2): 一、        智能DNS设置 1、安装openssl tar -zxvf openssl-0.9.8d.tar.gz cd openssl-0.9.8d ./config --p...

  • RHCM 使用gnbd仲裁磁盘 建立HA高可用性集群 【实验补充】

    【 建立基于仲裁的cluster 】实验环境:192.168.0.10 node10.zhou.com gnbd192.168.0.11 node11.zhou.com apache1192.168.0.12 node12.zhou.com apache21、创建gnbd共享存储。192.168.0.10: gnbd_serv -n[root@node10 /]# gnbd_expor...

  • 解决HIS集群系统的性能问题一例

    摘要   我院在2007年9月21日成功将HIS系统从win2000,oracle9.2.01升级到两台IBM P55A (操作系统AIX),oracle 10.2.01 RAC组成的并行集群系统,随着一个新大楼的启用,客户端的电脑从550台增加到了800多台,集群系统出现了严重的性能问题,在业务高峰期经常死机,经过半个月左右的调试,终于彻底解决了性能问题,满足了医院医院业务发展的要求。 ...

  • Oracle 11g Release 2 RAC集群系统在Solaris10_x86_64和Openfiler网络共享存储上的安装 (第1部分)...

    第1部分:总体介绍[@more@] Oracle 11g Release 2推出已经有一段时间了,在Oracle对Sun公司并购后又重新推出了for Solaris10_x86_64的版本,是针对普通x86平台的64位Solaris10的版本。在11g的第一版中取消了Solaris_x86的版本,所以之前主要是在Solaris中安装10g RAC系统或在RedHat Enterprise ...

  • Oracle 11g Release 2 RAC集群系统在Solaris10_x86_64和Openfiler网络共享存储上的安装 (第2部分)...

    第2部分:安装Solaris10操作系统[@more@] 一、安装Solaris10操作系统,在两个RAC节点的主机上安装,并在安装完毕后作相关的系统设置 1.安装Solaris10操作系统,安装的具体过程从略,只把要注意的几点一一指出 (1)主机名和IP地址的分配,在安装过程中要把2块网卡都选中,分别设置主机名和IP地址 每台主机都有2块网卡,把第一块作为公网使用的网卡,把第二块...

CSDN

CSDN

中国开发者社区CSDN (Chinese Software Developer Network) 创立于1999年,致力为中国开发者提供知识传播、在线学习、职业发展等全生命周期服务。

华为云40多款云服务产品0元试用活动

免费套餐,马上领取!
akka 集群分片介绍:华为云为您免费提供akka 集群分片在博客、论坛、帮助中心等栏目的相关文章,同时还可以通过 站内搜索 查询更多akka 集群分片的相关内容。| 移动地址: akka 集群分片 | 写博客