精选文章 akka 集群分片

akka 集群分片

作者:weixin_30888027 时间: 2019-07-17 11:08:00
weixin_30888027 2019-07-17 11:08:00


akka 集群 Sharding分片

分片上下级结构

集群(多台节点机) —> 每台节点机(1个片区) —> 每个片区(多个分片) —> 每个分片(多个实体)

akka 集群分片1

 

实体: 分片管理的 Actor
Shards :分片是统一管理的一组实体
ShardRegion : 片区,部署在每个集群节点上,对分片进行管理
ShardCoordinator : cluster-singleton 集群单例, 决定分片属于哪个片区


工作原理

ShardRegion 在节点上启动

带实体ID的消息--> 片区ShardRegion ,请求分片位置-->ShardCoordinator-->决定哪个ShardRegion将拥有Shard-->

ShardRegion 确认请求并创建 Shard supervisor 做为子actor -->shard actor 创建 entity -->ShardRegion和Shard 定位entity

 

分区的分片规则

所有片区组成分布式分片管理层,带实体ID的消息直接发给本机片区,分片管理层路由消息, ShardRegion创建需要提供基于ShardRegion.MessageExtractor的实现 ,必须提供从消息抽取分片和实体ID的函数

 

示例实现

package shard

import akka.actor.AbstractActor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardRegion
import akka.japi.Option
import akka.japi.pf.ReceiveBuilder
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory
import java.io.Serializable
import java.time.Clock.system
import java.time.Clock.system
import java.util.*
import java.time.Clock.system


/**
 * Created by: tankx
 * Date: 2019/7/16
 * Description: 集群分片示例
 */


//分布到集群环境中
class DogActor : AbstractActor() {

    var log = LoggerFactory.getLogger(DogActor::class.java)

    override fun createReceive(): Receive {
        return ReceiveBuilder.create().matchAny(this::receive).build()
    }

    fun receive(obj: Any) {

        log.info("收到消息: $obj")
        if (obj is DogMsg) {
            log.info("${obj.id} ${obj.msg}")
        }

    }

}

//定义消息(必须带有实体ID进行分片)
data class DogMsg(var id: Int, var msg: String) : Serializable

//分片规则
class ShardExtractor : ShardRegion.MessageExtractor {
    //提取实体ID,实体对应的actor
    override fun entityId(message: Any?): String {
        if (message is DogMsg) {
            return message.id.toString()
        } else {
            throw RuntimeException("无法识别消息类型 $message")
        }
    }

    //根据实体ID,计算出对应分片ID
    override fun shardId(message: Any?): String {
        //var numberOfShards: Int = 10 //简单的分区数取模 return message.id%numberOfShards
        if (message is DogMsg) {
            //return (message.id % 10).toString()
            return message.id.toString()
        } else {
            throw RuntimeException("无法识别消息类型 $message")
        }
    }

    //对消息可进行拆封操作
    override fun entityMessage(message: Any): Any {
        return message
    }

}
//分区停止时会派发的消息类型
object handOffStopMessage

fun createActorSystem(port: Int): ActorSystem {
    val config = ConfigFactory.parseString(
        "akka.remote.artery.canonical.port=$port"
    ).withFallback(
        ConfigFactory.load()
    )

    var actorSystem = ActorSystem.create("custerA", config);

    return actorSystem

}

fun startShardRegion(port: Int) {


    var actorSystem = createActorSystem(port)

    val settings = ClusterShardingSettings.create(actorSystem)//.withRole("ClusterShardRole")

    val shardReg = ClusterSharding.get(actorSystem).start(
        "dogShard",
        Props.create(DogActor::class.java),
        settings,
        ShardExtractor(),
        ShardCoordinator.LeastShardAllocationStrategy(10, 1),
        handOffStopMessage
    )

    for (i in 1..10) {

        shardReg.tell(DogMsg(i, " wang"), ActorRef.noSender())

        Thread.sleep(3000)
    }

}


fun shardRegProxy() {

    var actorSystem = createActorSystem(2663)

    //startProxy 代理模式,即它不会承载任何实体本身,但知道如何将消息委托到正确的位置
    ClusterSharding.get(actorSystem)
        .startProxy("dogShard", Optional.empty(), ShardExtractor())
        .let { println("  shard proxy $it started.") }

    Thread.sleep(3000)

    var shardReg = ClusterSharding.get(actorSystem).shardRegion("dogShard")


    for (i in 1..100) {

        shardReg.tell(DogMsg(i, "C wang"), ActorRef.noSender())

        Thread.sleep(1500)
    }
}

 

再分别启动入口

fun main() {
    startShardRegion(2661)
}
fun main() {
    startShardRegion(2662)
}
fun main() {

    shardRegProxy()

}

 

配置文件:

akka {
  actor {
    provider = "cluster"
  }

  # For the sample, just bind to loopback and do not allow access from the network
  # the port is overridden by the logic in main class
  remote.artery {
    enabled = on
    transport = tcp
    canonical.port = 0
    canonical.hostname = 127.0.0.1
  }

  cluster {
    seed-nodes = [
      "akka://custerA@127.0.0.1:2661",
      "akka://custerA@127.0.0.1:2662"]

    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    auto-down-unreachable-after = 10s
  }
}

 

注意

以上示例需在同属一个集群,消息才能正常中转,所以一定要保障 ActorSystem.create(name,config) name 为一致!(调了半天消息一直没有发送成功,原来是这里的问题,KAO!)

同一个集群同一个ActorSystem name!

不然会报异常:

No coordinator found to register. Probably, no seed-nodes configured and manual cluster join not performed

 

转载于:https://www.cnblogs.com/tankaixiong/p/11204329.html

勿删,copyright占位
分享文章到微博
分享文章到朋友圈

您可能感兴趣

  • Redis是用C语言开发的一个开源的高性能键值对(key-value)数据库

    字符串类型--string redis的字符串是二进制安全的,什么是二进制安全?简单理解就是存入什么数据取出的还是什么数据。 商品编号、订单号采用string的递增数字特性生成。 散列类型--hash hash叫散列类型,它提供了字段和字段值的映射。字段值只能是字符串类型,不支持散列类型、集合类型等其它类型。 商品信息(应用) 商品id、商品名称、商品描述、商品库存、商品好评 列表类型--l...

  • 对云桌面、桌面云、私有云的一些看法

    最近整理了一下自2017年以来自己对私有云服务平台构建的一些认识,并以刚刚为某后勤保障部队单位提供的小规模私有云服务平台为例阐述。 私有云平台、云计算服务与虚拟化: 我个人认为这三个名称分别指涉了三个不同的事物: 私有云平台:在实践中,私有云平台时包含了服务端软硬件集成和客户端软硬件集成的一整套特定业务需求场景的、已经处于落地实施阶段的解决方案项目。 比如某后勤保障部队为简化征兵流程、节省相...

  • 210道面试官必问经典面试题总结分享|Java+JVM+数据库+算法+Spring+中间件+设计模式超详细整理

    “金九银十”的秋招热潮已经来了,由于疫情影响、今年的秋招来的格外早。小编从今天六月份开始备战,不负众望的成功拿下了一些大厂的offer。经过这么多次的面试,凭借自己的面试成功经验和这两个月的准备,小编整理了一份面试清单分享给大家,希望能给和我一样为了秋招奋战多时的码农亲人们一点帮助(java方向),觉得有帮助的同学可以转发点个赞哦~~祝大家面试顺利,一举拿下大厂offer! 一、JAVA基础...

  • PHP高并发和大流量怎么解决?(life)

    PHP高并发和大流量的解决方案 一 高并发的概念 在互联网时代,并发,高并发通常是指并发访问。也就是在某个时间点,有多少个访问同时到来。 我的官方群点击此处。 二 高并发架构相关概念 1、QPS (每秒查询率) : 每秒钟请求或者查询的数量,在互联网领域,指每秒响应请求数(指HTTP请求) 2、PV(Page View):综合浏览量,即页面浏览量或者点击量,一个访客在24小时内访问的页面数量...

  • 抢滩直播基地:有人黯然退场,有人闪亮进场,还有人只当跳板

    图片来源于网络 文|陈小江 来源 | 螳螂财经(ID:TanglangFin) 一支穿云箭,千军万马来相见;两副忠义胆,刀山火海提命现。 当年因星爷《功夫》爆红网络的经典台词,正成为如今火爆天际的直播经济的真实写照。 受疫情影响,众多城市和平台纷纷加码直播电商,各路英雄齐聚直播带货——明星、企业家、政府官员、学者、网红和草根接连登场,全民直播迅速燎原。正是众人拾柴,将直播经济这把火越烧越旺。...

  • 为什么更多人选择FastReport .NET?FastReport开源代码不香吗?两者有何区别?

    报表生成器FastReport .NET是适用于.NET Core 3,ASP.NET,MVC和Windows窗体的全功能报告库。使用FastReport .NET,您可以创建独立于应用程序的.NET报告。 开源在我们这个时代非常流行。软件市场的全球巨头对其普及起到了相当大的作用。毕竟,这些公司正在开发高端商业软件,而它们的开源项目是基于经过验证的解决方案和最佳实践的。 FastReport...

  • 我用 Redis 干掉了一摞简历

    如果你是一位后端工程师,面试时八成会被问到 Redis,特别是那些大型互联网公司,不仅要求面试者能简单使用 Redis,还要深入理解其底层实现原理,具备解决常见问题的能力。可以说,熟练使用 Redis 就是后端工程师的必备技能。 但我发现,在工作或面试时,大家还是会有这样那样的疑问,比如:如何用 Redis 实现分布式锁?Redis 怎样处理过期键?缓存雪崩、穿透、热点问题怎么解决?持久化、...

  • 【Python基础】如何编写简洁美观的Python代码

    作者 | ANIRUDDHA BHANDARI 编译 | VK 来源 | Analytics Vidhya 概述 Python风格教程将使你能够编写整洁漂亮的Python代码 在这个风格教程中学习不同的Python约定和Python编程的其他细微差别 介绍 你有没有遇到过一段写得很糟糕的Python代码?我知道你们很多人都会点头的。 编写代码是数据科学家或分析师角色的一部分。另一方面,编写漂...

华为云40多款云服务产品0元试用活动

免费套餐,马上领取!
CSDN

CSDN

中国开发者社区CSDN (Chinese Software Developer Network) 创立于1999年,致力为中国开发者提供知识传播、在线学习、职业发展等全生命周期服务。