This article continues immediately above.
Scheme Two
Redis-based message receipt. The main process is divided into the following steps:
1) Temporarily store the message in redis, and set the expiration time of the message
2) The client acknowledges the message id to eliminate the temporary storage
3) Open a separate thread forum for the message in step 1). Resend the message according to the time of the message. If the message is stored for the first time and has a heavy rain validity period (customized 10 seconds), parse the to in the message to find whether the user is still online. If it is, it will be dropped (because it ignores important commands of the service for a long time), if it is not online, the message will be placed in the offline table.
OK, let’s take a look at the message storage format first.
1.MESSAGE message user collection
SADD SOGU:[username] [VALUE(messageID)] [VALUE(messageID)] …
2. Read message device collection
SADD RT:[terminalid] [VALUE(messageID)] [VALUE(messageID)] …
3. Message content
HMSET OGM:[messageID] CREATIONDATE [VALUE] UPDATEDATE [VALUE] STANZA [VALUE]
4. User and device association
SADD URT:[USERNAME] [VALUE(terminalid) ] …….
(First find the time according to the message id, then find the stanza after sorting in java)
MESSAGE
—Offline table
ZADD OFOFFLINE:[username] [INDEX(time stamp)] [VALUE(messageID)], [VALUE], [VALUE]… [VALUE]
HMSET OFOFFLINE:[messageID] STANZA[VALUE]
CREATIONDATE [VALUE] MESSAGESIZ[VALUE]
Store the message temporarily:
public void storeMessage(String username, Packet packet) {Jedis jedis = XMPPServer.getInstance(). getGroupRedisManager().getJedis( ); String packetID = ""; if (packet instanceof Message) packetID = ((Message)packet).getID(); else if (packet instanceof IQ) packetID = ((IQ)packet).getID(); else return; try {jedis.sadd("SOGU:" + username, packetID); Maphash = new HashMap (); hash.put("STANZA", packet.toXML()); hash .put("CREATIONDATE", StringUtils.dateToMillis(new Date())); jedis.hmset("OGM:" + packetID, hash);} finally {XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis); } htp.execute(addMessagesToDB(packet));} private Runnable addMessagesToDB(final Packet packet) {return new Runnable() {@Override public void run() {MyDBopt.insertMessage(packet);}
The client receives the message and acknowledges the operation of the server
private void handle(IQ packet) {JID recipientJID = packet.getTo() ; if (IQ.Type.crs != packet.getType()) { // Check if the packet was sent to the server hostname if (recipientJID != null && recipientJID.getNode() == null && recipientJID.getResource() == null && serverName.equals(recipientJID.getDomain())) {Element childElement = packet.getChildElement(); if (childElement != null && childElement.element("addresses") != null) {// to route this packet multicastRouter.route(packet); return;}}} if (IQ. Type.crs == packet.getType()) {String username = packet.getFrom().getNode(); String terminal = packet.getFrom().getTerminal(); String msgId = packet.getID(); if (username == null || msgId == null || "".equals(msgId)) {return;} if (terminal == null) {terminal = username + "_" + System.currentTimeMillis()%1000000;} Jedis jedis = XMPPServer.getInstance().getGroupRe disManager().getJedis(); try {jedis.sadd("URT:" + username, terminal); jedis.sadd("RT:" + terminal, packet.getID());} finally {XMPPServer.getInstance() .getGroupRedisManager().returnRes(jedis);} threadPool.execute(createTask(msgId, username, terminal)); return;} if (packet.getID() != null && (IQ.Type.result == packet.getType () || IQ.Type.error == packet.getType())) {// The server got an answer to an IQ packet that was sent from the server IQResultListener iqResultListener = resultListeners.remove(packet.getID()); if (iqResultListener != null) {resultTimeout.remove(packet.getID()); if (iqResultListener != null) {try {iqResultListener.receivedAnswer(packet);} catch (Exception e) {Log.error( "Error process ing answer of remote entity. Answer: "+ packet.toXML(), e);} return;}}} try {// Check for registered components, services or remote servers if (recipientJID != null && (routingTable.hasComponentRoute( recipientJID) || routingTable.hasServerRoute(recipientJID))) {// A component/service/remote server was found that can handle the Packet routingTable.routePacket(recipientJID, packet, false); return;} if (isLocalServer(recipientJID)) {// Let the server handle the Packet Element childElement = packet.getChildElement(); String namespace = null; if (childElement != null) {namespace = childElement.getNamespaceURI();} if (namespace == null) { if (packet.getType() != IQ.Type.result && packet.getType() != IQ.Type.error) {// Do nothing. We can't handle queries outside of a valid namespace Log.warn(" Unknown packet "+ packet.toXML());}} else {// Check if communication to local users is allowed if (recipientJID != null && userManager.isRegisteredUser(recipientJID.getNode())) {PrivacyList list = PrivacyListManager.getInstance ().getDefaultPrivacyList(recipientJID.getNode()); if (list != null && list.shouldBlockPacket(packet)) {// Communication is blocked if (IQ.Type.set == packet.getType() || IQ. Type.get == packet.getType()) {// Answer that the service is unavailable sendErrorPacket(packet, PacketEr ror.Condition.service_unavailable);} return;}} IQHandler handler = getHandler(namespace); if (handler == null) {if (recipientJID == null) {// Answer an error since the server can't handle the requested namespace sendErrorPacket(packet, PacketError.Condition.service_unavailable);} else if (recipientJID.getNode() == null || "".equals(recipientJID.getNode())) {// Answer an error if JID is of the formsendErrorPacket(packet, PacketError.Condition.feature_not_implemented);} else {// JID is of the form // Answer an error since the server can't handle packets sent to a node sendErrorPacket(packet, PacketError.Condition.service_unavailable);}} else {handler.process(packet);}}} else {// JID is of the form or belongs to a remote server // or to an uninstalled component routingTable.routePacket(recipientJID, packet, false);}} catch (Exception e) {......} }
Offline Messages
The optimization of offline messages.
XMPP can also be expanded. For example
The client can communicate like this when getting offline messages.
1) First ask the server for the basic status of my total offline messages (how big and how many)
< /pre> 2) The server returns
1024b> 128> 1001 ,1002...> 3) The client sends commands to get in batches, and send me 10 commands at a time until they are finished.
10> 4) The server starts to send messages
...... ...... .....5) Tell the client that I am Send out
6) Client local verification, receipt of the received message
here I just made a simple idea. If you need more precision, you might as well think carefully about message processing and format.
Offline message storage.
Store the message in redis:
public void addMessageToRedis(Message message) {if (message == null) {return;} JID recipient = message.getTo(); String username = recipient.getNode(); // If the username is null (such as when an anonymous user), don't store. if (username == null || !UserManager. getInstance().isRegisteredUser(recipient)) {return;} else if (!XMPPServer.getInstance().getServerInfo().getXMPPDomain().equals(recipient.getDomain())) {// Do not store messages sent to users of remote servers return;} String msgXML = message.getElement().asXML(); Jedis jedis = XMPPServer.getInstance().getChatMessageJedisPoolManager().getJedis(); try {String newDate = StringUtils.dateToMillis(new java.util. Date()); String id = MessageIdTactics.mid(username); jedis.zadd("OFOFFLINE:" + username, Long.valueOf(new Date), id Maphash = new HashMap (); hash.put("STANZA", msgXML); hash.put("MESSAGESIZ", String.valueOf(msgXML.length() )); hash.put("CREATIONDATE", newDate); jedis.hmset("OFOFFLINE:" + id, hash);} finally {XMPPServer.getInstance().getChatMessageJedisPoolManager().returnRes(jedis);} if (sizeCache .containsKey(username)) {int size = sizeCache.get(username); size += msgXML.length(); sizeCache.put(username, size);} htp.execute(addMessageToDB(message)); } This is the end of Redis optimization. The main thing to do is:
First: To store users or MUC, Group, etc., the life cycle of message storage needs to be set. When the user is not active or not logged in for a long time. To be raised from redis. Avoid wasting resources. When the user reloads, he will be placed in redis
Second: Separate the need for receipt message and offline message. The message that requires a receipt needs to set its life cycle. The offline watch is best to be a timer. Poll for messages. The messages within the timeout range (for example, the period is one week) are synchronized to the relational table. The offline message here needs to separate the user's device.
Here we need to consider different device terminals After many different scenarios, the problem will be more convoluted. Welcome everyone to communicate with me via email.