Akka Cluster Sharding

Sharding hierarchy

Cluster (multiple nodes) —> each node (one ShardRegion) —> each region (multiple shards) —> each shard (multiple entities)

(Sharding diagram)

Entity: the actor managed by a shard
Shard: a group of entities that are managed as a unit
ShardRegion: deployed on each cluster node; manages the shards on that node
ShardCoordinator: a cluster singleton that decides which ShardRegion owns a given shard

Working principle

A ShardRegion is started on each node.

A message carrying an entity ID goes to the local ShardRegion —> the region asks the ShardCoordinator for the shard's location —> the coordinator decides which ShardRegion owns the shard —>

that ShardRegion confirms the request and creates a Shard supervisor as a child actor —> the Shard actor creates the entity —> ShardRegion and Shard can then locate the entity and route messages to it.

Sharding rules

All ShardRegions together form a distributed sharding management layer. A message carrying an entity ID is sent to the local ShardRegion, and the management layer routes it to the right entity. Creating a ShardRegion requires an implementation of ShardRegion.MessageExtractor, which must provide the functions that extract the shard ID and the entity ID from a message.
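For simple cases, Akka also ships a convenience base class, ShardRegion.HashCodeMessageExtractor, which derives the shard ID from a hash of the entity ID, so only entityId needs to be implemented. A minimal sketch, assuming the DogMsg type from the sample below:

class DogMsgExtractor : ShardRegion.HashCodeMessageExtractor(10) {
    // Returning null marks the message as unhandled
    override fun entityId(message: Any?): String? =
        (message as? DogMsg)?.id?.toString()
}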

Sample implementation

package shard


import akka.actor.AbstractActor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardRegion
import akka.japi.pf.ReceiveBuilder
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory
import java.io.Serializable
import java.util.Optional


/**
* Created by: tankx
* Date: 2019/7/16
* Description: Cluster sharding example
*/


// The entity actor, distributed across the cluster environment
class DogActor : AbstractActor() {

    private val log = LoggerFactory.getLogger(DogActor::class.java)

    override fun createReceive(): Receive =
        ReceiveBuilder.create().matchAny(this::receive).build()

    private fun receive(obj: Any) {
        log.info("Received message: $obj")
        if (obj is DogMsg) {
            log.info("${obj.id} ${obj.msg}")
        }
    }
}

// Define the message (it must carry an entity ID so it can be sharded)
data class DogMsg(val id: Int, val msg: String) : Serializable

// Sharding rules
class ShardExtractor : ShardRegion.MessageExtractor {

    // Number of shards; messages are routed by id % numberOfShards
    private val numberOfShards = 10

    // Extract the entity ID identifying the entity actor the message is for
    override fun entityId(message: Any?): String {
        if (message is DogMsg) {
            return message.id.toString()
        } else {
            throw RuntimeException("Unrecognized message type $message")
        }
    }

    // Compute the shard ID from the entity ID: a simple modulo over the shard count
    override fun shardId(message: Any?): String {
        if (message is DogMsg) {
            return (message.id % numberOfShards).toString()
        } else {
            throw RuntimeException("Unrecognized message type $message")
        }
    }

    // Messages can be unwrapped here; this sample passes them through unchanged
    override fun entityMessage(message: Any): Any = message
}

// The message sent to entity actors when their shard is stopped (hand-off)
object HandOffStopMessage

fun createActorSystem(port: Int): ActorSystem {
    val config = ConfigFactory.parseString(
        "akka.remote.artery.canonical.port=$port"
    ).withFallback(ConfigFactory.load())

    // The system name must be the same on every node of the cluster
    return ActorSystem.create("custerA", config)
}

fun startShardRegion(port: Int) {
    val actorSystem = createActorSystem(port)

    val settings = ClusterShardingSettings.create(actorSystem) //.withRole("ClusterShardRole")

    // Start sharding on this node: it hosts entities and takes part in routing
    val shardReg = ClusterSharding.get(actorSystem).start(
        "dogShard",
        Props.create(DogActor::class.java),
        settings,
        ShardExtractor(),
        ShardCoordinator.LeastShardAllocationStrategy(10, 1),
        HandOffStopMessage
    )

    for (i in 1..10) {
        shardReg.tell(DogMsg(i, "wang"), ActorRef.noSender())
        Thread.sleep(3000)
    }
}


fun shardRegProxy() {
    val actorSystem = createActorSystem(2663)

    // startProxy: proxy mode, i.e. this node hosts no entities itself,
    // but knows how to route messages to the right place
    ClusterSharding.get(actorSystem)
        .startProxy("dogShard", Optional.empty(), ShardExtractor())
        .let { println("shard proxy $it started.") }

    Thread.sleep(3000)

    val shardReg = ClusterSharding.get(actorSystem).shardRegion("dogShard")

    for (i in 1..100) {
        shardReg.tell(DogMsg(i, "C wang"), ActorRef.noSender())
        Thread.sleep(1500)
    }
}

Start each entry point separately (each main function lives in its own file and runs as its own process):

fun main() {
    startShardRegion(2661)
}

fun main() {
    startShardRegion(2662)
}

fun main() {
    shardRegProxy()
}
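
Since a single file cannot contain three main functions, each entry point above lives in its own file. Alternatively, a minimal sketch of one combined entry point that picks its role from a program argument (the argument convention here is an assumption, not part of the original sample):

// Hypothetical combined entry point: pass a port (2661 or 2662) to start a
// shard region, or "proxy" to start the proxy node.
fun main(args: Array<String>) {
    when (val arg = args.firstOrNull() ?: "2661") {
        "proxy" -> shardRegProxy()
        else -> startShardRegion(arg.toInt())
    }
}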

Configuration file:

akka {

  actor {
    provider = "cluster"
  }

  # For the sample, just bind to loopback and do not allow access from the network
  # the port is overridden by the logic in the main class
  remote.artery {
    enabled = on
    transport = tcp
    canonical.port = 0
    canonical.hostname = 127.0.0.1
  }

  cluster {
    seed-nodes = [
      "akka://[email protected]:2661",
      "akka://[email protected]:2662"]

    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    auto-down-unreachable-after = 10s
  }
}

Note

All the nodes in the example above must belong to the same cluster, or messages cannot be delivered, so make sure the name passed to ActorSystem.create(name, config) is the same everywhere! (I spent a long time wondering why messages were not going through, and this turned out to be the cause.)

Use the same ActorSystem name throughout the cluster!

Otherwise this exception is reported:

No coordinator found to register. Probably, no seed-nodes configured and manual cluster join not performed
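
If you hit this, a quick way to check membership is to log the cluster state a node sees shortly after startup. A minimal sketch using the standard akka.cluster.Cluster extension (the helper function name is my own):

import akka.cluster.Cluster

// With mismatched ActorSystem names the seed nodes never join, so the
// member list stays empty (or contains only this node).
fun logClusterState(system: ActorSystem) {
    val state = Cluster.get(system).state()
    println("Cluster members seen by ${system.name()}: ${state.members}")
}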

