Akka Cluster Sharding
Sharding hierarchy
Cluster (multiple nodes) -> each node (one ShardRegion) -> each ShardRegion (multiple shards) -> each shard (multiple entities)
Entity: an actor managed by a shard
Shard: a group of entities that is managed as a unit
ShardRegion: deployed on each cluster node to manage the shards hosted on that node
ShardCoordinator: a cluster singleton that decides which ShardRegion each shard belongs to
How it works
A ShardRegion is started on each node.
A message with an entity ID arrives at the ShardRegion -> the ShardRegion asks the ShardCoordinator for the location of the shard -> the ShardCoordinator decides which ShardRegion will own the shard ->
that ShardRegion confirms the request and creates a Shard supervisor as a child actor -> the Shard actor creates the entity -> the ShardRegion and the Shard can now locate the entity
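The lookup flow above can be sketched with a toy model in plain Kotlin. This is only an illustration under simplifying assumptions: ToyCoordinator and ToyRegion are hypothetical names and not Akka classes, the "least shards first" choice stands in for whatever allocation strategy is configured, and the region caching a shard location after the first lookup reflects how a ShardRegion avoids asking the coordinator again for a shard it already knows.

```kotlin
// Toy model of the lookup flow; none of these classes are Akka APIs.
class ToyCoordinator(private val regions: List<String>) {
    // shard ID -> name of the region that owns it
    private val shardHome = mutableMapOf<String, String>()

    // Returns the owning region, allocating the shard to the region
    // that currently holds the fewest shards on the first request.
    fun locate(shardId: String): String =
        shardHome.getOrPut(shardId) {
            regions.minByOrNull { region -> shardHome.values.count { it == region } }
                ?: error("no regions registered")
        }
}

class ToyRegion(private val coordinator: ToyCoordinator) {
    // Shard locations learned from the coordinator, cached so that
    // later messages for the same shard skip the lookup.
    private val cache = mutableMapOf<String, String>()

    // Number of times this region had to ask the coordinator.
    var coordinatorLookups = 0
        private set

    fun route(shardId: String): String =
        cache.getOrPut(shardId) {
            coordinatorLookups++
            coordinator.locate(shardId)
        }
}
```

Routing shard "1" and then shard "2" through a ToyRegion backed by two regions places them on different regions, and routing shard "1" again is answered from the cache without another coordinator lookup.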
Sharding rules
All ShardRegions together form a distributed sharding layer. A message carrying an entity ID is sent to the local ShardRegion, and the sharding layer routes it to the right entity. Starting a ShardRegion requires an implementation of ShardRegion.MessageExtractor, which must provide functions that extract the shard ID and the entity ID from a message.
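One common way to derive the shard ID is to hash the entity ID and take the remainder by a fixed shard count. The sketch below shows only that calculation in plain Kotlin; NUMBER_OF_SHARDS and shardIdFor are hypothetical names chosen for illustration, and the value 10 is an assumption, not a recommendation.

```kotlin
// Hypothetical shard count for this sketch. It has to be the same on
// every node, because each node must compute the same shard ID for an entity.
const val NUMBER_OF_SHARDS = 10

// Maps an entity ID to a shard ID by hashing and taking the remainder.
// Math.floorMod keeps the result in 0 until NUMBER_OF_SHARDS even when
// the hash code is negative.
fun shardIdFor(entityId: String): String =
    Math.floorMod(entityId.hashCode(), NUMBER_OF_SHARDS).toString()
```

A shardId implementation in a MessageExtractor could delegate to a function like this, so that many entities share a bounded number of shards instead of each entity forming a shard of its own.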
Sample implementation
package shard

import akka.actor.AbstractActor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardRegion
import akka.japi.pf.ReceiveBuilder
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory
import java.io.Serializable
import java.util.Optional

/**
 * Created by: tankx
 * Date: 2019/7/16
 * Description: Cluster sharding example
 */
// Entity actor; instances are distributed across the cluster by sharding
class DogActor : AbstractActor() {
    private val log = LoggerFactory.getLogger(DogActor::class.java)

    override fun createReceive(): Receive {
        return ReceiveBuilder.create().matchAny(this::receive).build()
    }

    fun receive(obj: Any) {
        log.info("Received message: $obj")
        when (obj) {
            is DogMsg -> log.info("${obj.id} ${obj.msg}")
            // Stop this entity when its shard is stopped or handed off to another region
            handOffStopMessage -> context.stop(self)
        }
    }
}

// Message definition (it must carry an entity ID so that it can be sharded)
data class DogMsg(val id: Int, val msg: String) : Serializable

// Sharding rules
class ShardExtractor : ShardRegion.MessageExtractor {
    // Extract the entity ID, which identifies the actor that handles the message
    override fun entityId(message: Any?): String {
        if (message is DogMsg) {
            return message.id.toString()
        } else {
            throw RuntimeException("Unrecognized message type $message")
        }
    }

    // Compute the ID of the shard that the entity belongs to
    override fun shardId(message: Any?): String {
        if (message is DogMsg) {
            // Each entity gets its own shard here. A simple alternative is modulo
            // partitioning with a fixed shard count: (message.id % 10).toString()
            return message.id.toString()
        } else {
            throw RuntimeException("Unrecognized message type $message")
        }
    }

    // Return the payload delivered to the entity; a wrapper message could be unwrapped here
    override fun entityMessage(message: Any): Any {
        return message
    }
}

// Message sent to the entities when their shard is stopped or handed off
object handOffStopMessage

fun createActorSystem(port: Int): ActorSystem {
    // Override only the port; all other settings come from the configuration file
    val config = ConfigFactory.parseString(
        "akka.remote.artery.canonical.port=$port"
    ).withFallback(
        ConfigFactory.load()
    )
    // The system name must be the same on every node of the cluster
    return ActorSystem.create("custerA", config)
}

fun startShardRegion(port: Int) {
    val actorSystem = createActorSystem(port)
    val settings = ClusterShardingSettings.create(actorSystem) // .withRole("ClusterShardRole")
    // Start a region that hosts DogActor entities on this node
    val shardReg = ClusterSharding.get(actorSystem).start(
        "dogShard",
        Props.create(DogActor::class.java),
        settings,
        ShardExtractor(),
        ShardCoordinator.LeastShardAllocationStrategy(10, 1),
        handOffStopMessage
    )
    for (i in 1..10) {
        shardReg.tell(DogMsg(i, " wang"), ActorRef.noSender())
        Thread.sleep(3000)
    }
}

fun shardRegProxy() {
    val actorSystem = createActorSystem(2663)
    // Proxy mode: this node hosts no entities itself, but knows how to forward messages to the right place
    ClusterSharding.get(actorSystem)
        .startProxy("dogShard", Optional.empty(), ShardExtractor())
        .let { println(" shard proxy $it started.") }
    Thread.sleep(3000)
    val shardReg = ClusterSharding.get(actorSystem).shardRegion("dogShard")
    for (i in 1..100) {
        shardReg.tell(DogMsg(i, "C wang"), ActorRef.noSender())
        Thread.sleep(1500)
    }
}
Start each entry point separately (one main function per process):
// Node 1
fun main() {
    startShardRegion(2661)
}
// Node 2
fun main() {
    startShardRegion(2662)
}
// Proxy node
fun main() {
    shardRegProxy()
}
Configuration file:
akka {
  actor {
    provider = "cluster"
  }
  # For the sample, just bind to loopback and do not allow access from the network
  # the port is overridden by the logic in main class
  remote.artery {
    enabled = on
    transport = tcp
    canonical.port = 0
    canonical.hostname = 127.0.0.1
  }
  cluster {
    seed-nodes = [
      "akka://custerA@127.0.0.1:2661",
      "akka://custerA@127.0.0.1:2662"]
    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    auto-down-unreachable-after = 10s
  }
}
Note
All nodes in the examples above must belong to the same cluster for messages to be delivered, so the name passed to ActorSystem.create(name, config) must be identical on every node and must match the system name in the seed-node addresses. (I spent a long time debugging why messages were never delivered, and this turned out to be the cause.)
In short: use the same ActorSystem name throughout a cluster!
Otherwise, the following message is reported:
No coordinator found to register. Probably, no seed-nodes configured and manual cluster join not performed
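A quick sanity check can catch this kind of name mismatch before the nodes are started. The sketch below is plain Kotlin with no Akka dependency; systemNameOf and seedNodesMatch are hypothetical helper names, and it assumes seed-node addresses of the form akka://<system>@<host>:<port>.

```kotlin
// Extracts the actor system name from a seed-node address of the form
// akka://<system>@<host>:<port>. Returns null if the address has another shape.
fun systemNameOf(seedNode: String): String? =
    Regex("""^akka://([^@/]+)@""").find(seedNode)?.groupValues?.get(1)

// True only if every seed node names the same actor system as the local one.
fun seedNodesMatch(localSystemName: String, seedNodes: List<String>): Boolean =
    seedNodes.all { systemNameOf(it) == localSystemName }
```

Calling seedNodesMatch with the name passed to ActorSystem.create and the configured seed-node list returns false as soon as one address names a different system.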