In-depth understanding of Dojo’s server push technology



Learn more about Dojo’s server push technology

dojox.cometd tool

Server push technology has been It has been out for a while, and there are many open source implementations based on this technology (it should be said that it is a design pattern) in the industry, but it is more troublesome to transplant or apply to your own projects. Dojo, a large-scale Web2.0 development framework, provides a set of packaged specific implementations based on server-side push technology (including server-side Java and client-side Web and JavaScript). It is based on the Bayeux protocol and provides some simple and powerful The interface allows you to quickly build your own server-side push function. The client implementation is Dojo’s Cometd front-end component, which encapsulates interfaces such as connection establishment and message subscription. The server builds a message push mechanism based on Jetty and annotation, and also encapsulates a relatively simple but practical message push interface, which works with the Cometd interface of the front-end Dojox. This article will focus on how Dojo’s server-side push mechanism works, and how we should build our own server-side push function based on Dojo’s Cometd toolkit.

Zhou Xiang , Software Engineer, IBM

Close[x]

/developerworks/cn/web/1107_zhouxiang_dojostateful/author_small.pngZhou Xiang is an IBM China Shanghai A software engineer in the UT / Dojo department. He used to work in the IBM Lotus Mashups department and the UT / Click Track department. Now he is a member of the FIT / Dojo department. He is mainly engaged in the research and development of the Dojo control library. Including the development of Dojo’s claro theme and the development of Dojox’s visual components.

February 13, 2012

  • +Content

< div id="ibm-pcon">

Introduction to server push technology and Bayeux protocol

The basic idea of ​​server push technology is to change the browser’s initiative to query information to the server’s initiative to send information. The server sends a batch of data, and the browser displays the data while ensuring the connection with the server. When the server needs to send a batch of data again, the browser displays the data and keeps the connection. In the future, the server can still send batch data, the browser continues to display the data, and so on. Based on this idea, here we have to elicit the Bayeux protocol.

Bayeux is a set of communication protocols based on Publish/Subscribe mode that transmits events between the browser and the server in JSON format. This agreement stipulates the two-way communication mechanism between the browser and the server, and overcomes the shortcomings of the traditional Web communication mode.

The Bayeux protocol is mainly based on HTTP to transmit low-latency, asynchronous event messages. These messages are delivered through Channels, which can be transmitted from the server to the client, from the client to the server, or from one client to another through the server. The main purpose of the Bayeux protocol is to achieve highly responsive user interaction for Web clients that use Ajax and Comet technologies. The Bayeux protocol aims to reduce the complexity of developing Comet applications by allowing performers to achieve interoperability more easily. It solves the common message publishing and routing problems, and provides a gradual improvement and expansion mechanism.

Under normal circumstances, in the HTTP protocol, if the Client wants to obtain the server’s message, it must first send a Request by itself, and then the Server will give a Response. The Bayeux protocol changes this situation. It allows the Server side to asynchronously push its own messages to the Client side. So as to realize the two-way operation mode between Client and Server.


< h2 id="major2">A simple implementation of server push technology

There are many ways to implement server push technology based on the Bayeux protocol. You can use Flex or Java Applet. Based on these two technologies, we can establish a service socket interface on the client. The “two-way operation mode” is naturally easy to implement, but these methods require the support of the operating environment other than the browser. Here we hope to adopt a pure scripting method. It is impossible to establish a service socket interface in this way. How to implement server push based on the Bayeux protocol? In fact, it can be implemented in simulation. There are two main ways:

1. Long polling based on HTTP for message communication (long-polling based on Ajax).

2. Streaming based on Iframe and htmlfile.

Here we use the first way to achieve, that is: the client sends an HTTP Request to the server first, the server receives it, blocks it there, and when the server has a message, it returns an HTTP Request The HTTP Response is sent to the client. After the client receives it, it disconnects, and then sends a second HTTP Request. This is repeated to maintain this “long polling”. During this period, if the connection times out, it will be disconnected and reconnected to maintain the connection.

Based on the above ideas, let’s take a look at a simple implementation, which is based on PHP. The example is very simple, even if you haven’t used PHP, you can easily read it, and we will explain them one by one later.

This example mainly implements such a function:

We open three windows in the browser and visit the same page. Modify the content on one of the pages, and the content on the other two pages will also change immediately (note: there is no need to refresh the page here). This will give us a feeling that the data is pushed by the server.

Figure 1. Simple server push example – before content modification

Figure 1. Simple server push example - before content modification

We modify the first The content of each window (upper left) (input “222” and click the “Send” button to send to the background). At this time, not only the content of the first window changes, but also the content of the other two windows.

Figure 2. Simple server push example – content modification

Figure 2. Simple server push example - content modification

Next, let’s take a look at an example Code:

List 1. Simple server push-front-end code HTML
 

This is We have seen the input box and submit button, you can pay attention to its “onsubmit” method: when we enter content and click submit, it will execute the “comet.doRequest($(‘word’).value)” method Initiate a request to the backend (in fact, we have established long polling with the server before this and can start pushing data from the server at any time). Next, let’s take a look at what this “comet” looks like and the specific implementation of his Request:

List 2. Simple server push – front-end code JavaScript

< div class="codesection">

 var Comet = Class.create();Comet.prototype = {timestamp: 0, url:'./backend.php', noerror: true, initialize: function (){ }, connect: function(){ this.ajax = new Ajax.Request(this.url, {method:'get', parameters: {'timestamp': this.timestamp }, onSuccess: function(transport){ var response = transport.responseText.evalJSON(); this.comet.timestamp = response['timestamp']; this.comet.handleResponse(response); this.comet.noerror = true; }, onComplete: function(transport){ if (!this.comet.noerror) setTimeout(function(){ comet.connect() }, 5000); else this.comet.connect(); this.comet.noerror = false;} }); this.ajax. comet = this; }, handleResponse: function(response){ $('content').innerHTML +='
' + response['msg'] + '
'; }, doRequest: function(request){ new Ajax.Request(this.url, {method:'get', parameters: {'msg': request} }); }}var comet = new Comet();comet.connect();

Let’s look at the last two pieces of code first. Here is the code that will be executed when the page is initialized. In fact, here, we have established a server-side Long polling, let’s take a look at the implementation of the “connect” method:

The “connect” method here is to send an Ajax request, and then set the return processing and request when successful (onSuccess) Processing on completion (onComplete) (note that onComplete will be executed regardless of success or failure). We want to hang on the onComplete method here. It can be seen that when the request is completed, if there is a problem with the connection, it will reconnect after 5 seconds; if there is no problem, it will reconnect immediately.

I believe you should be a little eye-catching when you see this. There is actually no so-called constant connection (similar to the TCP method). Its real realization is achieved through constant Ajax requests.

So, when we opened 3 windows, we actually opened 3 simulated uninterrupted connections between the client and the server, so they would instantly understand the information on the server without refreshing the page.

Let’s take a look at the implementation of the server again and see how it pushes:

List 3. Simple server push – back-end code PHP
 $filename = dirname(__FILE__).'/data.txt'; // save the new message in the file $msg = isset($_GET[' msg'])? $_GET['msg']:''; if ($msg !='') {file_put_contents($filename,$msg); die();} // This is an infinite loop, once found When the file is modified, it will jump out of the loop and return to the file to modify the data. If the file has not been modified, it will always // be in a loop detection state. At this time, the Ajax connection will also be kept until the file is modified. This is the so-called "long polling". $lastmodif = isset($_GET['timestamp'])? $_GET['timestamp']: 0; $currentmodif = filemtime($filename); while ($currentmodif <= $lastmodif) // Check whether the file has been modified { usleep(10000); // sleep 10ms to unload the CPU clearstatcache(); $currentmodif = filemtime($filename);} // return JSON array $response = array(); $response['msg'] = file_get_contents($ filename); $response['timestamp'] = $currentmodif; echo json_encode($response); flush();

We can refer to the comments above to understand the code, but it is not necessary How much PHP knowledge. Server-side push technology is not a control library for development, but an idea. The while loop here illustrates how the server push retains the so-called "long polling".

Now everyone should understand why the three windows will change simultaneously. The main core idea is that the server "holds" long polling, and then "let go" at the appropriate time.


< h2 id="major3">Introduction to Dojo’s Cometd Toolkit

Before we implemented a simple Cometd application based on JavaScript. We spent a lot of code to build a Cometd framework, which is really used for processing The code of our own business logic is actually the line in "handleResponse". Can we omit these common codes? The answer is yes. Dojo has already encapsulated Cometd. Based on Dojo's Cometd package, we don't need to waste a lot of code to build the Cometd framework. For the front-end script code, we only need to add a simple interface code of the Cometd package, and then we can start adding our own business logic code.

Of course, Dojo’s Cometd package also includes the back-end code, which can be found in Dojo’s official website download. It is not released together with the Dojo package. It is a separate server-side open source code based on Java and Jetty. , Readers who are interested can download it and study it.

Through these two parts of Dojo code, we can quickly build our Cometd framework, and all we need to do is to add our business logic.


< h2 id="major4">Front end of Dojo’s Cometd toolkit

Next, let’s take a look at the front-end package of Dojo’s Cometd toolkit:

Listing 4 . Cometd front-end initialization
 dojox.cometd.init("http://www.xxx.com/cometd");

< /div>

This interface is used to establish and initialize the handshake connection with the server (Bayeux handshake, which initializes the "Bayeux communication" message communication). The establishment of this connection is based on the Bayeux protocol, which has two main tasks:

  1. The client and the server negotiate the type of message to be transmitted.
  2. If the negotiation is successful, the server will notify the client of the specific request parameter configuration.
  3. If the negotiation fails, the client re-initiates the negotiation process.

We can see the specific implementation process of the handshake connection when we dig into Dojo’s init method. Its implementation is also to repeatedly send the client's Ajax request uninterruptedly, which is similar to our previous self-made case. Interested students can refer to the following code (extracted part):

List 5. Cometd internal mechanism
 this.init = function(...){............ var bindArgs = {url: this.url, handleAs: this.handleAs, content: {"message": dojo. toJson([props]) }, load: dojo.hitch(this,function(msg){ this._backon(); this._finishInit(msg); }), error: dojo.hitch(this,function(e){ this._backoff(); this._finishInit(e); }), timeout: this.expectedNetworkDelay };.............. if(this._isXD){ r = dojo.io. script.get(bindArgs); }else{ r = dojo.xhrPost(bindArgs); }..............}this._finishInit = function(data){...... ............ if(successful){ ........ //ajax request inside this.tunnelInit = transport.tunnelInit && dojo.hitch(transport, "tunnelInit"); this .tunnelCollapse = transport.tunnelCollapse && dojo.hitch(transport, "tunnelCollapse"); trans port.startup(data); }else{ if(!this._advice || this._advice["reconnect"] != "none"){ setTimeout(dojo.hitch(this, "init", this.url, this ._props), this._interval());} }....................}

Visible, their callback The method contains a call to itself, and the "init" method here is no exception. Careful readers may also find that, in fact, it can be seen from the example: Dojo's Cometd also supports cross-domain, and its cross-domain is realized through the "script" method. There is one point for everyone to understand. Our default server push implementation method is long-polling mode. When encountering cross-domain, "long-polling" is no longer applicable, and it is converted to "script"-based return. Call (callback-polling) mode.

Next, let’s take a look at some interfaces about message push in Cometd. These message communications are mainly based on channels:

List 6. Cometd front-end publishing messages< /h5>

 dojox.cometd.publish("/service/echo", {msg: msg });

The so-called "publishing a message" here means sending a message to the back end for the front end to actively push it to the back end.

The first parameter here is the identification of the channel that sends the message. There are three types of this "channel":

1. Meta channels: Example "/Meta/connect" (usually starting with "/meta/"). Meta-channels are not mainly used for message transmission, but for client monitoring, such as handshake connection or network connection errors. Usually we call "addListener()" on the client to open the monitoring meta-channel, which can start the monitoring before the establishment of the handshake connection, and this message monitoring is synchronous.

2. Service channels: Example "/service/connect" (usually starting with "/service/"). It is mainly used for private message communication, mainly one-to-one communication. Usually we call "subscribe()" on the client to subscribe to service channel messages. The service channel can only be opened after the handshake connection is established, and it communicates asynchronously.

3. Normal channels: Example "/foo/bar" (unlimited). There are no restrictions on this channel, and it is mainly used to broadcast messages, that is, if multiple clients subscribe to a service, the service can broadcast messages through ordinary channels.

Channel is the basic mode of communication, and we can choose the corresponding channel mode according to our needs.

The second parameter is the message object, where "msg" is the message content.

One thing to note: The "publish" here is based on the Bayeux protocol and uses an asynchronous message transmission mechanism, so it returns before the server (Bayeux server) receives the message. So the return of publish does not mean that the server has received your publish message.

Dojo's Cometd also supports batch sending of messages. Through this interface, unnecessary network message transmission waste can be effectively avoided:

List 7. Cometd front-end batch Publish message
 // Method 1 cometd.batch(function() {cometd.publish('/channel1', {product:'foo'} ); cometd.publish('/channel2', {notificationType:'all' }); cometd.publish('/channel3', {update: false }); }); // Method 2 cometd.startBatch() cometd. publish('/channel1', {product:'foo' }); cometd.publish('/channel2', {notificationType:'all' }); cometd.publish('/channel3', {update: false }); cometd.endBatch()

The above two schemes can send messages in batches. Method 1 is recommended.

Next, let’s look at the message push on the server side:

List 8. Cometd front-end subscription message
 dojox.cometd.subscribe("/service/echo",echoRpcReturn); function echoRpcReturn(msg){ dojo.byId("responses").innerHTML += msg; }

The so-called "subscription message" here is actually to receive the message pushed by the server, which is actively pushed by the back end to the front end. This is also the essence of server push, and it is also a very simple line of code.

Here we see a familiar method --- "subscribe", we have introduced before, it is mainly used to subscribe to service channel private messages, here is an example of its usage. The corresponding server Service pushes messages to the corresponding front-end subscribers. Here, the message is pushed to the front-end through the "echo" channel. He will call back the "echoRpcReturn" method and pass in the pushed message as an actual parameter. For each push from the back-end, the front-end "echoRpcReturn" method is called.


< h2 id="major5">Dojo’s Cometd toolkit backend

Dojo’s Cometd toolkit backend implementation is based on Java and Jetty components. Through Dojo’s server Cometd component, we can also Build the Cometd framework quickly. All we need to do is to add our business logic code.

Let’s take a look at the configuration parameters of web.xml first:

List 9. Basic configuration parameters (web.xml)
   cometd  org.cometd.server.continuation.ContinuationCometdServlet   timeout< /param-name> 60000    cometd / cometd/*   cross-origin org.eclipse.jetty.servlets.CrossOriginFilter < /filter>  cross-origin /cometd/*  < /pre> 

Here we first take a look at "ContinuationCometdServlet", this Servlet is mainly used to explain the Bayeux protocol, so its configuration is necessary. There are many other configuration parameters based on "ContinuationCometdServlet", such as:

Timeout: the expiration time of long polling. If there is no client message after this time, the server will push an empty message.

Interval: Polling interval time. The interval between the end of the previous request and the sending of the next request by the client.

maxInterval: The maximum waiting time of the server. That is: when a connection is established, if a new long-polling connection request is not received after this time, the server will consider the client to be invalid or closed.

logLevel: Log level. "0 = warn, 1 = info, 2 = debug".

The above are the main configuration parameters. There are many other configuration parameters. I will not introduce them one by one here. Readers in need can refer to Dojo's help documentation. In addition, we also configure a "cross-origin" in the last few lines, corresponding to the "CrossOriginFilter" class, which is used to support cross-domain JavaScript requests. If you want to support cross-domain server push in your project, please add this configuration .

Next, let’s take a look at some advanced configuration parameters:

List 10. Advanced configuration parameters (web.xml)
  cometd org.cometd.java.annotation.AnnotationCometdServlet  logLevel 1   services org.cometd.examples.ChatService  1   cometd /cometd/*   cometdDemo  org.cometd.examples.CometdDemoServlet 2 

Here we mainly need to pay attention to three Places:

1. "CometdDemoServlet": It is a servlet used to start the server-side Cometd framework, we will introduce it later. Since he has configured the "load-on-startup" parameter, our Cometd server is already set up when the service container starts, and then we will focus on the behavior of his "init" method.

2. "AnnotationCometdServlet": This Servlet configuration here means that our server code is based on annotation. This is a very practical Servlet. Through this Servlet, you will find that what we have to do is to define a few Service classes and implement a few of them. Even a lot of code calling the Cometd framework API interface is omitted.

3. "ChatService": A Service class is declared here, and its purpose is to process messages from service channels. The function declared here is equivalent to "processor.process(new ChatService())" in the code.

After the configuration is complete, we can look at the code next. After passing the above configuration, you will find that the code we will write next is very simple and refined:

List 11. Service class initialization init
 public void init() throws ServletException {final BayeuxServerImpl bayeux = (BayeuxServerImpl)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE); if (bayeux==null) throw new UnavailableException("No BayeuxServer !"); ................. // Create extension point bayeux.addExtension(new TimesyncExtension()); bayeux.addExtension(new AcknowledgedMessagesExtension()); // Set Handshake connection permission bayeux.getChannel(ServerChannel.META_HANDSHAKE).addAuthorizer( GrantAuthorizer.GRANT_PUBLISH); // Start service channel ServerAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux); processor.process(new EchoRPC()); processor.process(new Monitor( )); //processor.process(new ChatService()); bayeux.createIfAbsent("/foo/bar/baz",new ConfigurableServerChannel.Initializer() {public void configureChannel(ConfigurableServerChannel channel) {channel.setPersistent(tr ue);} }); if (bayeux.getLogger().isDebugEnabled()) System.err.println(bayeux.dump()); .................} 

Here we introduce three knowledge points:

1. Extension: Extension is a function, it will be called before the message is sent or after it is received, for special use To modify the content of the message, such as adding some special attributes (most of these attributes are in the ext attribute of the message). Note that most of these attributes are application-independent, such as recording the number of long polls and so on. Here "TimesyncExtension" and "AcknowledgedMessagesExtension" are two more commonly used Extensions:

  • 1) "Timesync Extension" is used to calculate the client The deviation between the event and the server time. The client needs to introduce the "dojox.cometd.timesync" class at the same time. The Extension enables the client and the server to exchange their respective clock information each time they shake hands or connect. This is also the client can accurately calculate his The offset from the server clock. The message format is as follows:

    {ext:{timesync:{tc:12345567890,ts:1234567900,p:123,a:3},...},...}

    TC: The time when the client sent the message (the time since the January 1970 issue, in milliseconds)

    TS: The time when the server received the message

  • 2) "Acknowledge Extension" is used to provide a reliable sequential message mechanism. Once "Acknowledge Extension" is added, the server will block non-long polling client requests, which will make your server more efficient. Note: The client needs to introduce the "dojox.cometd.ack" class at the same time to work with it.

2. Authorizer: Set the handshake connection authority, here the set value is "GrantAuthorizer.GRANT_PUBLISH", which means that all clients are allowed to establish a handshake connection.

3. Process Service: Start the service channel "processor.process(new EchoRPC())". Through these service channel classes, we can start service channels to process client requests. This is the key to our server-side push technology, and our business logic code is also mainly placed in these service channel classes.

Next, let’s take a look at the specific implementation of these service channel classes:

List 12. Echo Service implementation
 @Service("echo")public static class EchoRPC{ @Session private ServerSession _session; @SuppressWarnings("unused") @Configure("/service/echo") private void configureEcho(ConfigurableServerChannel channel ) {channel.addAuthorizer(GrantAuthorizer.GRANT_SUBSCRIBE_PUBLISH);} @Listener("/service/echo") public void doEcho(ServerSession session, ServerMessage message) {Map data = message.getDataAsMap(); Log.info ("ECHO from "+session+" "+data); for(int i = 0; i <50; i++){ session.deliver(_session, message.getChannel(), data, null);} }}

We can set the permissions supported by the service channel in "configureEcho". We mainly take a look at the "doEcho" method, which is identified as "@Listener("/service/echo")", so it can be used to push messages like the "echo" service channel on the client side. Our previous client code The code in the example is as follows: "dojox.cometd.subscribe("/service/echo",echoRpcReturn)" is specifically used to process the messages pushed by this service channel. The message is pushed through the "deliver" method, and the pushed message information is placed in " data" in the actual parameter.

Let’s take a look at the Monitor class again:

List 13. Monitor Service implementation
 @Service("monitor") public static class Monitor {@Listener("/meta/subscribe") public void monitorSubscribe(ServerSession session, ServerMessage message) {Log.info("Monitored Subscribe from "+session+" for "+message .get(Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/unsubscribe")  public void monitorUnsubscribe(ServerSession session, ServerMessage message)  {  Log.info("Monitored Unsubscribe from "+session+" for "           +message.get( Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/*")  public void monitorMeta(ServerSession session, ServerMessage message)  {  if (Log.isDebugEnabled())  Log.debug(message.toString());  }  }< /pre>       

Monitor 渠道类与之前的Echo 服务渠道类比较类似,不过它主要用于处理meta 渠道,与业务逻辑无关。

最后,我们来看看被注释掉的“ChatService”类,他也可以通过“processor.process(new ChatService())”来启用,但是我们这里用了一个更为简单的方法:直接配置在 web.xml 文件中:

清单 14. ChatService 的配置
   ...............    services  org.cometd.examples.ChatService    1  

细心的读者可能在之前的代码示例中已经看到,这里就是通过配置的方式加载服务渠道类。参考以下具体实现的代码:

清单 15. ChatService 实现
 @Service("chat")  public class ChatService  {  ..........................................     @Listener("/service/members")     public void handleMembership(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         final String room = ((String)data.get("room")).substring("/chat/".length());         Map roomMembers = _members.get(room);         if (roomMembers == null)         {             Map new_room = new ConcurrentHashMap();             roomMembers = _members.putIfAbsent(room, new_room);             if (roomMembers == null) roomMembers = new_room;         }         final Map members = roomMembers;         String userName = (String)data.get("user");         members.put(userName, client.getId());         client.addListener(new Ser verSession.RemoveListener()         {             public void removed(ServerSession session, boolean timeout)             {                 members.values().remove(session.getId());                 broadcastMembers(room,members.keySet());             }         });         broadcastMembers(room,members.keySet());     }     private void broadcastMembers(String room, Set members)     {         // Broadcast the new members list         ClientSessionChannel channel =                  _session.getLocalSession().getChannel("/members/"+room);         channel.publish(members);     }  ..........................................     @Listener("/service/privatechat")     protected void privateChat(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         String room = ((String)data.get("room")).substring("/chat/".length());         Map membersMap = _members.get(room);         if (membersMap==null)         {             Mapnew_room=new ConcurrentHashMap();             membersMap=_members.putIfAbsent(room,new_room);             if (membersMap==null)                 membersMap=new_room;         }         String[] peerNames = ((String)data.get("peer")).split(",");         ArrayList peers = new ArrayList(peerNames.length);  .................     }  }

以上是摘录部分 ChatService 实现代码,它主要是实现一个在线的聊天室,包括公开发言和私有(1 对 1)聊天等等功能,它的实现方式与之前的 Echo 和 Monitor 类似,这里不做详述,有兴趣的读者可以参考一下他的实现,来构造自己的服务器推送应用。


服务器推送技术之比较

其实有很多种方式实现服务器推送,它们各有各的优缺点:

  1. 传统轮询:此方法是利用 HTML 里面 meta 标签的刷新功能,在一定时间间隔后进行页面的转载,以此循环往复。它的最大缺点就是页面刷性给人带来的体验很差,而且服务器的压力也会比较大。
  2. Ajax 轮询:异步响应机制,即通过不间断的客户端 Ajax 请求,去发现服务端的变化。这种方式由于是客户端主动连接的,所以会有一定程度的延时,并且服务器的压力也不小。
  3. 长连接:这也是我们之前所介绍的一种方式。由于它是利用客户端的现有连接实现服务器主动向客户端推送信息,所以延时的情况很少,并且由于服务端的可操控性使得服务器的压力也迅速减小。其实这种技术还有其他的实现方式,通过 Iframe,在页面上嵌入一个隐藏帧(Iframe),将其“src”属性指向一个长连接的请求,这样一来,服务端就能够源源不断的向客户端发送数据。这种方式的不足就在于:它会造成浏览器的进度栏一直显示没有加载完成,当然我们可以通过 Google 的一个称为“htmlfile”的 ActiveX 控件解决,但是毕竟他需要安装 ActiveX 控件,对于终端用户也是不合适的。
  4. 套接字:可以利用 Flash 的 XMLSocket 类或者 Java 的 Applet 来建立 Socket 连接,实现全双工的服务器推送,然后通过 Flash 或者 Applet 与 JavaScript 通信的接口来实现最终的数据推送。但是这种方式需要 Flash 或者 JVM 的支持,同样不太合适于终端用户。
  5. HTML5 的 WebSocket:这种方式其实与套接字一样,但是这里需要单独强调一下:它是不需要用户而外安装任何插件的。 HTML5 提供了一个 WebSocket 的 JavaScript 接口,可以直接与服务端建立 Socket 连接,实现全双工通信,这种方式的服务器推送就是完全意义上的服务器推送了,没有半点模拟的成分,只是现阶段支持 HTML5 的浏览器并不多,而且一般老版本的各种浏览器基本都不支持。不过 HTML5 是一套非常好的标准,在将来,当 HTML5 流行起来以后将是我们实现服务器推送技术的不二选择。

结束语

这篇文章介绍了 Dojo 中的服务器推送 Cometd 工具包。基于服务器推送的理念,介绍了 Bayeux 协议的核心思想,并结合一个简单示例介绍了服务器推送的基本实现。随后,本着快速建立服务器推送框架的想法,介绍了 Dojo 的 Cometd 工具包,并分别从客户端接口和服务端接口两个方面分别介绍了 Dojo 的服务器推送框架的搭建和实现原理。最后,通过一些简单的示例展示了基于服务端推送的业务逻辑的具体实现。服务端推送技术具有很强的实用性,希望广大读者在开发自己的项目的过程中多关注一下,以尽可能多的完善自己的 Web 应用。



服务器推送技术已经出来一段时间了,业界上也有不少基于这种技术(应该说是设计模式)的开源实现,但是要移植或者说应用到自己的项目上都比较麻烦。 Dojo 这样一个大型的 Web2.0 开发框架提供了一套封装好的基于服务端推送技术的具体实现(包括服务端 Java 和客户端 Web 和 JavaScript),它基于 Bayeux 协议,提供了一些简单而且强大的接口可以让你快速构建自己的服务端推送功能。客户端实现即 Dojo 的 Cometd 前端组件,它封装了建立连接、消息订阅等等接口。服务端基于 Jetty 和 annotation,组建消息推送机制,同样也封装了比较简单但实用的消息推送接口,与前端 Dojox 的 Cometd 接口协同工作。这篇文章将重点介绍 Dojo 的服务端推送机制是如何运作的,以及我们应该如何基于 Dojo 的 Cometd 工具包构建自己的服务端推送功能。

周 翔, 软件工程师, IBM

关闭 [x]

/developerworks/cn/web/1107_zhouxiang_dojostateful/author_small.png周翔 是 IBM 中国上海 UT / Dojo 部门的一名软件工程师,之前在 IBM Lotus Mashups 部门 和 UT / Click Track 部门工作,现在是 FIT / Dojo 部门的一员,主要从事 Dojo 控件库的研发工作,包括 Dojo 的claro主题的开发,dojox的可视化组件的开发。

2012 年 2 月 13 日

  • +内容

服务器推送技术已经出来一段时间了,业界上也有不少基于这种技术(应该说是设计模式)的开源实现,但是要移植或者说应用到自己的项目上都比较麻烦。 Dojo 这样一个大型的 Web2.0 开发框架提供了一套封装好的基于服务端推送技术的具体实现(包括服务端 Java 和客户端 Web 和 JavaScript),它基于 Bayeux 协议,提供了一些简单而且强大的接口可以让你快速构建自己的服务端推送功能。客户端实现即 Dojo 的 Cometd 前端组件,它封装了建立连接、消息订阅等等接口。服务端基于 Jetty 和 annotation,组建消息推送机制,同样也封装了比较简单但实用的消息推送接口,与前端 Dojox 的 Cometd 接口协同工作。这篇文章将重点介绍 Dojo 的服务端推送机制是如何运作的,以及我们应该如何基于 Dojo 的 Cometd 工具包构建自己的服务端推送功能。

周 翔, 软件工程师, IBM

关闭 [x]

/developerworks/cn/web/1107_zhouxiang_dojostateful/author_small.png周翔 是 IBM 中国上海 UT / Dojo 部门的一名软件工程师,之前在 IBM Lotus Mashups 部门 和 UT / Click Track 部门工作,现在是 FIT / Dojo 部门的一员,主要从事 Dojo 控件库的研发工作,包括 Dojo 的claro主题的开发,dojox的可视化组件的开发。

2012 年 2 月 13 日

  • +内容

关闭 [x]

/developerworks/cn/web/1107_zhouxiang_dojostateful/author_small.png周翔 是 IBM 中国上海 UT / Dojo 部门的一名软件工程师,之前在 IBM Lotus Mashups 部门 和 UT / Click Track 部门工作,现在是 FIT / Dojo 部门的一员,主要从事 Dojo 控件库的研发工作,包括 Dojo 的claro主题的开发,dojox的可视化组件的开发。

关闭 [x]

/developerworks/cn/web/1107_zhouxiang_dojostateful/author_small.png周翔 是 IBM 中国上海 UT / Dojo 部门的一名软件工程师,之前在 IBM Lotus Mashups 部门 和 UT / Click Track 部门工作,现在是 FIT / Dojo 部门的一员,主要从事 Dojo 控件库的研发工作,包括 Dojo 的claro主题的开发,dojox的可视化组件的开发。

/developerworks/cn/web/1107_zhouxiang_dojostateful/author_small.png周翔 是 IBM 中国上海 UT / Dojo 部门的一名软件工程师,之前在 IBM Lotus Mashups 部门 和 UT / Click Track 部门工作,现在是 FIT / Dojo 部门的一员,主要从事 Dojo 控件库的研发工作,包括 Dojo 的claro主题的开发,dojox的可视化组件的开发。

/developerworks/cn/web/1107_zhouxiang_dojostateful/author_small.png周翔 是 IBM 中国上海 UT / Dojo 部门的一名软件工程师,之前在 IBM Lotus Mashups 部门 和 UT / Click Track 部门工作,现在是 FIT / Dojo 部门的一员,主要从事 Dojo 控件库的研发工作,包括 Dojo 的claro主题的开发,dojox的可视化组件的开发。

/developerworks/cn/web/1107_zhouxiang_dojostateful/author_small.png周翔 是 IBM 中国上海 UT / Dojo 部门的一名软件工程师,之前在 IBM Lotus Mashups 部门 和 UT / Click Track 部门工作,现在是 FIT / Dojo 部门的一员,主要从事 Dojo 控件库的研发工作,包括 Dojo 的claro主题的开发,dojox的可视化组件的开发。

  • +内容

  • +内容

服务器推送技术和 Bayeux 协议简介

服务器推送技术的基础思想是将浏览器主动查询信息改为服务器主动发送信息。服务器发送一批数据,浏览器显示这些数据,同时保证与服务器的连接。当服务器需要再次发送一批数据时,浏览器显示数据并保持连接。以后,服务器仍然可以发送批量数据,浏览器继续显示数据,依次类推。基于这种思想,这里我们要引出 Bayeux 协议。

Bayeux 是一套基于 Publish / Subscribe 模式,以 JSON 格式在浏览器与服务器之间传输事件的通信协议。该协议规定了浏览器与服务器之问的双向通信机制,克服了传统 Web 通信模式的缺点。

Bayeux 协议主要基于 HTTP 来传输低延迟的、异步的事件消息。这些消息通过频道 (Channels) 来投递,能够实现从服务器端到客户端、从客户端到服务器端或者通过服务器从一个客户端到另一个客户端的传送。 Bayeux 协议的主要目的是为使用了 Ajax 和 Comet 技术的 Web 客户端实现高响应的用户交互。 Bayeux 协议旨在通过允许执行者更容易的实现互操作性,来降低开发 Comet 应用程序的复杂性。它解决了共同的消息发布和路由问题,并提供了渐进式的改进和扩展机制。

一般情况下,在 HTTP 协议中,Client 要想获得 Server 的消息,必须先自己发送一个 Request,然后 Server 才会给予 Response。而 Bayeux 协议改变了这个情况,它允许 Server 端异步 Push 自己的消息到 Client 端。从而实现了 Client 和 Server 之间的双向操作模式。


服务器推送技术的一个简单实现

基于 Bayeux 协议实现服务器推送技术的方式有很多,可以通过 Flex 或者 Java 的 Applet。基于这两种技术,我们可以建立在客户端建立服务套接字接口,“双向操作模式”自然很容易实现,但是这些方式需要除浏览器以外的运行环境的支持。这里我们希望能采用一种纯脚本的方式,这种方式是不可能建立服务套接字接口的,那如何实现基于 Bayeux 协议的服务器推送呢?其实是可以模拟实现的,主要有两种方式:

1. 基于 HTTP 的长轮询来进行消息通信(基于 Ajax 的长轮询(long-polling)方式)。

2. 基于 Iframe 及 htmlfile 的流(streaming)方式。

这里我们采用第一种方式实现,即:客户端先向服务器端发送一个 HTTP Request,服务器端接收到后,阻塞在那边,等服务器有消息的时候,则返回一个 HTTP Response 给客户端,客户端收到后,断开连接,紧接着再发第二个 HTTP Request,以此反复进行,保持这个“长轮询”。期间,如果连接超时,那么会断开重连,以保持连接。

基于以上的思想,我们来看一下一个简单的实现,这个简单实现是基于 PHP 的。示例很简单,即便没用过 PHP 也能够很容易看明白,而且我们会在后面一一作出解释。

这个示例主要实现这样一个功能:

我们在浏览器里面分别打开三个窗口,并访问同一张页面。修改其中一个页面上的内容,另外两个页面上的内容也随即发生变化(注意:这里不用刷新页面)。这就会给我们一种:数据是服务器推送过来的感觉。

图 1. 简单服务器推送示例 -- 内容修改前

图 1. 简单服务器推送示例 -- 内容修改前

我们修改其中第一个窗口(左上)的内容(输入“222”,点击“Send”按钮,发送到后台)。此时不仅第一个窗口的内容变化了,其余两个窗口的内容也随即变化。

图 2. 简单服务器推送示例 -- 内容修改

图 2. 简单服务器推送示例 -- 内容修改

接下来我们来看看示例代码吧:

清单 1. 简单服务器推送 -- 前端代码 HTML
 

这个是我们所看到的输入框和提交按钮,大家可以注意一下它的“onsubmit”方法:当我们输入内容并点击提交时,它会执行“comet.doRequest($('word').value)”方法向后端发起请求(其实在这之前我们就已经建立了与服务端的长轮询并可随时开始服务器推送数据)。接下来我们来看看这个“comet”是什么样子的以及他的 Request 的具体实现:

清单 2. 简单服务器推送 -- 前端代码 JavaScript
 var Comet = Class.create();Comet.prototype = {	timestamp: 0,	url: './backend.php',	noerror: true,	initialize: function(){	},	connect: function(){		this.ajax = new Ajax.Request(this.url, {			method: 'get',			parameters: {				'timestamp': this.timestamp			},			onSuccess: function(transport){				var response = transport.responseText.evalJSON();				this.comet.timestamp = response['timestamp'];				this.comet.handleResponse(response);				this.comet.noerror = true;			},			onComplete: function(transport){				if (!this.comet.noerror) setTimeout(function(){					                     comet.connect()				                         }, 5000);				else 				this.comet.connect();				this.comet.noerror = false;			}		});		this.ajax.comet = this;	},	handleResponse: function(response){		$('content').innerHTML += '
' + response['msg'] + ''; }, doRequest: function(request){ new Ajax.Request(this.url, { method: 'get', parameters: { 'msg': request } }); }}var comet = new Comet();comet.connect();

我们先看最后两段代码,这里是页面初始化时会执行的代码,其实在这里,我们就建立了一服务端的长轮询,我们来看看“connect”方法的实现吧:

“connect”方法这里是发了一个 Ajax 请求,然后分别设定了成功时(onSuccess)的返回处理和请求完成时(onComplete)的处理(注意 onComplete 不论成功失败都会执行)。我们要挂住这里的 onComplete 方法。可以看到,当请求完成时,如果连接有问题,它会过 5 秒重新连接,;如果没有问题,他会立即重新连接。

相信大家看到这里应该会有点眉目了,这里其实没有什么所谓的恒定不断的连接(类似 TCP 方式),它的真正实现是通过不断的 Ajax 请求实现的。

所以,当我们开启 3 个窗口时,其实我们打开了 3 个模拟的不间断的客户端与服务端的连接,所以他们会即时解到服务端的信息,不需要刷新页面。

我们再来看看服务端的实现,看看他是如何推送的:

清单 3. 简单服务器推送 -- 后端代码 PHP
 $filename  = dirname(__FILE__).'/data.txt';  // 将新消息存入文件中 $msg = isset($_GET['msg']) ? $_GET['msg'] : '';  if ($msg != '')  {   file_put_contents($filename,$msg);   die();  }  // 这是一个无限循环,一旦发现文件被修改,便会跳出循环并返回文件修改数据。如果文件一直没有修改,则会一  // 直处于循环检测状态,此时的 Ajax 连接也会一直保留,直到文件被修改为止,这就是所谓的“长轮询”。 $lastmodif    = isset($_GET['timestamp']) ? $_GET['timestamp'] : 0;  $currentmodif = filemtime($filename);  while ($currentmodif <= $lastmodif) // 检测文件是否被修改 {   usleep(10000); // sleep 10ms to unload the CPU   clearstatcache();   $currentmodif = filemtime($filename);  }  // 返回 JSON 数组 $response = array();  $response['msg']       = file_get_contents($filename);  $response['timestamp'] = $currentmodif;  echo json_encode($response);  flush();

我们可以参照上面的注释理解该代码,其实并不需要多少 PHP 的知识。服务端推送技术不是一个开发用的控件库,而是一个思想。这里的 while 循环便说明了服务端推送是如何保留所谓的“长轮询”的。

现在大家应该明白为什么三个窗口会同步变化了。其主要的核心思想就是服务端“握住”长轮询,然后在适当的时候“放手”。


Dojo 的 Cometd 工具包简介

之前我们是基于 JavaScript 自己实现了一个简单的 Cometd 应用,我们花了大量的代码来建立一个 Cometd 框架,真正用于处理我们自己的业务逻辑的代码其实就是“handleResponse”里面的那一行。我们能不能吧这些通用的代码省掉呢? The answer is yes. Dojo 已经对 Cometd 做了封装,基于 Dojo 的 Cometd 包,我们不用再浪费大量的代码在搭建 Cometd 框架上。对于前端脚本代码,我们只需要加上一个 Cometd 包的简单接口代码,便可以开始加入我们自己的业务逻辑代码了。

当然,Dojo 的 Cometd 包还包括后端的代码,可以在 Dojo 的官网下载中找到,它不与 Dojo 包一起发布,是一个单独的服务端开源代码,基于 Java 和 Jetty 的,有兴趣的读者可以下载下来研究一下。

通过 Dojo 的这两部分代码,我们便可以迅速地搭建我们的 Cometd 框架,我们剩下需要做的就是加入我们的业务逻辑。


Dojo 的 Cometd 工具包之前端

接下来我们来看看 Dojo 的 Cometd 工具包的前端封装:

清单 4. Cometd 前端初始化
 dojox.cometd.init("http://www.xxx.com/cometd");

这个接口用于建立并初始化与服务端的握手连接(Bayeux handshake,初始化了“Bayeux communication” 消息通讯)。建立这个连接是基于 Bayeux 协议的,它主要有两个任务:

  1. 客户端与服务端协商传输的消息类型。
  2. 如果协商成功,服务端会通知客户端具体的请求参数配置。
  3. 如果协商失败,客户端重新发起协商流程。

我们深入 Dojo 的 init 方法内部可以看到握手连接的具体实现过程,它的实现也是不间断的重复发送客户端的 Ajax 请求,与我们之前的自制案类似,有兴趣的同学可以参考如下代码(摘取部分):

清单 5. Cometd 内部机制
 this.init = function(...){............	var bindArgs = {		url: this.url,		handleAs: this.handleAs,		content: { "message": dojo.toJson([props]) },		load: dojo.hitch(this,function(msg){			this._backon();			this._finishInit(msg);		}),		error: dojo.hitch(this,function(e){			this._backoff();			this._finishInit(e);		}),		timeout: this.expectedNetworkDelay	};..............	if(this._isXD){		r = dojo.io.script.get(bindArgs);	}else{		r = dojo.xhrPost(bindArgs);	}..............}this._finishInit = function(data){..................	if(successful){		........		//ajax request inside		this.tunnelInit = transport.tunnelInit && dojo.hitch(transport,        "tunnelInit");		this.tunnelCollapse = transport.tunnelCollapse && dojo.hitch(transport,		"tunnelCollapse");		transport. startup(data);	}else{		if(!this._advice || this._advice["reconnect"] != "none"){			setTimeout(dojo.hitch(this, "init", this.url, this._props),			this._interval());		}	}....................}

可见,它们的 callback 方法里面都带有对自己本身的调用,这里的”init“方法也不例外。细心的读者可能还会发现,其实从例子上可以看出:Dojo 的 Cometd 也支持跨域,它的跨域是通过“script”的方式实现的。这里有一点需要大家了解,我们默认的服务端推送实现方式是长轮询(long-polling)模式,遇到跨域时,“long-polling”便不再适用,转为基于“script”的返回调用(callback-polling)模式。

接下来我们再来看看 Cometd 中关于消息推送的一些接口,这些消息通讯主要是基于渠道:

清单 6. Cometd 前端发布消息
 dojox.cometd.publish("/service/echo", { msg: msg });

这里的所谓“发布消息”就是向后端发送消息,用于前端主动向后端推送。

这里的第一个参数是发送消息的渠道标识(channel),这种“channel”共有三种类型:

1. 元渠道(meta channels):示例“/meta/connect”(通常以“/meta/”为开头)。元渠道主要不是用来消息传输,而是用于客户端监听,如握手连接或者网络连接等等的错误。通常我们会在客户端调用“addListener()”来开启监听元渠道,它可以在握手连接的建立之前就开启监听,而且这种消息监听是同步的。

2. 服务渠道(service channels):示例“/service/connect”(通常以“/service/”为开头)。它主要用于私有消息通讯,主要是一对一的通讯。通常我们会在客户端调用“subscribe()”来订阅服务渠道消息。服务渠道只有等握手连接建立好后才能开启,而且它是异步通讯的。

3. 普通渠道(normal channels):示例“/foo/bar”(无限制)。这种渠道没有什么限制,主要用于广播消息,即:多个客户端订阅了一个服务,该服务可以通过普通渠道进行消息广播。

渠道是通信的基础模式,我们可以根据需要选择相应的渠道模式。

第二个参数则是消息对象,这里的“msg”则是消息内容。

有一点要注意:这里的“publish”是基于 Bayeux 协议的,采用的异步消息传输机制,所以它是在服务端(Bayeux 服务器)收到消息之前就返回的。所以 publish 的返回并不代表服务端收到你 publish 的消息了。

Dojo 的 Cometd 还支持批量发送消息,通过这个接口可以有效地避免不必要的网络消息传输的浪费:

清单 7. Cometd 前端批量发布消息
 // 方法 1  cometd.batch(function()  {     cometd.publish('/channel1', { product: 'foo' });     cometd.publish('/channel2', { notificationType: 'all' });     cometd.publish('/channel3', { update: false });  });  // 方法 2  cometd.startBatch()  cometd.publish('/channel1', { product: 'foo' });  cometd.publish('/channel2', { notificationType: 'all' });  cometd.publish('/channel3', { update: false });  cometd.endBatch()

上述两种方案都可以实现消息的批量发送,推荐使用方法 1。

接下来我们看看服务端的消息推送:

清单 8. Cometd 前端订阅消息
 dojox.cometd.subscribe("/service/echo",echoRpcReturn);  function echoRpcReturn(msg){  dojo.byId("responses").innerHTML += msg;  }

这里所谓的“订阅消息”,其实就是接收服务端推送的消息,是后端主动向前端推送。这也是服务端推送的精华所在,同样也是很简单的一行代码。

这里我们看到了一个熟悉的方法 --- “subscribe”,之前我们已经介绍过了,它主要用于订阅服务渠道私有消息,这里就是它用法的一个示例。对应的服务端 Service 向对应的前端订阅者推送消息,这里就是通过“echo”渠道向前端推送消息,他会回调“echoRpcReturn”方法,并传入推送的消息作为实参。对于后端的每次推送,都会调用前端的“echoRpcReturn”方法。


Dojo 的 Cometd 工具包之后端

Dojo 的 Cometd 工具包的后端实现是基于 Java 和 Jetty 组件的,通过 Dojo 的服务端 Cometd 组件,我们同样能极其迅速的构建 Cometd 框架。我们需要做的仅仅是加入我们的业务逻辑代码即可。

先来看看 web.xml 的配置参数:

清单 9. 基本配置参数(web.xml)
               cometd           org.cometd.server.continuation.ContinuationCometdServlet                        timeout             60000                            cometd         /cometd/*                   cross-origin         org.eclipse.jetty.servlets.CrossOriginFilter                   cross-origin         /cometd/*       

这里我们先来看看“ContinuationCometdServlet”,这个 Servlet 主要用于解释 Bayeux 协议,所以关于它的配置是必须的。基于“ContinuationCometdServlet”的其他配置参数还有很多,如:

Timeout:长轮询的过期时间。如果超过这个时间还没有客户端消息,服务端会推送一个空消息。

Interval:轮询间隔时间。客户端结束前一个请求到发送下一个请求之间的间隔时间。

maxInterval:服务端最长等待时间。即:建立连接时,如果超过这个时间仍没有接到一个新的长轮询连接请求,服务端就会认为该客户端无效或者关闭了。

logLevel:日志级别。 “0 = warn, 1 = info, 2 = debug”。

以上是主要的配置参数,其余的配置参数还有很多,这里不一一介绍,有需要的读者可以查阅 Dojo 的帮助文档。另外,最后几行我们还配置了一个“cross-origin”,对应着“CrossOriginFilter”类,他用于支持跨域的 JavaScript 请求,如果您的项目中要支持跨域的服务器推送,请加入该配置。

接下来我们再来看看一些高级配置参数:

清单 10. 高级配置参数(web.xml)
   cometd  org.cometd.java.annotation.AnnotationCometdServlet    logLevel  1      services  org.cometd.examples.ChatService    1      cometd  /cometd/*      cometdDemo  org.cometd.examples.CometdDemoServlet  2  

这里我们主要要注意三个地方:

1. “CometdDemoServlet”:它是用于启动服务端 Cometd 框架的 Servlet,我们在后面会介绍。由于他配置了“load-on-startup”参数,所以在服务容器启动的时候,我们的 Cometd 服务端就已经搭建好了,之后我们会着重介绍他的“init”方法中的行为。

2. “AnnotationCometdServlet”:这个 Servlet 配置在这里表示了我们在服务端代码是基于 annotation 的。这是一个非常实用的 Servlet,通过这个 Servlet,你会发现,我们要做的事情仅仅是定义几个 Service 类,实现其中的几个方法即可。连很多调用 Cometd 框架 API 接口的代码都省去了。

3. “ChatService”:这里声明了一个 Service 类,他的用途是处理服务渠道的消息。这里声明的作用等同于代码中的“processor.process(new ChatService())”。

配置完成后,我们接下来可以看看代码了。通过以上的配置之后,你会发现,我们接下来要写的代码非常简单精炼:

清单 11. 服务类初始化 init
 public void init() throws ServletException  {  final BayeuxServerImpl bayeux =  (BayeuxServerImpl)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);  if (bayeux==null)  throw new UnavailableException("No BayeuxServer!");  .................  // 创建扩展点 bayeux.addExtension(new TimesyncExtension());  bayeux.addExtension(new AcknowledgedMessagesExtension());  // 设定握手连接权限 bayeux.getChannel(ServerChannel.META_HANDSHAKE).addAuthorizer(   GrantAuthorizer.GRANT_PUBLISH);  // 启动服务渠道 ServerAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux);  processor.process(new EchoRPC());  processor.process(new Monitor());  //processor.process(new ChatService());  bayeux.createIfAbsent("/foo/bar/baz",new ConfigurableServerChannel.Initializer()  {  public void configureChannel(ConfigurableServerChannel channel)  {  channel.setPersistent(true) ;  }  });  if (bayeux.getLogger().isDebugEnabled())  System.err.println(bayeux.dump());  .................  }

这里我们介绍三个知识点:

1. Extension:Extension 是一个函数,它会在消息发出之前或者收到之后被调用,专门用来修改消息内容,例如加入一些特殊属性(这些属性多在消息的 ext 属性中)。注意,这些属性大多是应用无关的,如记录长轮询的次数等等。这里的“TimesyncExtension”和“AcknowledgedMessagesExtension”是两个比较常用的 Extension:

  • 1) “Timesync Extension”用于计算客户端事件和服务端时间的偏差。客户端需要同时引入“dojox.cometd.timesync”类,该 Extension 使得客户端和服务端在每次握手或者连接的时候能够互相交换各自的时钟信息,这也是的客户端可以很精确的计算出他与服务端时钟的偏移量。消息格式如下:

    {ext:{timesync:{tc:12345567890,ts:1234567900,p:123,a:3},...},...}

    TC:客户端发消息的时间(距离 1970 年 1 月号的时长,单位为毫秒)

    TS:服务端收到消息的时间

  • 2) “Acknowledge Extension”用于提供可靠的顺序消息机制。一旦加入了“Acknowledge Extension”,服务端会阻截非长轮询的客户端请求,这样会使你的服务器更加的高效。注意:客户端需要同时引入“dojox.cometd.ack”类与其协同工作。

2. Authorizer:设定握手连接权限,这里设定值为“GrantAuthorizer.GRANT_PUBLISH”,表示允许所有客户端建立握手连接。

3. Process Service:启动服务渠道“processor.process(new EchoRPC())”。通过这些服务渠道类,我们可以启动服务渠道处理客户端请求。这是我们服务端推送技术的关键所在,我们的业务逻辑代码也是主要放在这些服务渠道类里面。

接下来我们来看看这些服务渠道类的具体实现:

清单 12. Echo Service 实现
 @Service("echo")public static class EchoRPC{	@Session	private ServerSession _session;	@SuppressWarnings("unused")	@Configure("/service/echo")	private void configureEcho(ConfigurableServerChannel channel)	{		channel.addAuthorizer(GrantAuthorizer.GRANT_SUBSCRIBE_PUBLISH);	}	@Listener("/service/echo")	public void doEcho(ServerSession session, ServerMessage message)	{		Map data = message.getDataAsMap();		Log.info("ECHO from "+session+" "+data);				for(int i = 0; i < 50; i++){			session.deliver(_session, message.getChannel(), data, null);		}	}}

我们可以在“configureEcho”里面设定该服务渠道支持的权限。我们主要来看看“doEcho”方法,它被标识为“@Listener("/service/echo")”,所以它可以用于像客户端推送服务渠道为“echo”的消息,我们之前客户端代码示例里面的如下代码:“dojox.cometd.subscribe("/service/echo",echoRpcReturn)”就是专门用于处理这里服务渠道推送的消息,消息推送通过“deliver”方法,推送的消息信息放在“data”实参中。

再来看看 Monitor 类:

清单 13. Monitor Service 实现
 @Service("monitor")  public static class Monitor  {  @Listener("/meta/subscribe")  public void monitorSubscribe(ServerSession session, ServerMessage message)  {  Log.info("Monitored Subscribe from "+session+" for "           +message.get(Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/unsubscribe")  public void monitorUnsubscribe(ServerSession session, ServerMessage message)  {  Log.info("Monitored Unsubscribe from "+session+" for "           +message.get(Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/*")  public void monitorMeta(ServerSession session, ServerMessage message)  {  if (Log.isDebugEnabled())  Log.debug(message.toString());  }  }

Monitor 渠道类与之前的 Echo 服务渠道类比较类似,不过它主要用于处理 meta 渠道,与业务逻辑无关。

最后,我们来看看被注释掉的“ChatService”类,他也可以通过“processor.process(new ChatService())”来启用,但是我们这里用了一个更为简单的方法:直接配置在 web.xml 文件中:

清单 14. ChatService 的配置
   ...............    services  org.cometd.examples.ChatService    1  

细心的读者可能在之前的代码示例中已经看到,这里就是通过配置的方式加载服务渠道类。参考以下具体实现的代码:

清单 15. ChatService 实现
 @Service("chat")  public class ChatService  {  ..........................................     @Listener("/service/members")     public void handleMembership(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         final String room = ((String)data.get("room")).substring("/chat/".length());         Map roomMembers = _members.get(room);         if (roomMembers == null)         {             Map new_room = new ConcurrentHashMap();             roomMembers = _members.putIfAbsent(room, new_room);             if (roomMembers == null) roomMembers = new_room;         }         final Map members = roomMembers;         String userName = (String)data.get("user");         members.put(userName, client.getId());         client.addListener(new Server Session.RemoveListener()         {             public void removed(ServerSession session, boolean timeout)             {                 members.values().remove(session.getId());                 broadcastMembers(room,members.keySet());             }         });         broadcastMembers(room,members.keySet());     }     private void broadcastMembers(String room, Set members)     {         // Broadcast the new members list         ClientSessionChannel channel =                  _session.getLocalSession().getChannel("/members/"+room);         channel.publish(members);     }  ..........................................     @Listener("/service/privatechat")     protected void privateChat(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         String room = ((String)data.get("room")).substring("/chat/".length());         Map membersMap = _members.get(room);         if (membersMap==null)         {             Mapnew_room=new ConcurrentHashMap();             membersMap=_members.putIfAbsent(room,new_room);             if (membersMap==null)                 membersMap=new_room;         }         String[] peerNames = ((String)data.get("peer")).split(",");         ArrayList peers = new ArrayList(peerNames.length);  .................     }  }

以上是摘录部分 ChatService 实现代码,它主要是实现一个在线的聊天室,包括公开发言和私有(1 对 1)聊天等等功能,它的实现方式与之前的 Echo 和 Monitor 类似,这里不做详述,有兴趣的读者可以参考一下他的实现,来构造自己的服务器推送应用。


服务器推送技术之比较

其实有很多种方式实现服务器推送,它们各有各的优缺点:

  1. 传统轮询:此方法是利用 HTML 里面 meta 标签的刷新功能,在一定时间间隔后进行页面的转载,以此循环往复。它的最大缺点就是页面刷性给人带来的体验很差,而且服务器的压力也会比较大。
  2. Ajax 轮询:异步响应机制,即通过不间断的客户端 Ajax 请求,去发现服务端的变化。这种方式由于是客户端主动连接的,所以会有一定程度的延时,并且服务器的压力也不小。
  3. 长连接:这也是我们之前所介绍的一种方式。由于它是利用客户端的现有连接实现服务器主动向客户端推送信息,所以延时的情况很少,并且由于服务端的可操控性使得服务器的压力也迅速减小。其实这种技术还有其他的实现方式,通过 Iframe,在页面上嵌入一个隐藏帧(Iframe),将其“src”属性指向一个长连接的请求,这样一来,服务端就能够源源不断的向客户端发送数据。这种方式的不足就在于:它会造成浏览器的进度栏一直显示没有加载完成,当然我们可以通过 Google 的一个称为“htmlfile”的 ActiveX 控件解决,但是毕竟他需要安装 ActiveX 控件,对于终端用户也是不合适的。
  4. 套接字:可以利用 Flash 的 XMLSocket 类或者 Java 的 Applet 来建立 Socket 连接,实现全双工的服务器推送,然后通过 Flash 或者 Applet 与 JavaScript 通信的接口来实现最终的数据推送。但是这种方式需要 Flash 或者 JVM 的支持,同样不太合适于终端用户。
  5. HTML5 的 WebSocket:这种方式其实与套接字一样,但是这里需要单独强调一下:它是不需要用户而外安装任何插件的。 HTML5 提供了一个 WebSocket 的 JavaScript 接口,可以直接与服务端建立 Socket 连接,实现全双工通信,这种方式的服务器推送就是完全意义上的服务器推送了,没有半点模拟的成分,只是现阶段支持 HTML5 的浏览器并不多,而且一般老版本的各种浏览器基本都不支持。不过 HTML5 是一套非常好的标准,在将来,当 HTML5 流行起来以后将是我们实现服务器推送技术的不二选择。

结束语

这篇文章介绍了 Dojo 中的服务器推送 Cometd 工具包。基于服务器推送的理念,介绍了 Bayeux 协议的核心思想,并结合一个简单示例介绍了服务器推送的基本实现。随后,本着快速建立服务器推送框架的想法,介绍了 Dojo 的 Cometd 工具包,并分别从客户端接口和服务端接口两个方面分别介绍了 Dojo 的服务器推送框架的搭建和实现原理。最后,通过一些简单的示例展示了基于服务端推送的业务逻辑的具体实现。服务端推送技术具有很强的实用性,希望广大读者在开发自己的项目的过程中多关注一下,以尽可能多的完善自己的 Web 应用。

服务器推送技术和 Bayeux 协议简介

服务器推送技术的基础思想是将浏览器主动查询信息改为服务器主动发送信息。服务器发送一批数据,浏览器显示这些数据,同时保证与服务器的连接。当服务器需要再次发送一批数据时,浏览器显示数据并保持连接。以后,服务器仍然可以发送批量数据,浏览器继续显示数据,依次类推。基于这种思想,这里我们要引出 Bayeux 协议。

Bayeux 是一套基于 Publish / Subscribe 模式,以 JSON 格式在浏览器与服务器之间传输事件的通信协议。该协议规定了浏览器与服务器之问的双向通信机制,克服了传统 Web 通信模式的缺点。

Bayeux 协议主要基于 HTTP 来传输低延迟的、异步的事件消息。这些消息通过频道 (Channels) 来投递,能够实现从服务器端到客户端、从客户端到服务器端或者通过服务器从一个客户端到另一个客户端的传送。 Bayeux 协议的主要目的是为使用了 Ajax 和 Comet 技术的 Web 客户端实现高响应的用户交互。 Bayeux 协议旨在通过允许执行者更容易的实现互操作性,来降低开发 Comet 应用程序的复杂性。它解决了共同的消息发布和路由问题,并提供了渐进式的改进和扩展机制。

一般情况下,在 HTTP 协议中,Client 要想获得 Server 的消息,必须先自己发送一个 Request,然后 Server 才会给予 Response。而 Bayeux 协议改变了这个情况,它允许 Server 端异步 Push 自己的消息到 Client 端。从而实现了 Client 和 Server 之间的双向操作模式。


服务器推送技术的一个简单实现

基于 Bayeux 协议实现服务器推送技术的方式有很多,可以通过 Flex 或者 Java 的 Applet。基于这两种技术,我们可以建立在客户端建立服务套接字接口,“双向操作模式”自然很容易实现,但是这些方式需要除浏览器以外的运行环境的支持。这里我们希望能采用一种纯脚本的方式,这种方式是不可能建立服务套接字接口的,那如何实现基于 Bayeux 协议的服务器推送呢?其实是可以模拟实现的,主要有两种方式:

1. 基于 HTTP 的长轮询来进行消息通信(基于 Ajax 的长轮询(long-polling)方式)。

2. 基于 Iframe 及 htmlfile 的流(streaming)方式。

这里我们采用第一种方式实现,即:客户端先向服务器端发送一个 HTTP Request,服务器端接收到后,阻塞在那边,等服务器有消息的时候,则返回一个 HTTP Response 给客户端,客户端收到后,断开连接,紧接着再发第二个 HTTP Request,以此反复进行,保持这个“长轮询”。期间,如果连接超时,那么会断开重连,以保持连接。

基于以上的思想,我们来看一下一个简单的实现,这个简单实现是基于 PHP 的。示例很简单,即便没用过 PHP 也能够很容易看明白,而且我们会在后面一一作出解释。

这个示例主要实现这样一个功能:

我们在浏览器里面分别打开三个窗口,并访问同一张页面。修改其中一个页面上的内容,另外两个页面上的内容也随即发生变化(注意:这里不用刷新页面)。这就会给我们一种:数据是服务器推送过来的感觉。

图 1. 简单服务器推送示例 -- 内容修改前

图 1. 简单服务器推送示例 -- 内容修改前

我们修改其中第一个窗口(左上)的内容(输入“222”,点击“Send”按钮,发送到后台)。此时不仅第一个窗口的内容变化了,其余两个窗口的内容也随即变化。

图 2. 简单服务器推送示例 -- 内容修改

图 2. 简单服务器推送示例 -- 内容修改

接下来我们来看看示例代码吧:

清单 1. 简单服务器推送 -- 前端代码 HTML
 

这个是我们所看到的输入框和提交按钮,大家可以注意一下它的“onsubmit”方法:当我们输入内容并点击提交时,它会执行“comet.doRequest($('word').value)”方法向后端发起请求(其实在这之前我们就已经建立了与服务端的长轮询并可随时开始服务器推送数据)。接下来我们来看看这个“comet”是什么样子的以及他的 Request 的具体实现:

清单 2. 简单服务器推送 -- 前端代码 JavaScript
 var Comet = Class.create();Comet.prototype = {	timestamp: 0,	url: './backend.php',	noerror: true,	initialize: function(){	},	connect: function(){		this.ajax = new Ajax.Request(this.url, {			method: 'get',			parameters: {				'timestamp': this.timestamp			},			onSuccess: function(transport){				var response = transport.responseText.evalJSON();				this.comet.timestamp = response['timestamp'];				this.comet.handleResponse(response);				this.comet.noerror = true;			},			onComplete: function(transport){				if (!this.comet.noerror) setTimeout(function(){					                     comet.connect()				                         }, 5000);				else 				this.comet.connect();				this.comet.noerror = false;			}		});		this.ajax.comet = this;	},	handleResponse: function(response){		$('content').innerHTML += '
' + response['msg'] + '
'; }, doRequest: function(request){ new Ajax.Request(this.url, { method: 'get', parameters: { 'msg': request } }); }}var comet = new Comet();comet.connect();

我们先看最后两段代码,这里是页面初始化时会执行的代码,其实在这里,我们就建立了一服务端的长轮询,我们来看看“connect”方法的实现吧:

“connect”方法这里是发了一个 Ajax 请求,然后分别设定了成功时(onSuccess)的返回处理和请求完成时(onComplete)的处理(注意 onComplete 不论成功失败都会执行)。我们要挂住这里的 onComplete 方法。可以看到,当请求完成时,如果连接有问题,它会过 5 秒重新连接,;如果没有问题,他会立即重新连接。

相信大家看到这里应该会有点眉目了,这里其实没有什么所谓的恒定不断的连接(类似 TCP 方式),它的真正实现是通过不断的 Ajax 请求实现的。

所以,当我们开启 3 个窗口时,其实我们打开了 3 个模拟的不间断的客户端与服务端的连接,所以他们会即时解到服务端的信息,不需要刷新页面。

我们再来看看服务端的实现,看看他是如何推送的:

清单 3. 简单服务器推送 -- 后端代码 PHP
 $filename  = dirname(__FILE__).'/data.txt';  // 将新消息存入文件中 $msg = isset($_GET['msg']) ? $_GET['msg'] : '';  if ($msg != '')  {   file_put_contents($filename,$msg);   die();  }  // 这是一个无限循环,一旦发现文件被修改,便会跳出循环并返回文件修改数据。如果文件一直没有修改,则会一  // 直处于循环检测状态,此时的 Ajax 连接也会一直保留,直到文件被修改为止,这就是所谓的“长轮询”。 $lastmodif    = isset($_GET['timestamp']) ? $_GET['timestamp'] : 0;  $currentmodif = filemtime($filename);  while ($currentmodif <= $lastmodif) // 检测文件是否被修改 {   usleep(10000); // sleep 10ms to unload the CPU   clearstatcache();   $currentmodif = filemtime($filename);  }  // 返回 JSON 数组 $response = array();  $response['msg']       = file_get_contents($filename);  $response['timestamp'] = $currentmodif;  echo json_encode($response);  flush();

我们可以参照上面的注释理解该代码,其实并不需要多少 PHP 的知识。服务端推送技术不是一个开发用的控件库,而是一个思想。这里的 while 循环便说明了服务端推送是如何保留所谓的“长轮询”的。

现在大家应该明白为什么三个窗口会同步变化了。其主要的核心思想就是服务端“握住”长轮询,然后在适当的时候“放手”。


Dojo 的 Cometd 工具包简介

之前我们是基于 JavaScript 自己实现了一个简单的 Cometd 应用,我们花了大量的代码来建立一个 Cometd 框架,真正用于处理我们自己的业务逻辑的代码其实就是“handleResponse”里面的那一行。我们能不能吧这些通用的代码省掉呢? The answer is yes. Dojo 已经对 Cometd 做了封装,基于 Dojo 的 Cometd 包,我们不用再浪费大量的代码在搭建 Cometd 框架上。对于前端脚本代码,我们只需要加上一个 Cometd 包的简单接口代码,便可以开始加入我们自己的业务逻辑代码了。

当然,Dojo 的 Cometd 包还包括后端的代码,可以在 Dojo 的官网下载中找到,它不与 Dojo 包一起发布,是一个单独的服务端开源代码,基于 Java 和 Jetty 的,有兴趣的读者可以下载下来研究一下。

通过 Dojo 的这两部分代码,我们便可以迅速地搭建我们的 Cometd 框架,我们剩下需要做的就是加入我们的业务逻辑。


Dojo 的 Cometd 工具包之前端

接下来我们来看看 Dojo 的 Cometd 工具包的前端封装:

清单 4. Cometd 前端初始化
 dojox.cometd.init("http://www.xxx.com/cometd");

这个接口用于建立并初始化与服务端的握手连接(Bayeux handshake,初始化了“Bayeux communication” 消息通讯)。建立这个连接是基于 Bayeux 协议的,它主要有两个任务:

  1. 客户端与服务端协商传输的消息类型。
  2. 如果协商成功,服务端会通知客户端具体的请求参数配置。
  3. 如果协商失败,客户端重新发起协商流程。

我们深入 Dojo 的 init 方法内部可以看到握手连接的具体实现过程,它的实现也是不间断的重复发送客户端的 Ajax 请求,与我们之前的自制案类似,有兴趣的同学可以参考如下代码(摘取部分):

清单 5. Cometd 内部机制
 this.init = function(...){............	var bindArgs = {		url: this.url,		handleAs: this.handleAs,		content: { "message": dojo.toJson([props]) },		load: dojo.hitch(this,function(msg){			this._backon();			this._finishInit(msg);		}),		error: dojo.hitch(this,function(e){			this._backoff();			this._finishInit(e);		}),		timeout: this.expectedNetworkDelay	};..............	if(this._isXD){		r = dojo.io.script.get(bindArgs);	}else{		r = dojo.xhrPost(bindArgs);	}..............}this._finishInit = function(data){..................	if(successful){		........		//ajax request inside		this.tunnelInit = transport.tunnelInit && dojo.hitch(transport,        "tunnelInit");		this.tunnelCollapse = transport.tunnelCollapse && dojo.hitch(transport,		"tunnelCollapse");		transport.start up(data);	}else{		if(!this._advice || this._advice["reconnect"] != "none"){			setTimeout(dojo.hitch(this, "init", this.url, this._props),			this._interval());		}	}....................}

可见,它们的 callback 方法里面都带有对自己本身的调用,这里的”init“方法也不例外。细心的读者可能还会发现,其实从例子上可以看出:Dojo 的 Cometd 也支持跨域,它的跨域是通过“script”的方式实现的。这里有一点需要大家了解,我们默认的服务端推送实现方式是长轮询(long-polling)模式,遇到跨域时,“long-polling”便不再适用,转为基于“script”的返回调用(callback-polling)模式。

接下来我们再来看看 Cometd 中关于消息推送的一些接口,这些消息通讯主要是基于渠道:

清单 6. Cometd 前端发布消息
 dojox.cometd.publish("/service/echo", { msg: msg });

这里的所谓“发布消息”就是向后端发送消息,用于前端主动向后端推送。

这里的第一个参数是发送消息的渠道标识(channel),这种“channel”共有三种类型:

1. 元渠道(meta channels):示例“/meta/connect”(通常以“/meta/”为开头)。元渠道主要不是用来消息传输,而是用于客户端监听,如握手连接或者网络连接等等的错误。通常我们会在客户端调用“addListener()”来开启监听元渠道,它可以在握手连接的建立之前就开启监听,而且这种消息监听是同步的。

2. 服务渠道(service channels):示例“/service/connect”(通常以“/service/”为开头)。它主要用于私有消息通讯,主要是一对一的通讯。通常我们会在客户端调用“subscribe()”来订阅服务渠道消息。服务渠道只有等握手连接建立好后才能开启,而且它是异步通讯的。

3. 普通渠道(normal channels):示例“/foo/bar”(无限制)。这种渠道没有什么限制,主要用于广播消息,即:多个客户端订阅了一个服务,该服务可以通过普通渠道进行消息广播。

渠道是通信的基础模式,我们可以根据需要选择相应的渠道模式。

第二个参数则是消息对象,这里的“msg”则是消息内容。

有一点要注意:这里的“publish”是基于 Bayeux 协议的,采用的异步消息传输机制,所以它是在服务端(Bayeux 服务器)收到消息之前就返回的。所以 publish 的返回并不代表服务端收到你 publish 的消息了。

Dojo 的 Cometd 还支持批量发送消息,通过这个接口可以有效地避免不必要的网络消息传输的浪费:

清单 7. Cometd 前端批量发布消息
 // 方法 1  cometd.batch(function()  {     cometd.publish('/channel1', { product: 'foo' });     cometd.publish('/channel2', { notificationType: 'all' });     cometd.publish('/channel3', { update: false });  });  // 方法 2  cometd.startBatch()  cometd.publish('/channel1', { product: 'foo' });  cometd.publish('/channel2', { notificationType: 'all' });  cometd.publish('/channel3', { update: false });  cometd.endBatch()

上述两种方案都可以实现消息的批量发送,推荐使用方法 1。

接下来我们看看服务端的消息推送:

清单 8. Cometd 前端订阅消息
 dojox.cometd.subscribe("/service/echo",echoRpcReturn);  function echoRpcReturn(msg){  dojo.byId("responses").innerHTML += msg;  }

这里所谓的“订阅消息”,其实就是接收服务端推送的消息,是后端主动向前端推送。这也是服务端推送的精华所在,同样也是很简单的一行代码。

这里我们看到了一个熟悉的方法 --- “subscribe”,之前我们已经介绍过了,它主要用于订阅服务渠道私有消息,这里就是它用法的一个示例。对应的服务端 Service 向对应的前端订阅者推送消息,这里就是通过“echo”渠道向前端推送消息,他会回调“echoRpcReturn”方法,并传入推送的消息作为实参。对于后端的每次推送,都会调用前端的“echoRpcReturn”方法。


Dojo 的 Cometd 工具包之后端

Dojo 的 Cometd 工具包的后端实现是基于 Java 和 Jetty 组件的,通过 Dojo 的服务端 Cometd 组件,我们同样能极其迅速的构建 Cometd 框架。我们需要做的仅仅是加入我们的业务逻辑代码即可。

先来看看 web.xml 的配置参数:

清单 9. 基本配置参数(web.xml)
               cometd           org.cometd.server.continuation.ContinuationCometdServlet                        timeout             60000                            cometd         /cometd/*                   cross-origin         org.eclipse.jetty.servlets.CrossOriginFilter                   cross-origin         /cometd/*       

这里我们先来看看“ContinuationCometdServlet”,这个 Servlet 主要用于解释 Bayeux 协议,所以关于它的配置是必须的。基于“ContinuationCometdServlet”的其他配置参数还有很多,如:

Timeout:长轮询的过期时间。如果超过这个时间还没有客户端消息,服务端会推送一个空消息。

Interval:轮询间隔时间。客户端结束前一个请求到发送下一个请求之间的间隔时间。

maxInterval:服务端最长等待时间。即:建立连接时,如果超过这个时间仍没有接到一个新的长轮询连接请求,服务端就会认为该客户端无效或者关闭了。

logLevel:日志级别。 “0 = warn, 1 = info, 2 = debug”。

以上是主要的配置参数,其余的配置参数还有很多,这里不一一介绍,有需要的读者可以查阅 Dojo 的帮助文档。另外,最后几行我们还配置了一个“cross-origin”,对应着“CrossOriginFilter”类,他用于支持跨域的 JavaScript 请求,如果您的项目中要支持跨域的服务器推送,请加入该配置。

接下来我们再来看看一些高级配置参数:

清单 10. 高级配置参数(web.xml)
   cometd  org.cometd.java.annotation.AnnotationCometdServlet    logLevel  1      services  org.cometd.examples.ChatService    1      cometd  /cometd/*      cometdDemo  org.cometd.examples.CometdDemoServlet  2  

这里我们主要要注意三个地方:

1. “CometdDemoServlet”:它是用于启动服务端 Cometd 框架的 Servlet,我们在后面会介绍。由于他配置了“load-on-startup”参数,所以在服务容器启动的时候,我们的 Cometd 服务端就已经搭建好了,之后我们会着重介绍他的“init”方法中的行为。

2. “AnnotationCometdServlet”:这个 Servlet 配置在这里表示了我们在服务端代码是基于 annotation 的。这是一个非常实用的 Servlet,通过这个 Servlet,你会发现,我们要做的事情仅仅是定义几个 Service 类,实现其中的几个方法即可。连很多调用 Cometd 框架 API 接口的代码都省去了。

3. “ChatService”:这里声明了一个 Service 类,他的用途是处理服务渠道的消息。这里声明的作用等同于代码中的“processor.process(new ChatService())”。

配置完成后,我们接下来可以看看代码了。通过以上的配置之后,你会发现,我们接下来要写的代码非常简单精炼:

清单 11. 服务类初始化 init
 public void init() throws ServletException  {  final BayeuxServerImpl bayeux =  (BayeuxServerImpl)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);  if (bayeux==null)  throw new UnavailableException("No BayeuxServer!");  .................  // 创建扩展点 bayeux.addExtension(new TimesyncExtension());  bayeux.addExtension(new AcknowledgedMessagesExtension());  // 设定握手连接权限 bayeux.getChannel(ServerChannel.META_HANDSHAKE).addAuthorizer(   GrantAuthorizer.GRANT_PUBLISH);  // 启动服务渠道 ServerAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux);  processor.process(new EchoRPC());  processor.process(new Monitor());  //processor.process(new ChatService());  bayeux.createIfAbsent("/foo/bar/baz",new ConfigurableServerChannel.Initializer()  {  public void configureChannel(ConfigurableServerChannel channel)  {  channel.setPersistent(true);  }  });  if (bayeux.getLogger().isDebugEnabled())  System.err.println(bayeux.dump());  .................  }

这里我们介绍三个知识点:

1. Extension:Extension 是一个函数,它会在消息发出之前或者收到之后被调用,专门用来修改消息内容,例如加入一些特殊属性(这些属性多在消息的 ext 属性中)。注意,这些属性大多是应用无关的,如记录长轮询的次数等等。这里的“TimesyncExtension”和“AcknowledgedMessagesExtension”是两个比较常用的 Extension:

  • 1) “Timesync Extension”用于计算客户端事件和服务端时间的偏差。客户端需要同时引入“dojox.cometd.timesync”类,该 Extension 使得客户端和服务端在每次握手或者连接的时候能够互相交换各自的时钟信息,这也是的客户端可以很精确的计算出他与服务端时钟的偏移量。消息格式如下:

    {ext:{timesync:{tc:12345567890,ts:1234567900,p:123,a:3},...},...}

    TC:客户端发消息的时间(距离 1970 年 1 月号的时长,单位为毫秒)

    TS:服务端收到消息的时间

  • 2) “Acknowledge Extension”用于提供可靠的顺序消息机制。一旦加入了“Acknowledge Extension”,服务端会阻截非长轮询的客户端请求,这样会使你的服务器更加的高效。注意:客户端需要同时引入“dojox.cometd.ack”类与其协同工作。

2. Authorizer:设定握手连接权限,这里设定值为“GrantAuthorizer.GRANT_PUBLISH”,表示允许所有客户端建立握手连接。

3. Process Service:启动服务渠道“processor.process(new EchoRPC())”。通过这些服务渠道类,我们可以启动服务渠道处理客户端请求。这是我们服务端推送技术的关键所在,我们的业务逻辑代码也是主要放在这些服务渠道类里面。

接下来我们来看看这些服务渠道类的具体实现:

清单 12. Echo Service 实现
 @Service("echo")public static class EchoRPC{	@Session	private ServerSession _session;	@SuppressWarnings("unused")	@Configure("/service/echo")	private void configureEcho(ConfigurableServerChannel channel)	{		channel.addAuthorizer(GrantAuthorizer.GRANT_SUBSCRIBE_PUBLISH);	}	@Listener("/service/echo")	public void doEcho(ServerSession session, ServerMessage message)	{		Map data = message.getDataAsMap();		Log.info("ECHO from "+session+" "+data);				for(int i = 0; i < 50; i++){			session.deliver(_session, message.getChannel(), data, null);		}	}}

我们可以在“configureEcho”里面设定该服务渠道支持的权限。我们主要来看看“doEcho”方法,它被标识为“@Listener("/service/echo")”,所以它可以用于像客户端推送服务渠道为“echo”的消息,我们之前客户端代码示例里面的如下代码:“dojox.cometd.subscribe("/service/echo",echoRpcReturn)”就是专门用于处理这里服务渠道推送的消息,消息推送通过“deliver”方法,推送的消息信息放在“data”实参中。

再来看看 Monitor 类:

清单 13. Monitor Service 实现
 @Service("monitor")  public static class Monitor  {  @Listener("/meta/subscribe")  public void monitorSubscribe(ServerSession session, ServerMessage message)  {  Log.info("Monitored Subscribe from "+session+" for "           +message.get(Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/unsubscribe")  public void monitorUnsubscribe(ServerSession session, ServerMessage message)  {  Log.info("Monitored Unsubscribe from "+session+" for "           +message.get(Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/*")  public void monitorMeta(ServerSession session, ServerMessage message)  {  if (Log.isDebugEnabled())  Log.debug(message.toString());  }  }

Monitor 渠道类与之前的 Echo 服务渠道类比较类似,不过它主要用于处理 meta 渠道,与业务逻辑无关。

最后,我们来看看被注释掉的“ChatService”类,他也可以通过“processor.process(new ChatService())”来启用,但是我们这里用了一个更为简单的方法:直接配置在 web.xml 文件中:

清单 14. ChatService 的配置
   ...............    services  org.cometd.examples.ChatService    1  

细心的读者可能在之前的代码示例中已经看到,这里就是通过配置的方式加载服务渠道类。参考以下具体实现的代码:

清单 15. ChatService 实现
 @Service("chat")  public class ChatService  {  ..........................................     @Listener("/service/members")     public void handleMembership(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         final String room = ((String)data.get("room")).substring("/chat/".length());         Map roomMembers = _members.get(room);         if (roomMembers == null)         {             Map new_room = new ConcurrentHashMap();             roomMembers = _members.putIfAbsent(room, new_room);             if (roomMembers == null) roomMembers = new_room;         }         final Map members = roomMembers;         String userName = (String)data.get("user");         members.put(userName, client.getId());         client.addListener(new ServerSes sion.RemoveListener()         {             public void removed(ServerSession session, boolean timeout)             {                 members.values().remove(session.getId());                 broadcastMembers(room,members.keySet());             }         });         broadcastMembers(room,members.keySet());     }     private void broadcastMembers(String room, Set members)     {         // Broadcast the new members list         ClientSessionChannel channel =                  _session.getLocalSession().getChannel("/members/"+room);         channel.publish(members);     }  ..........................................     @Listener("/service/privatechat")     protected void privateChat(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         String room = ((String)data.get("room")).substring("/chat/".length());         Map membersMap = _members.get(room);         if (membersMap==null)         {             Map new_room=new ConcurrentHashMap();             membersMap=_members.putIfAbsent(room,new_room);             if (membersMap==null)                 membersMap=new_room;         }         String[] peerNames = ((String)data.get("peer")).split(",");         ArrayList peers = new ArrayList(peerNames.length);  .................     }  }

以上是摘录部分 ChatService 实现代码,它主要是实现一个在线的聊天室,包括公开发言和私有(1 对 1)聊天等等功能,它的实现方式与之前的 Echo 和 Monitor 类似,这里不做详述,有兴趣的读者可以参考一下他的实现,来构造自己的服务器推送应用。


服务器推送技术之比较

其实有很多种方式实现服务器推送,它们各有各的优缺点:

  1. 传统轮询:此方法是利用 HTML 里面 meta 标签的刷新功能,在一定时间间隔后进行页面的转载,以此循环往复。它的最大缺点就是页面刷性给人带来的体验很差,而且服务器的压力也会比较大。
  2. Ajax 轮询:异步响应机制,即通过不间断的客户端 Ajax 请求,去发现服务端的变化。这种方式由于是客户端主动连接的,所以会有一定程度的延时,并且服务器的压力也不小。
  3. 长连接:这也是我们之前所介绍的一种方式。由于它是利用客户端的现有连接实现服务器主动向客户端推送信息,所以延时的情况很少,并且由于服务端的可操控性使得服务器的压力也迅速减小。其实这种技术还有其他的实现方式,通过 Iframe,在页面上嵌入一个隐藏帧(Iframe),将其“src”属性指向一个长连接的请求,这样一来,服务端就能够源源不断的向客户端发送数据。这种方式的不足就在于:它会造成浏览器的进度栏一直显示没有加载完成,当然我们可以通过 Google 的一个称为“htmlfile”的 ActiveX 控件解决,但是毕竟他需要安装 ActiveX 控件,对于终端用户也是不合适的。
  4. 套接字:可以利用 Flash 的 XMLSocket 类或者 Java 的 Applet 来建立 Socket 连接,实现全双工的服务器推送,然后通过 Flash 或者 Applet 与 JavaScript 通信的接口来实现最终的数据推送。但是这种方式需要 Flash 或者 JVM 的支持,同样不太合适于终端用户。
  5. HTML5 的 WebSocket:这种方式其实与套接字一样,但是这里需要单独强调一下:它是不需要用户而外安装任何插件的。 HTML5 提供了一个 WebSocket 的 JavaScript 接口,可以直接与服务端建立 Socket 连接,实现全双工通信,这种方式的服务器推送就是完全意义上的服务器推送了,没有半点模拟的成分,只是现阶段支持 HTML5 的浏览器并不多,而且一般老版本的各种浏览器基本都不支持。不过 HTML5 是一套非常好的标准,在将来,当 HTML5 流行起来以后将是我们实现服务器推送技术的不二选择。

结束语

这篇文章介绍了 Dojo 中的服务器推送 Cometd 工具包。基于服务器推送的理念,介绍了 Bayeux 协议的核心思想,并结合一个简单示例介绍了服务器推送的基本实现。随后,本着快速建立服务器推送框架的想法,介绍了 Dojo 的 Cometd 工具包,并分别从客户端接口和服务端接口两个方面分别介绍了 Dojo 的服务器推送框架的搭建和实现原理。最后,通过一些简单的示例展示了基于服务端推送的业务逻辑的具体实现。服务端推送技术具有很强的实用性,希望广大读者在开发自己的项目的过程中多关注一下,以尽可能多的完善自己的 Web 应用。

服务器推送技术和 Bayeux 协议简介

服务器推送技术的基础思想是将浏览器主动查询信息改为服务器主动发送信息。服务器发送一批数据,浏览器显示这些数据,同时保证与服务器的连接。当服务器需要再次发送一批数据时,浏览器显示数据并保持连接。以后,服务器仍然可以发送批量数据,浏览器继续显示数据,依次类推。基于这种思想,这里我们要引出 Bayeux 协议。

Bayeux 是一套基于 Publish / Subscribe 模式,以 JSON 格式在浏览器与服务器之间传输事件的通信协议。该协议规定了浏览器与服务器之问的双向通信机制,克服了传统 Web 通信模式的缺点。

Bayeux 协议主要基于 HTTP 来传输低延迟的、异步的事件消息。这些消息通过频道 (Channels) 来投递,能够实现从服务器端到客户端、从客户端到服务器端或者通过服务器从一个客户端到另一个客户端的传送。 Bayeux 协议的主要目的是为使用了 Ajax 和 Comet 技术的 Web 客户端实现高响应的用户交互。 Bayeux 协议旨在通过允许执行者更容易的实现互操作性,来降低开发 Comet 应用程序的复杂性。它解决了共同的消息发布和路由问题,并提供了渐进式的改进和扩展机制。

一般情况下,在 HTTP 协议中,Client 要想获得 Server 的消息,必须先自己发送一个 Request,然后 Server 才会给予 Response。而 Bayeux 协议改变了这个情况,它允许 Server 端异步 Push 自己的消息到 Client 端。从而实现了 Client 和 Server 之间的双向操作模式。


服务器推送技术的一个简单实现

基于 Bayeux 协议实现服务器推送技术的方式有很多,可以通过 Flex 或者 Java 的 Applet。基于这两种技术,我们可以建立在客户端建立服务套接字接口,“双向操作模式”自然很容易实现,但是这些方式需要除浏览器以外的运行环境的支持。这里我们希望能采用一种纯脚本的方式,这种方式是不可能建立服务套接字接口的,那如何实现基于 Bayeux 协议的服务器推送呢?其实是可以模拟实现的,主要有两种方式:

1. 基于 HTTP 的长轮询来进行消息通信(基于 Ajax 的长轮询(long-polling)方式)。

2. 基于 Iframe 及 htmlfile 的流(streaming)方式。

这里我们采用第一种方式实现,即:客户端先向服务器端发送一个 HTTP Request,服务器端接收到后,阻塞在那边,等服务器有消息的时候,则返回一个 HTTP Response 给客户端,客户端收到后,断开连接,紧接着再发第二个 HTTP Request,以此反复进行,保持这个“长轮询”。期间,如果连接超时,那么会断开重连,以保持连接。

基于以上的思想,我们来看一下一个简单的实现,这个简单实现是基于 PHP 的。示例很简单,即便没用过 PHP 也能够很容易看明白,而且我们会在后面一一作出解释。

这个示例主要实现这样一个功能:

我们在浏览器里面分别打开三个窗口,并访问同一张页面。修改其中一个页面上的内容,另外两个页面上的内容也随即发生变化(注意:这里不用刷新页面)。这就会给我们一种:数据是服务器推送过来的感觉。

图 1. 简单服务器推送示例 -- 内容修改前

图 1. 简单服务器推送示例 -- 内容修改前

我们修改其中第一个窗口(左上)的内容(输入“222”,点击“Send”按钮,发送到后台)。此时不仅第一个窗口的内容变化了,其余两个窗口的内容也随即变化。

图 2. 简单服务器推送示例 -- 内容修改

图 2. 简单服务器推送示例 -- 内容修改

接下来我们来看看示例代码吧:

清单 1. 简单服务器推送 -- 前端代码 HTML
 

这个是我们所看到的输入框和提交按钮,大家可以注意一下它的“onsubmit”方法:当我们输入内容并点击提交时,它会执行“comet.doRequest($('word').value)”方法向后端发起请求(其实在这之前我们就已经建立了与服务端的长轮询并可随时开始服务器推送数据)。接下来我们来看看这个“comet”是什么样子的以及他的 Request 的具体实现:

清单 2. 简单服务器推送 -- 前端代码 JavaScript
 var Comet = Class.create();Comet.prototype = {	timestamp: 0,	url: './backend.php',	noerror: true,	initialize: function(){	},	connect: function(){		this.ajax = new Ajax.Request(this.url, {			method: 'get',			parameters: {				'timestamp': this.timestamp			},			onSuccess: function(transport){				var response = transport.responseText.evalJSON();				this.comet.timestamp = response['timestamp'];				this.comet.handleResponse(response);				this.comet.noerror = true;			},			onComplete: function(transport){				if (!this.comet.noerror) setTimeout(function(){					                     comet.connect()				                         }, 5000);				else 				this.comet.connect();				this.comet.noerror = false;			}		});		this.ajax.comet = this;	},	handleResponse: function(response){		$('content').innerHTML += '
' + response['msg'] + '
'; }, doRequest: function(request){ new Ajax.Request(this.url, { method: 'get', parameters: { 'msg': request } }); }}var comet = new Comet();comet.connect();

我们先看最后两段代码,这里是页面初始化时会执行的代码,其实在这里,我们就建立了一服务端的长轮询,我们来看看“connect”方法的实现吧:

“connect”方法这里是发了一个 Ajax 请求,然后分别设定了成功时(onSuccess)的返回处理和请求完成时(onComplete)的处理(注意 onComplete 不论成功失败都会执行)。我们要挂住这里的 onComplete 方法。可以看到,当请求完成时,如果连接有问题,它会过 5 秒重新连接,;如果没有问题,他会立即重新连接。

相信大家看到这里应该会有点眉目了,这里其实没有什么所谓的恒定不断的连接(类似 TCP 方式),它的真正实现是通过不断的 Ajax 请求实现的。

所以,当我们开启 3 个窗口时,其实我们打开了 3 个模拟的不间断的客户端与服务端的连接,所以他们会即时解到服务端的信息,不需要刷新页面。

我们再来看看服务端的实现,看看他是如何推送的:

清单 3. 简单服务器推送 -- 后端代码 PHP
 $filename  = dirname(__FILE__).'/data.txt';  // 将新消息存入文件中 $msg = isset($_GET['msg']) ? $_GET['msg'] : '';  if ($msg != '')  {   file_put_contents($filename,$msg);   die();  }  // 这是一个无限循环,一旦发现文件被修改,便会跳出循环并返回文件修改数据。如果文件一直没有修改,则会一  // 直处于循环检测状态,此时的 Ajax 连接也会一直保留,直到文件被修改为止,这就是所谓的“长轮询”。 $lastmodif    = isset($_GET['timestamp']) ? $_GET['timestamp'] : 0;  $currentmodif = filemtime($filename);  while ($currentmodif <= $lastmodif) // 检测文件是否被修改 {   usleep(10000); // sleep 10ms to unload the CPU   clearstatcache();   $currentmodif = filemtime($filename);  }  // 返回 JSON 数组 $response = array();  $response['msg']       = file_get_contents($filename);  $response['timestamp'] = $currentmodif;  echo json_encode($response);  flush();

我们可以参照上面的注释理解该代码,其实并不需要多少 PHP 的知识。服务端推送技术不是一个开发用的控件库,而是一个思想。这里的 while 循环便说明了服务端推送是如何保留所谓的“长轮询”的。

现在大家应该明白为什么三个窗口会同步变化了。其主要的核心思想就是服务端“握住”长轮询,然后在适当的时候“放手”。


Dojo 的 Cometd 工具包简介

之前我们是基于 JavaScript 自己实现了一个简单的 Cometd 应用,我们花了大量的代码来建立一个 Cometd 框架,真正用于处理我们自己的业务逻辑的代码其实就是“handleResponse”里面的那一行。我们能不能吧这些通用的代码省掉呢? The answer is yes. Dojo 已经对 Cometd 做了封装,基于 Dojo 的 Cometd 包,我们不用再浪费大量的代码在搭建 Cometd 框架上。对于前端脚本代码,我们只需要加上一个 Cometd 包的简单接口代码,便可以开始加入我们自己的业务逻辑代码了。

当然,Dojo 的 Cometd 包还包括后端的代码,可以在 Dojo 的官网下载中找到,它不与 Dojo 包一起发布,是一个单独的服务端开源代码,基于 Java 和 Jetty 的,有兴趣的读者可以下载下来研究一下。

通过 Dojo 的这两部分代码,我们便可以迅速地搭建我们的 Cometd 框架,我们剩下需要做的就是加入我们的业务逻辑。


Dojo 的 Cometd 工具包之前端

接下来我们来看看 Dojo 的 Cometd 工具包的前端封装:

清单 4. Cometd 前端初始化
 dojox.cometd.init("http://www.xxx.com/cometd");

这个接口用于建立并初始化与服务端的握手连接(Bayeux handshake,初始化了“Bayeux communication” 消息通讯)。建立这个连接是基于 Bayeux 协议的,它主要有两个任务:

  1. 客户端与服务端协商传输的消息类型。
  2. 如果协商成功,服务端会通知客户端具体的请求参数配置。
  3. 如果协商失败,客户端重新发起协商流程。

我们深入 Dojo 的 init 方法内部可以看到握手连接的具体实现过程,它的实现也是不间断的重复发送客户端的 Ajax 请求,与我们之前的自制案类似,有兴趣的同学可以参考如下代码(摘取部分):

清单 5. Cometd 内部机制
 this.init = function(...){............	var bindArgs = {		url: this.url,		handleAs: this.handleAs,		content: { "message": dojo.toJson([props]) },		load: dojo.hitch(this,function(msg){			this._backon();			this._finishInit(msg);		}),		error: dojo.hitch(this,function(e){			this._backoff();			this._finishInit(e);		}),		timeout: this.expectedNetworkDelay	};..............	if(this._isXD){		r = dojo.io.script.get(bindArgs);	}else{		r = dojo.xhrPost(bindArgs);	}..............}this._finishInit = function(data){..................	if(successful){		........		//ajax request inside		this.tunnelInit = transport.tunnelInit && dojo.hitch(transport,        "tunnelInit");		this.tunnelCollapse = transport.tunnelCollapse && dojo.hitch(transport,		"tunnelCollapse");		transport.startup(da ta);	}else{		if(!this._advice || this._advice["reconnect"] != "none"){			setTimeout(dojo.hitch(this, "init", this.url, this._props),			this._interval());		}	}....................}

可见,它们的 callback 方法里面都带有对自己本身的调用,这里的”init“方法也不例外。细心的读者可能还会发现,其实从例子上可以看出:Dojo 的 Cometd 也支持跨域,它的跨域是通过“script”的方式实现的。这里有一点需要大家了解,我们默认的服务端推送实现方式是长轮询(long-polling)模式,遇到跨域时,“long-polling”便不再适用,转为基于“script”的返回调用(callback-polling)模式。

接下来我们再来看看 Cometd 中关于消息推送的一些接口,这些消息通讯主要是基于渠道:

清单 6. Cometd 前端发布消息
 dojox.cometd.publish("/service/echo", { msg: msg });

这里的所谓“发布消息”就是向后端发送消息,用于前端主动向后端推送。

这里的第一个参数是发送消息的渠道标识(channel),这种“channel”共有三种类型:

1. 元渠道(meta channels):示例“/meta/connect”(通常以“/meta/”为开头)。元渠道主要不是用来消息传输,而是用于客户端监听,如握手连接或者网络连接等等的错误。通常我们会在客户端调用“addListener()”来开启监听元渠道,它可以在握手连接的建立之前就开启监听,而且这种消息监听是同步的。

2. 服务渠道(service channels):示例“/service/connect”(通常以“/service/”为开头)。它主要用于私有消息通讯,主要是一对一的通讯。通常我们会在客户端调用“subscribe()”来订阅服务渠道消息。服务渠道只有等握手连接建立好后才能开启,而且它是异步通讯的。

3. 普通渠道(normal channels):示例“/foo/bar”(无限制)。这种渠道没有什么限制,主要用于广播消息,即:多个客户端订阅了一个服务,该服务可以通过普通渠道进行消息广播。

渠道是通信的基础模式,我们可以根据需要选择相应的渠道模式。

第二个参数则是消息对象,这里的“msg”则是消息内容。

有一点要注意:这里的“publish”是基于 Bayeux 协议的,采用的异步消息传输机制,所以它是在服务端(Bayeux 服务器)收到消息之前就返回的。所以 publish 的返回并不代表服务端收到你 publish 的消息了。

Dojo 的 Cometd 还支持批量发送消息,通过这个接口可以有效地避免不必要的网络消息传输的浪费:

清单 7. Cometd 前端批量发布消息
 // 方法 1  cometd.batch(function()  {     cometd.publish('/channel1', { product: 'foo' });     cometd.publish('/channel2', { notificationType: 'all' });     cometd.publish('/channel3', { update: false });  });  // 方法 2  cometd.startBatch()  cometd.publish('/channel1', { product: 'foo' });  cometd.publish('/channel2', { notificationType: 'all' });  cometd.publish('/channel3', { update: false });  cometd.endBatch()

上述两种方案都可以实现消息的批量发送,推荐使用方法 1。

接下来我们看看服务端的消息推送:

清单 8. Cometd 前端订阅消息
 dojox.cometd.subscribe("/service/echo",echoRpcReturn);  function echoRpcReturn(msg){  dojo.byId("responses").innerHTML += msg;  }

这里所谓的“订阅消息”,其实就是接收服务端推送的消息,是后端主动向前端推送。这也是服务端推送的精华所在,同样也是很简单的一行代码。

这里我们看到了一个熟悉的方法 --- “subscribe”,之前我们已经介绍过了,它主要用于订阅服务渠道私有消息,这里就是它用法的一个示例。对应的服务端 Service 向对应的前端订阅者推送消息,这里就是通过“echo”渠道向前端推送消息,他会回调“echoRpcReturn”方法,并传入推送的消息作为实参。对于后端的每次推送,都会调用前端的“echoRpcReturn”方法。


Dojo 的 Cometd 工具包之后端

Dojo 的 Cometd 工具包的后端实现是基于 Java 和 Jetty 组件的,通过 Dojo 的服务端 Cometd 组件,我们同样能极其迅速的构建 Cometd 框架。我们需要做的仅仅是加入我们的业务逻辑代码即可。

先来看看 web.xml 的配置参数:

清单 9. 基本配置参数(web.xml)
               cometd           org.cometd.server.continuation.ContinuationCometdServlet                        timeout             60000                            cometd         /cometd/*                   cross-origin         org.eclipse.jetty.servlets.CrossOriginFilter                   cross-origin         /cometd/*       

这里我们先来看看“ContinuationCometdServlet”,这个 Servlet 主要用于解释 Bayeux 协议,所以关于它的配置是必须的。基于“ContinuationCometdServlet”的其他配置参数还有很多,如:

Timeout:长轮询的过期时间。如果超过这个时间还没有客户端消息,服务端会推送一个空消息。

Interval:轮询间隔时间。客户端结束前一个请求到发送下一个请求之间的间隔时间。

maxInterval:服务端最长等待时间。即:建立连接时,如果超过这个时间仍没有接到一个新的长轮询连接请求,服务端就会认为该客户端无效或者关闭了。

logLevel:日志级别。 “0 = warn, 1 = info, 2 = debug”。

以上是主要的配置参数,其余的配置参数还有很多,这里不一一介绍,有需要的读者可以查阅 Dojo 的帮助文档。另外,最后几行我们还配置了一个“cross-origin”,对应着“CrossOriginFilter”类,他用于支持跨域的 JavaScript 请求,如果您的项目中要支持跨域的服务器推送,请加入该配置。

接下来我们再来看看一些高级配置参数:

清单 10. 高级配置参数(web.xml)
   cometd  org.cometd.java.annotation.AnnotationCometdServlet    logLevel  1      services  org.cometd.examples.ChatService    1      cometd  /cometd/*      cometdDemo  org.cometd.examples.CometdDemoServlet  2  

这里我们主要要注意三个地方:

1. “CometdDemoServlet”:它是用于启动服务端 Cometd 框架的 Servlet,我们在后面会介绍。由于他配置了“load-on-startup”参数,所以在服务容器启动的时候,我们的 Cometd 服务端就已经搭建好了,之后我们会着重介绍他的“init”方法中的行为。

2. “AnnotationCometdServlet”:这个 Servlet 配置在这里表示了我们在服务端代码是基于 annotation 的。这是一个非常实用的 Servlet,通过这个 Servlet,你会发现,我们要做的事情仅仅是定义几个 Service 类,实现其中的几个方法即可。连很多调用 Cometd 框架 API 接口的代码都省去了。

3. “ChatService”:这里声明了一个 Service 类,他的用途是处理服务渠道的消息。这里声明的作用等同于代码中的“processor.process(new ChatService())”。

配置完成后,我们接下来可以看看代码了。通过以上的配置之后,你会发现,我们接下来要写的代码非常简单精炼:

清单 11. 服务类初始化 init
 public void init() throws ServletException  {  final BayeuxServerImpl bayeux =  (BayeuxServerImpl)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);  if (bayeux==null)  throw new UnavailableException("No BayeuxServer!");  .................  // 创建扩展点 bayeux.addExtension(new TimesyncExtension());  bayeux.addExtension(new AcknowledgedMessagesExtension());  // 设定握手连接权限 bayeux.getChannel(ServerChannel.META_HANDSHAKE).addAuthorizer(   GrantAuthorizer.GRANT_PUBLISH);  // 启动服务渠道 ServerAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux);  processor.process(new EchoRPC());  processor.process(new Monitor());  //processor.process(new ChatService());  bayeux.createIfAbsent("/foo/bar/baz",new ConfigurableServerChannel.Initializer()  {  public void configureChannel(ConfigurableServerChannel channel)  {  channel.setPersistent(true);  }  });  if (bayeux.getLogger().isDebugEnabled())  System.err.println(bayeux.dump());  .................  }

这里我们介绍三个知识点:

1. Extension:Extension 是一个函数,它会在消息发出之前或者收到之后被调用,专门用来修改消息内容,例如加入一些特殊属性(这些属性多在消息的 ext 属性中)。注意,这些属性大多是应用无关的,如记录长轮询的次数等等。这里的“TimesyncExtension”和“AcknowledgedMessagesExtension”是两个比较常用的 Extension:

  • 1) “Timesync Extension”用于计算客户端事件和服务端时间的偏差。客户端需要同时引入“dojox.cometd.timesync”类,该 Extension 使得客户端和服务端在每次握手或者连接的时候能够互相交换各自的时钟信息,这也是的客户端可以很精确的计算出他与服务端时钟的偏移量。消息格式如下:

    {ext:{timesync:{tc:12345567890,ts:1234567900,p:123,a:3},...},...}

    TC:客户端发消息的时间(距离 1970 年 1 月号的时长,单位为毫秒)

    TS:服务端收到消息的时间

  • 2) “Acknowledge Extension”用于提供可靠的顺序消息机制。一旦加入了“Acknowledge Extension”,服务端会阻截非长轮询的客户端请求,这样会使你的服务器更加的高效。注意:客户端需要同时引入“dojox.cometd.ack”类与其协同工作。

2. Authorizer:设定握手连接权限,这里设定值为“GrantAuthorizer.GRANT_PUBLISH”,表示允许所有客户端建立握手连接。

3. Process Service:启动服务渠道“processor.process(new EchoRPC())”。通过这些服务渠道类,我们可以启动服务渠道处理客户端请求。这是我们服务端推送技术的关键所在,我们的业务逻辑代码也是主要放在这些服务渠道类里面。

接下来我们来看看这些服务渠道类的具体实现:

清单 12. Echo Service 实现
 @Service("echo")public static class EchoRPC{	@Session	private ServerSession _session;	@SuppressWarnings("unused")	@Configure("/service/echo")	private void configureEcho(ConfigurableServerChannel channel)	{		channel.addAuthorizer(GrantAuthorizer.GRANT_SUBSCRIBE_PUBLISH);	}	@Listener("/service/echo")	public void doEcho(ServerSession session, ServerMessage message)	{		Map data = message.getDataAsMap();		Log.info("ECHO from "+session+" "+data);				for(int i = 0; i < 50; i++){			session.deliver(_session, message.getChannel(), data, null);		}	}}

我们可以在“configureEcho”里面设定该服务渠道支持的权限。我们主要来看看“doEcho”方法,它被标识为“@Listener("/service/echo")”,所以它可以用于像客户端推送服务渠道为“echo”的消息,我们之前客户端代码示例里面的如下代码:“dojox.cometd.subscribe("/service/echo",echoRpcReturn)”就是专门用于处理这里服务渠道推送的消息,消息推送通过“deliver”方法,推送的消息信息放在“data”实参中。

再来看看 Monitor 类:

清单 13. Monitor Service 实现
 @Service("monitor")  public static class Monitor  {  @Listener("/meta/subscribe")  public void monitorSubscribe(ServerSession session, ServerMessage message)  {  Log.info("Monitored Subscribe from "+session+" for "           +message.get(Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/unsubscribe")  public void monitorUnsubscribe(ServerSession session, ServerMessage message)  {  Log.info("Monitored Unsubscribe from "+session+" for "           +message.get(Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/*")  public void monitorMeta(ServerSession session, ServerMessage message)  {  if (Log.isDebugEnabled())  Log.debug(message.toString());  }  }

Monitor 渠道类与之前的 Echo 服务渠道类比较类似,不过它主要用于处理 meta 渠道,与业务逻辑无关。

最后,我们来看看被注释掉的“ChatService”类,他也可以通过“processor.process(new ChatService())”来启用,但是我们这里用了一个更为简单的方法:直接配置在 web.xml 文件中:

清单 14. ChatService 的配置
   ...............    services  org.cometd.examples.ChatService    1  

细心的读者可能在之前的代码示例中已经看到,这里就是通过配置的方式加载服务渠道类。参考以下具体实现的代码:

清单 15. ChatService 实现
 @Service("chat")  public class ChatService  {  ..........................................     @Listener("/service/members")     public void handleMembership(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         final String room = ((String)data.get("room")).substring("/chat/".length());         Map roomMembers = _members.get(room);         if (roomMembers == null)         {             Map new_room = new ConcurrentHashMap();             roomMembers = _members.putIfAbsent(room, new_room);             if (roomMembers == null) roomMembers = new_room;         }         final Map members = roomMembers;         String userName = (String)data.get("user");         members.put(userName, client.getId());         client.addListener(new ServerSessio n.RemoveListener()         {             public void removed(ServerSession session, boolean timeout)             {                 members.values().remove(session.getId());                 broadcastMembers(room,members.keySet());             }         });         broadcastMembers(room,members.keySet());     }     private void broadcastMembers(String room, Set members)     {         // Broadcast the new members list         ClientSessionChannel channel =                  _session.getLocalSession().getChannel("/members/"+room);         channel.publish(members);     }  ..........................................     @Listener("/service/privatechat")     protected void privateChat(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         String room = ((String)data.get("room")).substring("/chat/".length());         Map membersMap = _members.get(room);         if (membersMap==null)         {             Mapnew_room=new ConcurrentHashMap();             membersMap=_members.putIfAbsent(room,new_room);             if (membersMap==null)                 membersMap=new_room;         }         String[] peerNames = ((String)data.get("peer")).split(",");         ArrayList peers = new ArrayList(peerNames.length);  .................     }  }

以上是摘录部分 ChatService 实现代码,它主要是实现一个在线的聊天室,包括公开发言和私有(1 对 1)聊天等等功能,它的实现方式与之前的 Echo 和 Monitor 类似,这里不做详述,有兴趣的读者可以参考一下他的实现,来构造自己的服务器推送应用。


服务器推送技术之比较

其实有很多种方式实现服务器推送,它们各有各的优缺点:

  1. 传统轮询:此方法是利用 HTML 里面 meta 标签的刷新功能,在一定时间间隔后进行页面的转载,以此循环往复。它的最大缺点就是页面刷性给人带来的体验很差,而且服务器的压力也会比较大。
  2. Ajax 轮询:异步响应机制,即通过不间断的客户端 Ajax 请求,去发现服务端的变化。这种方式由于是客户端主动连接的,所以会有一定程度的延时,并且服务器的压力也不小。
  3. 长连接:这也是我们之前所介绍的一种方式。由于它是利用客户端的现有连接实现服务器主动向客户端推送信息,所以延时的情况很少,并且由于服务端的可操控性使得服务器的压力也迅速减小。其实这种技术还有其他的实现方式,通过 Iframe,在页面上嵌入一个隐藏帧(Iframe),将其“src”属性指向一个长连接的请求,这样一来,服务端就能够源源不断的向客户端发送数据。这种方式的不足就在于:它会造成浏览器的进度栏一直显示没有加载完成,当然我们可以通过 Google 的一个称为“htmlfile”的 ActiveX 控件解决,但是毕竟他需要安装 ActiveX 控件,对于终端用户也是不合适的。
  4. 套接字:可以利用 Flash 的 XMLSocket 类或者 Java 的 Applet 来建立 Socket 连接,实现全双工的服务器推送,然后通过 Flash 或者 Applet 与 JavaScript 通信的接口来实现最终的数据推送。但是这种方式需要 Flash 或者 JVM 的支持,同样不太合适于终端用户。
  5. HTML5 的 WebSocket:这种方式其实与套接字一样,但是这里需要单独强调一下:它是不需要用户而外安装任何插件的。 HTML5 提供了一个 WebSocket 的 JavaScript 接口,可以直接与服务端建立 Socket 连接,实现全双工通信,这种方式的服务器推送就是完全意义上的服务器推送了,没有半点模拟的成分,只是现阶段支持 HTML5 的浏览器并不多,而且一般老版本的各种浏览器基本都不支持。不过 HTML5 是一套非常好的标准,在将来,当 HTML5 流行起来以后将是我们实现服务器推送技术的不二选择。

结束语

这篇文章介绍了 Dojo 中的服务器推送 Cometd 工具包。基于服务器推送的理念,介绍了 Bayeux 协议的核心思想,并结合一个简单示例介绍了服务器推送的基本实现。随后,本着快速建立服务器推送框架的想法,介绍了 Dojo 的 Cometd 工具包,并分别从客户端接口和服务端接口两个方面分别介绍了 Dojo 的服务器推送框架的搭建和实现原理。最后,通过一些简单的示例展示了基于服务端推送的业务逻辑的具体实现。服务端推送技术具有很强的实用性,希望广大读者在开发自己的项目的过程中多关注一下,以尽可能多的完善自己的 Web 应用。

服务器推送技术和 Bayeux 协议简介

服务器推送技术的基础思想是将浏览器主动查询信息改为服务器主动发送信息。服务器发送一批数据,浏览器显示这些数据,同时保证与服务器的连接。当服务器需要再次发送一批数据时,浏览器显示数据并保持连接。以后,服务器仍然可以发送批量数据,浏览器继续显示数据,依次类推。基于这种思想,这里我们要引出 Bayeux 协议。

Bayeux 是一套基于 Publish / Subscribe 模式,以 JSON 格式在浏览器与服务器之间传输事件的通信协议。该协议规定了浏览器与服务器之问的双向通信机制,克服了传统 Web 通信模式的缺点。

Bayeux 协议主要基于 HTTP 来传输低延迟的、异步的事件消息。这些消息通过频道 (Channels) 来投递,能够实现从服务器端到客户端、从客户端到服务器端或者通过服务器从一个客户端到另一个客户端的传送。 Bayeux 协议的主要目的是为使用了 Ajax 和 Comet 技术的 Web 客户端实现高响应的用户交互。 Bayeux 协议旨在通过允许执行者更容易的实现互操作性,来降低开发 Comet 应用程序的复杂性。它解决了共同的消息发布和路由问题,并提供了渐进式的改进和扩展机制。

一般情况下,在 HTTP 协议中,Client 要想获得 Server 的消息,必须先自己发送一个 Request,然后 Server 才会给予 Response。而 Bayeux 协议改变了这个情况,它允许 Server 端异步 Push 自己的消息到 Client 端。从而实现了 Client 和 Server 之间的双向操作模式。


服务器推送技术的一个简单实现

基于 Bayeux 协议实现服务器推送技术的方式有很多,可以通过 Flex 或者 Java 的 Applet。基于这两种技术,我们可以建立在客户端建立服务套接字接口,“双向操作模式”自然很容易实现,但是这些方式需要除浏览器以外的运行环境的支持。这里我们希望能采用一种纯脚本的方式,这种方式是不可能建立服务套接字接口的,那如何实现基于 Bayeux 协议的服务器推送呢?其实是可以模拟实现的,主要有两种方式:

1. 基于 HTTP 的长轮询来进行消息通信(基于 Ajax 的长轮询(long-polling)方式)。

2. 基于 Iframe 及 htmlfile 的流(streaming)方式。

这里我们采用第一种方式实现,即:客户端先向服务器端发送一个 HTTP Request,服务器端接收到后,阻塞在那边,等服务器有消息的时候,则返回一个 HTTP Response 给客户端,客户端收到后,断开连接,紧接着再发第二个 HTTP Request,以此反复进行,保持这个“长轮询”。期间,如果连接超时,那么会断开重连,以保持连接。

基于以上的思想,我们来看一下一个简单的实现,这个简单实现是基于 PHP 的。示例很简单,即便没用过 PHP 也能够很容易看明白,而且我们会在后面一一作出解释。

这个示例主要实现这样一个功能:

我们在浏览器里面分别打开三个窗口,并访问同一张页面。修改其中一个页面上的内容,另外两个页面上的内容也随即发生变化(注意:这里不用刷新页面)。这就会给我们一种:数据是服务器推送过来的感觉。

图 1. 简单服务器推送示例 -- 内容修改前

图 1. 简单服务器推送示例 -- 内容修改前

我们修改其中第一个窗口(左上)的内容(输入“222”,点击“Send”按钮,发送到后台)。此时不仅第一个窗口的内容变化了,其余两个窗口的内容也随即变化。

图 2. 简单服务器推送示例 -- 内容修改

图 2. 简单服务器推送示例 -- 内容修改

接下来我们来看看示例代码吧:

清单 1. 简单服务器推送 -- 前端代码 HTML
 

这个是我们所看到的输入框和提交按钮,大家可以注意一下它的“onsubmit”方法:当我们输入内容并点击提交时,它会执行“comet.doRequest($('word').value)”方法向后端发起请求(其实在这之前我们就已经建立了与服务端的长轮询并可随时开始服务器推送数据)。接下来我们来看看这个“comet”是什么样子的以及他的 Request 的具体实现:

清单 2. 简单服务器推送 -- 前端代码 JavaScript
 var Comet = Class.create();Comet.prototype = {	timestamp: 0,	url: './backend.php',	noerror: true,	initialize: function(){	},	connect: function(){		this.ajax = new Ajax.Request(this.url, {			method: 'get',			parameters: {				'timestamp': this.timestamp			},			onSuccess: function(transport){				var response = transport.responseText.evalJSON();				this.comet.timestamp = response['timestamp'];				this.comet.handleResponse(response);				this.comet.noerror = true;			},			onComplete: function(transport){				if (!this.comet.noerror) setTimeout(function(){					                     comet.connect()				                         }, 5000);				else 				this.comet.connect();				this.comet.noerror = false;			}		});		this.ajax.comet = this;	},	handleResponse: function(response){		$('content').innerHTML += '
' + response['msg'] + '
'; }, doRequest: function(request){ new Ajax.Request(this.url, { method: 'get', parameters: { 'msg': request } }); }}var comet = new Comet();comet.connect();

我们先看最后两段代码,这里是页面初始化时会执行的代码,其实在这里,我们就建立了一服务端的长轮询,我们来看看“connect”方法的实现吧:

“connect”方法这里是发了一个 Ajax 请求,然后分别设定了成功时(onSuccess)的返回处理和请求完成时(onComplete)的处理(注意 onComplete 不论成功失败都会执行)。我们要挂住这里的 onComplete 方法。可以看到,当请求完成时,如果连接有问题,它会过 5 秒重新连接,;如果没有问题,他会立即重新连接。

相信大家看到这里应该会有点眉目了,这里其实没有什么所谓的恒定不断的连接(类似 TCP 方式),它的真正实现是通过不断的 Ajax 请求实现的。

所以,当我们开启 3 个窗口时,其实我们打开了 3 个模拟的不间断的客户端与服务端的连接,所以他们会即时解到服务端的信息,不需要刷新页面。

我们再来看看服务端的实现,看看他是如何推送的:

清单 3. 简单服务器推送 -- 后端代码 PHP
 $filename  = dirname(__FILE__).'/data.txt';  // 将新消息存入文件中 $msg = isset($_GET['msg']) ? $_GET['msg'] : '';  if ($msg != '')  {   file_put_contents($filename,$msg);   die();  }  // 这是一个无限循环,一旦发现文件被修改,便会跳出循环并返回文件修改数据。如果文件一直没有修改,则会一  // 直处于循环检测状态,此时的 Ajax 连接也会一直保留,直到文件被修改为止,这就是所谓的“长轮询”。 $lastmodif    = isset($_GET['timestamp']) ? $_GET['timestamp'] : 0;  $currentmodif = filemtime($filename);  while ($currentmodif <= $lastmodif) // 检测文件是否被修改 {   usleep(10000); // sleep 10ms to unload the CPU   clearstatcache();   $currentmodif = filemtime($filename);  }  // 返回 JSON 数组 $response = array();  $response['msg']       = file_get_contents($filename);  $response['timestamp'] = $currentmodif;  echo json_encode($response);  flush();

我们可以参照上面的注释理解该代码,其实并不需要多少 PHP 的知识。服务端推送技术不是一个开发用的控件库,而是一个思想。这里的 while 循环便说明了服务端推送是如何保留所谓的“长轮询”的。

现在大家应该明白为什么三个窗口会同步变化了。其主要的核心思想就是服务端“握住”长轮询,然后在适当的时候“放手”。


Dojo 的 Cometd 工具包简介

之前我们是基于 JavaScript 自己实现了一个简单的 Cometd 应用,我们花了大量的代码来建立一个 Cometd 框架,真正用于处理我们自己的业务逻辑的代码其实就是“handleResponse”里面的那一行。我们能不能吧这些通用的代码省掉呢? The answer is yes. Dojo 已经对 Cometd 做了封装,基于 Dojo 的 Cometd 包,我们不用再浪费大量的代码在搭建 Cometd 框架上。对于前端脚本代码,我们只需要加上一个 Cometd 包的简单接口代码,便可以开始加入我们自己的业务逻辑代码了。

当然,Dojo 的 Cometd 包还包括后端的代码,可以在 Dojo 的官网下载中找到,它不与 Dojo 包一起发布,是一个单独的服务端开源代码,基于 Java 和 Jetty 的,有兴趣的读者可以下载下来研究一下。

通过 Dojo 的这两部分代码,我们便可以迅速地搭建我们的 Cometd 框架,我们剩下需要做的就是加入我们的业务逻辑。


Dojo 的 Cometd 工具包之前端

接下来我们来看看 Dojo 的 Cometd 工具包的前端封装:

清单 4. Cometd 前端初始化
 dojox.cometd.init("http://www.xxx.com/cometd");

这个接口用于建立并初始化与服务端的握手连接(Bayeux handshake,初始化了“Bayeux communication” 消息通讯)。建立这个连接是基于 Bayeux 协议的,它主要有两个任务:

  1. 客户端与服务端协商传输的消息类型。
  2. 如果协商成功,服务端会通知客户端具体的请求参数配置。
  3. 如果协商失败,客户端重新发起协商流程。

我们深入 Dojo 的 init 方法内部可以看到握手连接的具体实现过程,它的实现也是不间断的重复发送客户端的 Ajax 请求,与我们之前的自制案类似,有兴趣的同学可以参考如下代码(摘取部分):

清单 5. Cometd 内部机制
 this.init = function(...){............	var bindArgs = {		url: this.url,		handleAs: this.handleAs,		content: { "message": dojo.toJson([props]) },		load: dojo.hitch(this,function(msg){			this._backon();			this._finishInit(msg);		}),		error: dojo.hitch(this,function(e){			this._backoff();			this._finishInit(e);		}),		timeout: this.expectedNetworkDelay	};..............	if(this._isXD){		r = dojo.io.script.get(bindArgs);	}else{		r = dojo.xhrPost(bindArgs);	}..............}this._finishInit = function(data){..................	if(successful){		........		//ajax request inside		this.tunnelInit = transport.tunnelInit && dojo.hitch(transport,        "tunnelInit");		this.tunnelCollapse = transport.tunnelCollapse && dojo.hitch(transport,		"tunnelCollapse");		transport.startup(data);	}else{		if(!this._advice || this._advice["reconnect"] != "none"){			setTimeout(dojo.hitch(this, "init", this.url, this._props),			this._interval());		}	}....................}

可见,它们的 callback 方法里面都带有对自己本身的调用,这里的”init“方法也不例外。细心的读者可能还会发现,其实从例子上可以看出:Dojo 的 Cometd 也支持跨域,它的跨域是通过“script”的方式实现的。这里有一点需要大家了解,我们默认的服务端推送实现方式是长轮询(long-polling)模式,遇到跨域时,“long-polling”便不再适用,转为基于“script”的返回调用(callback-polling)模式。

接下来我们再来看看 Cometd 中关于消息推送的一些接口,这些消息通讯主要是基于渠道:

清单 6. Cometd 前端发布消息
 dojox.cometd.publish("/service/echo", { msg: msg });

这里的所谓“发布消息”就是向后端发送消息,用于前端主动向后端推送。

这里的第一个参数是发送消息的渠道标识(channel),这种“channel”共有三种类型:

1. 元渠道(meta channels):示例“/meta/connect”(通常以“/meta/”为开头)。元渠道主要不是用来消息传输,而是用于客户端监听,如握手连接或者网络连接等等的错误。通常我们会在客户端调用“addListener()”来开启监听元渠道,它可以在握手连接的建立之前就开启监听,而且这种消息监听是同步的。

2. 服务渠道(service channels):示例“/service/connect”(通常以“/service/”为开头)。它主要用于私有消息通讯,主要是一对一的通讯。通常我们会在客户端调用“subscribe()”来订阅服务渠道消息。服务渠道只有等握手连接建立好后才能开启,而且它是异步通讯的。

3. 普通渠道(normal channels):示例“/foo/bar”(无限制)。这种渠道没有什么限制,主要用于广播消息,即:多个客户端订阅了一个服务,该服务可以通过普通渠道进行消息广播。

渠道是通信的基础模式,我们可以根据需要选择相应的渠道模式。

第二个参数则是消息对象,这里的“msg”则是消息内容。

有一点要注意:这里的“publish”是基于 Bayeux 协议的,采用的异步消息传输机制,所以它是在服务端(Bayeux 服务器)收到消息之前就返回的。所以 publish 的返回并不代表服务端收到你 publish 的消息了。

Dojo 的 Cometd 还支持批量发送消息,通过这个接口可以有效地避免不必要的网络消息传输的浪费:

清单 7. Cometd 前端批量发布消息
 // 方法 1  cometd.batch(function()  {     cometd.publish('/channel1', { product: 'foo' });     cometd.publish('/channel2', { notificationType: 'all' });     cometd.publish('/channel3', { update: false });  });  // 方法 2  cometd.startBatch()  cometd.publish('/channel1', { product: 'foo' });  cometd.publish('/channel2', { notificationType: 'all' });  cometd.publish('/channel3', { update: false });  cometd.endBatch()

上述两种方案都可以实现消息的批量发送,推荐使用方法 1。

接下来我们看看服务端的消息推送:

清单 8. Cometd 前端订阅消息
 dojox.cometd.subscribe("/service/echo",echoRpcReturn);  function echoRpcReturn(msg){  dojo.byId("responses").innerHTML += msg;  }

这里所谓的“订阅消息”,其实就是接收服务端推送的消息,是后端主动向前端推送。这也是服务端推送的精华所在,同样也是很简单的一行代码。

这里我们看到了一个熟悉的方法 --- “subscribe”,之前我们已经介绍过了,它主要用于订阅服务渠道私有消息,这里就是它用法的一个示例。对应的服务端 Service 向对应的前端订阅者推送消息,这里就是通过“echo”渠道向前端推送消息,他会回调“echoRpcReturn”方法,并传入推送的消息作为实参。对于后端的每次推送,都会调用前端的“echoRpcReturn”方法。


Dojo 的 Cometd 工具包之后端

Dojo 的 Cometd 工具包的后端实现是基于 Java 和 Jetty 组件的,通过 Dojo 的服务端 Cometd 组件,我们同样能极其迅速的构建 Cometd 框架。我们需要做的仅仅是加入我们的业务逻辑代码即可。

先来看看 web.xml 的配置参数:

清单 9. 基本配置参数(web.xml)
               cometd           org.cometd.server.continuation.ContinuationCometdServlet                        timeout             60000                            cometd         /cometd/*                   cross-origin         org.eclipse.jetty.servlets.CrossOriginFilter                   cross-origin         /cometd/*       

这里我们先来看看“ContinuationCometdServlet”,这个 Servlet 主要用于解释 Bayeux 协议,所以关于它的配置是必须的。基于“ContinuationCometdServlet”的其他配置参数还有很多,如:

Timeout:长轮询的过期时间。如果超过这个时间还没有客户端消息,服务端会推送一个空消息。

Interval:轮询间隔时间。客户端结束前一个请求到发送下一个请求之间的间隔时间。

maxInterval:服务端最长等待时间。即:建立连接时,如果超过这个时间仍没有接到一个新的长轮询连接请求,服务端就会认为该客户端无效或者关闭了。

logLevel:日志级别。 “0 = warn, 1 = info, 2 = debug”。

以上是主要的配置参数,其余的配置参数还有很多,这里不一一介绍,有需要的读者可以查阅 Dojo 的帮助文档。另外,最后几行我们还配置了一个“cross-origin”,对应着“CrossOriginFilter”类,他用于支持跨域的 JavaScript 请求,如果您的项目中要支持跨域的服务器推送,请加入该配置。

接下来我们再来看看一些高级配置参数:

清单 10. 高级配置参数(web.xml)
   cometd  org.cometd.java.annotation.AnnotationCometdServlet    logLevel  1      services  org.cometd.examples.ChatService    1      cometd  /cometd/*      cometdDemo  org.cometd.examples.CometdDemoServlet  2  

这里我们主要要注意三个地方:

1. “CometdDemoServlet”:它是用于启动服务端 Cometd 框架的 Servlet,我们在后面会介绍。由于他配置了“load-on-startup”参数,所以在服务容器启动的时候,我们的 Cometd 服务端就已经搭建好了,之后我们会着重介绍他的“init”方法中的行为。

2. “AnnotationCometdServlet”:这个 Servlet 配置在这里表示了我们在服务端代码是基于 annotation 的。这是一个非常实用的 Servlet,通过这个 Servlet,你会发现,我们要做的事情仅仅是定义几个 Service 类,实现其中的几个方法即可。连很多调用 Cometd 框架 API 接口的代码都省去了。

3. “ChatService”:这里声明了一个 Service 类,他的用途是处理服务渠道的消息。这里声明的作用等同于代码中的“processor.process(new ChatService())”。

配置完成后,我们接下来可以看看代码了。通过以上的配置之后,你会发现,我们接下来要写的代码非常简单精炼:

清单 11. 服务类初始化 init
 public void init() throws ServletException  {  final BayeuxServerImpl bayeux =  (BayeuxServerImpl)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);  if (bayeux==null)  throw new UnavailableException("No BayeuxServer!");  .................  // 创建扩展点 bayeux.addExtension(new TimesyncExtension());  bayeux.addExtension(new AcknowledgedMessagesExtension());  // 设定握手连接权限 bayeux.getChannel(ServerChannel.META_HANDSHAKE).addAuthorizer(   GrantAuthorizer.GRANT_PUBLISH);  // 启动服务渠道 ServerAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux);  processor.process(new EchoRPC());  processor.process(new Monitor());  //processor.process(new ChatService());  bayeux.createIfAbsent("/foo/bar/baz",new ConfigurableServerChannel.Initializer()  {  public void configureChannel(ConfigurableServerChannel channel)  {  channel.setPersistent(true);  }  });  if (bayeux.getLogger().isDebugEnabled())  System.err.println(bayeux.dump());  .................  }

这里我们介绍三个知识点:

1. Extension:Extension 是一个函数,它会在消息发出之前或者收到之后被调用,专门用来修改消息内容,例如加入一些特殊属性(这些属性多在消息的 ext 属性中)。注意,这些属性大多是应用无关的,如记录长轮询的次数等等。这里的“TimesyncExtension”和“AcknowledgedMessagesExtension”是两个比较常用的 Extension:

  • 1) “Timesync Extension”用于计算客户端事件和服务端时间的偏差。客户端需要同时引入“dojox.cometd.timesync”类,该 Extension 使得客户端和服务端在每次握手或者连接的时候能够互相交换各自的时钟信息,这也是的客户端可以很精确的计算出他与服务端时钟的偏移量。消息格式如下:

    {ext:{timesync:{tc:12345567890,ts:1234567900,p:123,a:3},...},...}

    TC:客户端发消息的时间(距离 1970 年 1 月号的时长,单位为毫秒)

    TS:服务端收到消息的时间

  • 2) “Acknowledge Extension”用于提供可靠的顺序消息机制。一旦加入了“Acknowledge Extension”,服务端会阻截非长轮询的客户端请求,这样会使你的服务器更加的高效。注意:客户端需要同时引入“dojox.cometd.ack”类与其协同工作。

2. Authorizer:设定握手连接权限,这里设定值为“GrantAuthorizer.GRANT_PUBLISH”,表示允许所有客户端建立握手连接。

3. Process Service:启动服务渠道“processor.process(new EchoRPC())”。通过这些服务渠道类,我们可以启动服务渠道处理客户端请求。这是我们服务端推送技术的关键所在,我们的业务逻辑代码也是主要放在这些服务渠道类里面。

接下来我们来看看这些服务渠道类的具体实现:

清单 12. Echo Service 实现
 @Service("echo")public static class EchoRPC{	@Session	private ServerSession _session;	@SuppressWarnings("unused")	@Configure("/service/echo")	private void configureEcho(ConfigurableServerChannel channel)	{		channel.addAuthorizer(GrantAuthorizer.GRANT_SUBSCRIBE_PUBLISH);	}	@Listener("/service/echo")	public void doEcho(ServerSession session, ServerMessage message)	{		Map data = message.getDataAsMap();		Log.info("ECHO from "+session+" "+data);				for(int i = 0; i < 50; i++){			session.deliver(_session, message.getChannel(), data, null);		}	}}

我们可以在“configureEcho”里面设定该服务渠道支持的权限。我们主要来看看“doEcho”方法,它被标识为“@Listener("/service/echo")”,所以它可以用于像客户端推送服务渠道为“echo”的消息,我们之前客户端代码示例里面的如下代码:“dojox.cometd.subscribe("/service/echo",echoRpcReturn)”就是专门用于处理这里服务渠道推送的消息,消息推送通过“deliver”方法,推送的消息信息放在“data”实参中。

再来看看 Monitor 类:

清单 13. Monitor Service 实现
 @Service("monitor")  public static class Monitor  {  @Listener("/meta/subscribe")  public void monitorSubscribe(ServerSession session, ServerMessage message)  {  Log.info("Monitored Subscribe from "+session+" for "           +message.get(Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/unsubscribe")  public void monitorUnsubscribe(ServerSession session, ServerMessage message)  {  Log.info("Monitored Unsubscribe from "+session+" for "           +message.get(Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/*")  public void monitorMeta(ServerSession session, ServerMessage message)  {  if (Log.isDebugEnabled())  Log.debug(message.toString());  }  }

Monitor 渠道类与之前的 Echo 服务渠道类比较类似,不过它主要用于处理 meta 渠道,与业务逻辑无关。

最后,我们来看看被注释掉的“ChatService”类,他也可以通过“processor.process(new ChatService())”来启用,但是我们这里用了一个更为简单的方法:直接配置在 web.xml 文件中:

清单 14. ChatService 的配置
   ...............    services  org.cometd.examples.ChatService    1  

细心的读者可能在之前的代码示例中已经看到,这里就是通过配置的方式加载服务渠道类。参考以下具体实现的代码:

清单 15. ChatService 实现
 @Service("chat")  public class ChatService  {  ..........................................     @Listener("/service/members")     public void handleMembership(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         final String room = ((String)data.get("room")).substring("/chat/".length());         Map roomMembers = _members.get(room);         if (roomMembers == null)         {             Map new_room = new ConcurrentHashMap();             roomMembers = _members.putIfAbsent(room, new_room);             if (roomMembers == null) roomMembers = new_room;         }         final Map members = roomMembers;         String userName = (String)data.get("user");         members.put(userName, client.getId());         client.addListener(new ServerSession.R emoveListener()         {             public void removed(ServerSession session, boolean timeout)             {                 members.values().remove(session.getId());                 broadcastMembers(room,members.keySet());             }         });         broadcastMembers(room,members.keySet());     }     private void broadcastMembers(String room, Set members)     {         // Broadcast the new members list         ClientSessionChannel channel =                  _session.getLocalSession().getChannel("/members/"+room);         channel.publish(members);     }  ..........................................     @Listener("/service/privatechat")     protected void privateChat(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         String room = ((String)data.get("room")).substring("/chat/".length());         Map membersMap = _members.get(room);         if (membersMap==null)         {             Mapnew_room=new ConcurrentHashMap();             membersMap=_members.putIfAbsent(room,new_room);             if (membersMap==null)                 membersMap=new_room;         }         String[] peerNames = ((String)data.get("peer")).split(",");         ArrayList peers = new ArrayList(peerNames.length);  .................     }  }

以上是摘录部分 ChatService 实现代码,它主要是实现一个在线的聊天室,包括公开发言和私有(1 对 1)聊天等等功能,它的实现方式与之前的 Echo 和 Monitor 类似,这里不做详述,有兴趣的读者可以参考一下他的实现,来构造自己的服务器推送应用。


服务器推送技术之比较

其实有很多种方式实现服务器推送,它们各有各的优缺点:

  1. 传统轮询:此方法是利用 HTML 里面 meta 标签的刷新功能,在一定时间间隔后进行页面的转载,以此循环往复。它的最大缺点就是页面刷性给人带来的体验很差,而且服务器的压力也会比较大。
  2. Ajax 轮询:异步响应机制,即通过不间断的客户端 Ajax 请求,去发现服务端的变化。这种方式由于是客户端主动连接的,所以会有一定程度的延时,并且服务器的压力也不小。
  3. 长连接:这也是我们之前所介绍的一种方式。由于它是利用客户端的现有连接实现服务器主动向客户端推送信息,所以延时的情况很少,并且由于服务端的可操控性使得服务器的压力也迅速减小。其实这种技术还有其他的实现方式,通过 Iframe,在页面上嵌入一个隐藏帧(Iframe),将其“src”属性指向一个长连接的请求,这样一来,服务端就能够源源不断的向客户端发送数据。这种方式的不足就在于:它会造成浏览器的进度栏一直显示没有加载完成,当然我们可以通过 Google 的一个称为“htmlfile”的 ActiveX 控件解决,但是毕竟他需要安装 ActiveX 控件,对于终端用户也是不合适的。
  4. 套接字:可以利用 Flash 的 XMLSocket 类或者 Java 的 Applet 来建立 Socket 连接,实现全双工的服务器推送,然后通过 Flash 或者 Applet 与 JavaScript 通信的接口来实现最终的数据推送。但是这种方式需要 Flash 或者 JVM 的支持,同样不太合适于终端用户。
  5. HTML5 的 WebSocket:这种方式其实与套接字一样,但是这里需要单独强调一下:它是不需要用户而外安装任何插件的。 HTML5 提供了一个 WebSocket 的 JavaScript 接口,可以直接与服务端建立 Socket 连接,实现全双工通信,这种方式的服务器推送就是完全意义上的服务器推送了,没有半点模拟的成分,只是现阶段支持 HTML5 的浏览器并不多,而且一般老版本的各种浏览器基本都不支持。不过 HTML5 是一套非常好的标准,在将来,当 HTML5 流行起来以后将是我们实现服务器推送技术的不二选择。

结束语

这篇文章介绍了 Dojo 中的服务器推送 Cometd 工具包。基于服务器推送的理念,介绍了 Bayeux 协议的核心思想,并结合一个简单示例介绍了服务器推送的基本实现。随后,本着快速建立服务器推送框架的想法,介绍了 Dojo 的 Cometd 工具包,并分别从客户端接口和服务端接口两个方面分别介绍了 Dojo 的服务器推送框架的搭建和实现原理。最后,通过一些简单的示例展示了基于服务端推送的业务逻辑的具体实现。服务端推送技术具有很强的实用性,希望广大读者在开发自己的项目的过程中多关注一下,以尽可能多的完善自己的 Web 应用。

服务器推送技术和 Bayeux 协议简介

服务器推送技术的基础思想是将浏览器主动查询信息改为服务器主动发送信息。服务器发送一批数据,浏览器显示这些数据,同时保证与服务器的连接。当服务器需要再次发送一批数据时,浏览器显示数据并保持连接。以后,服务器仍然可以发送批量数据,浏览器继续显示数据,依次类推。基于这种思想,这里我们要引出 Bayeux 协议。

Bayeux 是一套基于 Publish / Subscribe 模式,以 JSON 格式在浏览器与服务器之间传输事件的通信协议。该协议规定了浏览器与服务器之问的双向通信机制,克服了传统 Web 通信模式的缺点。

Bayeux 协议主要基于 HTTP 来传输低延迟的、异步的事件消息。这些消息通过频道 (Channels) 来投递,能够实现从服务器端到客户端、从客户端到服务器端或者通过服务器从一个客户端到另一个客户端的传送。 Bayeux 协议的主要目的是为使用了 Ajax 和 Comet 技术的 Web 客户端实现高响应的用户交互。 Bayeux 协议旨在通过允许执行者更容易的实现互操作性,来降低开发 Comet 应用程序的复杂性。它解决了共同的消息发布和路由问题,并提供了渐进式的改进和扩展机制。

一般情况下,在 HTTP 协议中,Client 要想获得 Server 的消息,必须先自己发送一个 Request,然后 Server 才会给予 Response。而 Bayeux 协议改变了这个情况,它允许 Server 端异步 Push 自己的消息到 Client 端。从而实现了 Client 和 Server 之间的双向操作模式。


服务器推送技术的一个简单实现

基于 Bayeux 协议实现服务器推送技术的方式有很多,可以通过 Flex 或者 Java 的 Applet。基于这两种技术,我们可以建立在客户端建立服务套接字接口,“双向操作模式”自然很容易实现,但是这些方式需要除浏览器以外的运行环境的支持。这里我们希望能采用一种纯脚本的方式,这种方式是不可能建立服务套接字接口的,那如何实现基于 Bayeux 协议的服务器推送呢?其实是可以模拟实现的,主要有两种方式:

1. 基于 HTTP 的长轮询来进行消息通信(基于 Ajax 的长轮询(long-polling)方式)。

2. 基于 Iframe 及 htmlfile 的流(streaming)方式。

这里我们采用第一种方式实现,即:客户端先向服务器端发送一个 HTTP Request,服务器端接收到后,阻塞在那边,等服务器有消息的时候,则返回一个 HTTP Response 给客户端,客户端收到后,断开连接,紧接着再发第二个 HTTP Request,以此反复进行,保持这个“长轮询”。期间,如果连接超时,那么会断开重连,以保持连接。

基于以上的思想,我们来看一下一个简单的实现,这个简单实现是基于 PHP 的。示例很简单,即便没用过 PHP 也能够很容易看明白,而且我们会在后面一一作出解释。

这个示例主要实现这样一个功能:

我们在浏览器里面分别打开三个窗口,并访问同一张页面。修改其中一个页面上的内容,另外两个页面上的内容也随即发生变化(注意:这里不用刷新页面)。这就会给我们一种:数据是服务器推送过来的感觉。

图 1. 简单服务器推送示例 -- 内容修改前

图 1. 简单服务器推送示例 -- 内容修改前

我们修改其中第一个窗口(左上)的内容(输入“222”,点击“Send”按钮,发送到后台)。此时不仅第一个窗口的内容变化了,其余两个窗口的内容也随即变化。

图 2. 简单服务器推送示例 -- 内容修改

图 2. 简单服务器推送示例 -- 内容修改

接下来我们来看看示例代码吧:

清单 1. 简单服务器推送 -- 前端代码 HTML
 

这个是我们所看到的输入框和提交按钮,大家可以注意一下它的“onsubmit”方法:当我们输入内容并点击提交时,它会执行“comet.doRequest($('word').value)”方法向后端发起请求(其实在这之前我们就已经建立了与服务端的长轮询并可随时开始服务器推送数据)。接下来我们来看看这个“comet”是什么样子的以及他的 Request 的具体实现:

清单 2. 简单服务器推送 -- 前端代码 JavaScript
 var Comet = Class.create();Comet.prototype = {	timestamp: 0,	url: './backend.php',	noerror: true,	initialize: function(){	},	connect: function(){		this.ajax = new Ajax.Request(this.url, {			method: 'get',			parameters: {				'timestamp': this.timestamp			},			onSuccess: function(transport){				var response = transport.responseText.evalJSON();				this.comet.timestamp = response['timestamp'];				this.comet.handleResponse(response);				this.comet.noerror = true;			},			onComplete: function(transport){				if (!this.comet.noerror) setTimeout(function(){					                     comet.connect()				                         }, 5000);				else 				this.comet.connect();				this.comet.noerror = false;			}		});		this.ajax.comet = this;	},	handleResponse: function(response){		$('content').innerHTML += '
' + response['msg'] + '
'; }, do Request: function(request){ new Ajax.Request(this.url, { method: 'get', parameters: { 'msg': request } }); }}var comet = new Comet();comet.connect();

我们先看最后两段代码,这里是页面初始化时会执行的代码,其实在这里,我们就建立了一服务端的长轮询,我们来看看“connect”方法的实现吧:

“connect”方法这里是发了一个 Ajax 请求,然后分别设定了成功时(onSuccess)的返回处理和请求完成时(onComplete)的处理(注意 onComplete 不论成功失败都会执行)。我们要挂住这里的 onComplete 方法。可以看到,当请求完成时,如果连接有问题,它会过 5 秒重新连接,;如果没有问题,他会立即重新连接。

相信大家看到这里应该会有点眉目了,这里其实没有什么所谓的恒定不断的连接(类似 TCP 方式),它的真正实现是通过不断的 Ajax 请求实现的。

所以,当我们开启 3 个窗口时,其实我们打开了 3 个模拟的不间断的客户端与服务端的连接,所以他们会即时解到服务端的信息,不需要刷新页面。

我们再来看看服务端的实现,看看他是如何推送的:

清单 3. 简单服务器推送 -- 后端代码 PHP
 $filename  = dirname(__FILE__).'/data.txt';  // 将新消息存入文件中 $msg = isset($_GET['msg']) ? $_GET['msg'] : '';  if ($msg != '')  {   file_put_contents($filename,$msg);   die();  }  // 这是一个无限循环,一旦发现文件被修改,便会跳出循环并返回文件修改数据。如果文件一直没有修改,则会一  // 直处于循环检测状态,此时的 Ajax 连接也会一直保留,直到文件被修改为止,这就是所谓的“长轮询”。 $lastmodif    = isset($_GET['timestamp']) ? $_GET['timestamp'] : 0;  $currentmodif = filemtime($filename);  while ($currentmodif <= $lastmodif) // 检测文件是否被修改 {   usleep(10000); // sleep 10ms to unload the CPU   clearstatcache();   $currentmodif = filemtime($filename);  }  // 返回 JSON 数组 $response = array();  $response['msg']       = file_get_contents($filename);  $response['timestamp'] = $currentmodif;  echo json_encode($response);  flush();

我们可以参照上面的注释理解该代码,其实并不需要多少 PHP 的知识。服务端推送技术不是一个开发用的控件库,而是一个思想。这里的 while 循环便说明了服务端推送是如何保留所谓的“长轮询”的。

现在大家应该明白为什么三个窗口会同步变化了。其主要的核心思想就是服务端“握住”长轮询,然后在适当的时候“放手”。


Dojo 的 Cometd 工具包简介

之前我们是基于 JavaScript 自己实现了一个简单的 Cometd 应用,我们花了大量的代码来建立一个 Cometd 框架,真正用于处理我们自己的业务逻辑的代码其实就是“handleResponse”里面的那一行。我们能不能吧这些通用的代码省掉呢? The answer is yes. Dojo 已经对 Cometd 做了封装,基于 Dojo 的 Cometd 包,我们不用再浪费大量的代码在搭建 Cometd 框架上。对于前端脚本代码,我们只需要加上一个 Cometd 包的简单接口代码,便可以开始加入我们自己的业务逻辑代码了。

当然,Dojo 的 Cometd 包还包括后端的代码,可以在 Dojo 的官网下载中找到,它不与 Dojo 包一起发布,是一个单独的服务端开源代码,基于 Java 和 Jetty 的,有兴趣的读者可以下载下来研究一下。

通过 Dojo 的这两部分代码,我们便可以迅速地搭建我们的 Cometd 框架,我们剩下需要做的就是加入我们的业务逻辑。


Dojo 的 Cometd 工具包之前端

接下来我们来看看 Dojo 的 Cometd 工具包的前端封装:

清单 4. Cometd 前端初始化
 dojox.cometd.init("http://www.xxx.com/cometd");

这个接口用于建立并初始化与服务端的握手连接(Bayeux handshake,初始化了“Bayeux communication” 消息通讯)。建立这个连接是基于 Bayeux 协议的,它主要有两个任务:

  1. 客户端与服务端协商传输的消息类型。
  2. 如果协商成功,服务端会通知客户端具体的请求参数配置。
  3. 如果协商失败,客户端重新发起协商流程。

我们深入 Dojo 的 init 方法内部可以看到握手连接的具体实现过程,它的实现也是不间断的重复发送客户端的 Ajax 请求,与我们之前的自制案类似,有兴趣的同学可以参考如下代码(摘取部分):

清单 5. Cometd 内部机制
 this.init = function(...){............	var bindArgs = {		url: this.url,		handleAs: this.handleAs,		content: { "message": dojo.toJson([props]) },		load: dojo.hitch(this,function(msg){			this._backon();			this._finishInit(msg);		}),		error: dojo.hitch(this,function(e){			this._backoff();			this._finishInit(e);		}),		timeout: this.expectedNetworkDelay	};..............	if(this._isXD){		r = dojo.io.script.get(bindArgs);	}else{		r = dojo.xhrPost(bindArgs);	}..............}this._finishInit = function(data){..................	if(successful){		........		//ajax request inside		this.tunnelInit = transport.tunnelInit && dojo.hitch(transport,        "tunnelInit");		this.tunnelCollapse = transport.tunnelCollapse && dojo.hitch(transport,		"tunnelCollapse");		transport.startup(data);	}else {		if(!this._advice || this._advice["reconnect"] != "none"){			setTimeout(dojo.hitch(this, "init", this.url, this._props),			this._interval());		}	}....................}

可见,它们的 callback 方法里面都带有对自己本身的调用,这里的”init“方法也不例外。细心的读者可能还会发现,其实从例子上可以看出:Dojo 的 Cometd 也支持跨域,它的跨域是通过“script”的方式实现的。这里有一点需要大家了解,我们默认的服务端推送实现方式是长轮询(long-polling)模式,遇到跨域时,“long-polling”便不再适用,转为基于“script”的返回调用(callback-polling)模式。

接下来我们再来看看 Cometd 中关于消息推送的一些接口,这些消息通讯主要是基于渠道:

清单 6. Cometd 前端发布消息
 dojox.cometd.publish("/service/echo", { msg: msg });

这里的所谓“发布消息”就是向后端发送消息,用于前端主动向后端推送。

这里的第一个参数是发送消息的渠道标识(channel),这种“channel”共有三种类型:

1. 元渠道(meta channels):示例“/meta/connect”(通常以“/meta/”为开头)。元渠道主要不是用来消息传输,而是用于客户端监听,如握手连接或者网络连接等等的错误。通常我们会在客户端调用“addListener()”来开启监听元渠道,它可以在握手连接的建立之前就开启监听,而且这种消息监听是同步的。

2. 服务渠道(service channels):示例“/service/connect”(通常以“/service/”为开头)。它主要用于私有消息通讯,主要是一对一的通讯。通常我们会在客户端调用“subscribe()”来订阅服务渠道消息。服务渠道只有等握手连接建立好后才能开启,而且它是异步通讯的。

3. 普通渠道(normal channels):示例“/foo/bar”(无限制)。这种渠道没有什么限制,主要用于广播消息,即:多个客户端订阅了一个服务,该服务可以通过普通渠道进行消息广播。

渠道是通信的基础模式,我们可以根据需要选择相应的渠道模式。

第二个参数则是消息对象,这里的“msg”则是消息内容。

有一点要注意:这里的“publish”是基于 Bayeux 协议的,采用的异步消息传输机制,所以它是在服务端(Bayeux 服务器)收到消息之前就返回的。所以 publish 的返回并不代表服务端收到你 publish 的消息了。

Dojo 的 Cometd 还支持批量发送消息,通过这个接口可以有效地避免不必要的网络消息传输的浪费:

清单 7. Cometd 前端批量发布消息
 // 方法 1  cometd.batch(function()  {     cometd.publish('/channel1', { product: 'foo' });     cometd.publish('/channel2', { notificationType: 'all' });     cometd.publish('/channel3', { update: false });  });  // 方法 2  cometd.startBatch()  cometd.publish('/channel1', { product: 'foo' });  cometd.publish('/channel2', { notificationType: 'all' });  cometd.publish('/channel3', { update: false });  cometd.endBatch()

上述两种方案都可以实现消息的批量发送,推荐使用方法 1。

接下来我们看看服务端的消息推送:

清单 8. Cometd 前端订阅消息
 dojox.cometd.subscribe("/service/echo",echoRpcReturn);  function echoRpcReturn(msg){  dojo.byId("responses").innerHTML += msg;  }

这里所谓的“订阅消息”,其实就是接收服务端推送的消息,是后端主动向前端推送。这也是服务端推送的精华所在,同样也是很简单的一行代码。

这里我们看到了一个熟悉的方法 --- “subscribe”,之前我们已经介绍过了,它主要用于订阅服务渠道私有消息,这里就是它用法的一个示例。对应的服务端 Service 向对应的前端订阅者推送消息,这里就是通过“echo”渠道向前端推送消息,他会回调“echoRpcReturn”方法,并传入推送的消息作为实参。对于后端的每次推送,都会调用前端的“echoRpcReturn”方法。


Dojo 的 Cometd 工具包之后端

Dojo 的 Cometd 工具包的后端实现是基于 Java 和 Jetty 组件的,通过 Dojo 的服务端 Cometd 组件,我们同样能极其迅速的构建 Cometd 框架。我们需要做的仅仅是加入我们的业务逻辑代码即可。

先来看看 web.xml 的配置参数:

清单 9. 基本配置参数(web.xml)
               cometd           org.cometd.server.continuation.ContinuationCometdServlet                        timeout             60000                            cometd         /cometd/*                   cross-origin         org.eclipse.jetty.servlets.CrossOriginFilter                   cross-origin         /cometd/*       

这里我们先来看看“ContinuationCometdServlet”,这个 Servlet 主要用于解释 Bayeux 协议,所以关于它的配置是必须的。基于“ContinuationCometdServlet”的其他配置参数还有很多,如:

Timeout:长轮询的过期时间。如果超过这个时间还没有客户端消息,服务端会推送一个空消息。

Interval:轮询间隔时间。客户端结束前一个请求到发送下一个请求之间的间隔时间。

maxInterval:服务端最长等待时间。即:建立连接时,如果超过这个时间仍没有接到一个新的长轮询连接请求,服务端就会认为该客户端无效或者关闭了。

logLevel:日志级别。 “0 = warn, 1 = info, 2 = debug”。

以上是主要的配置参数,其余的配置参数还有很多,这里不一一介绍,有需要的读者可以查阅 Dojo 的帮助文档。另外,最后几行我们还配置了一个“cross-origin”,对应着“CrossOriginFilter”类,他用于支持跨域的 JavaScript 请求,如果您的项目中要支持跨域的服务器推送,请加入该配置。

接下来我们再来看看一些高级配置参数:

清单 10. 高级配置参数(web.xml)
   cometd  org.cometd.java.annotation.AnnotationCometdServlet    logLevel  1      services  org.cometd.examples.ChatService    1      cometd  /cometd/*      cometdDemo  org.cometd.examples.CometdDemoServlet  2  

这里我们主要要注意三个地方:

1. “CometdDemoServlet”:它是用于启动服务端 Cometd 框架的 Servlet,我们在后面会介绍。由于他配置了“load-on-startup”参数,所以在服务容器启动的时候,我们的 Cometd 服务端就已经搭建好了,之后我们会着重介绍他的“init”方法中的行为。

2. “AnnotationCometdServlet”:这个 Servlet 配置在这里表示了我们在服务端代码是基于 annotation 的。这是一个非常实用的 Servlet,通过这个 Servlet,你会发现,我们要做的事情仅仅是定义几个 Service 类,实现其中的几个方法即可。连很多调用 Cometd 框架 API 接口的代码都省去了。

3. “ChatService”:这里声明了一个 Service 类,他的用途是处理服务渠道的消息。这里声明的作用等同于代码中的“processor.process(new ChatService())”。

配置完成后,我们接下来可以看看代码了。通过以上的配置之后,你会发现,我们接下来要写的代码非常简单精炼:

清单 11. 服务类初始化 init
 public void init() throws ServletException  {  final BayeuxServerImpl bayeux =  (BayeuxServerImpl)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);  if (bayeux==null)  throw new UnavailableException("No BayeuxServer!");  .................  // 创建扩展点 bayeux.addExtension(new TimesyncExtension());  bayeux.addExtension(new AcknowledgedMessagesExtension());  // 设定握手连接权限 bayeux.getChannel(ServerChannel.META_HANDSHAKE).addAuthorizer(   GrantAuthorizer.GRANT_PUBLISH);  // 启动服务渠道 ServerAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux);  processor.process(new EchoRPC());  processor.process(new Monitor());  //processor.process(new ChatService());  bayeux.createIfAbsent("/foo/bar/baz",new ConfigurableServerChannel.Initializer()  {  public void configureChannel(ConfigurableServerChannel channel)  {  channel.setPersistent(true);  }  });  i f (bayeux.getLogger().isDebugEnabled())  System.err.println(bayeux.dump());  .................  }

这里我们介绍三个知识点:

1. Extension:Extension 是一个函数,它会在消息发出之前或者收到之后被调用,专门用来修改消息内容,例如加入一些特殊属性(这些属性多在消息的 ext 属性中)。注意,这些属性大多是应用无关的,如记录长轮询的次数等等。这里的“TimesyncExtension”和“AcknowledgedMessagesExtension”是两个比较常用的 Extension:

  • 1) “Timesync Extension”用于计算客户端事件和服务端时间的偏差。客户端需要同时引入“dojox.cometd.timesync”类,该 Extension 使得客户端和服务端在每次握手或者连接的时候能够互相交换各自的时钟信息,这也是的客户端可以很精确的计算出他与服务端时钟的偏移量。消息格式如下:

    {ext:{timesync:{tc:12345567890,ts:1234567900,p:123,a:3},...},...}

    TC:客户端发消息的时间(距离 1970 年 1 月号的时长,单位为毫秒)

    TS:服务端收到消息的时间

  • 2) “Acknowledge Extension”用于提供可靠的顺序消息机制。一旦加入了“Acknowledge Extension”,服务端会阻截非长轮询的客户端请求,这样会使你的服务器更加的高效。注意:客户端需要同时引入“dojox.cometd.ack”类与其协同工作。

2. Authorizer:设定握手连接权限,这里设定值为“GrantAuthorizer.GRANT_PUBLISH”,表示允许所有客户端建立握手连接。

3. Process Service:启动服务渠道“processor.process(new EchoRPC())”。通过这些服务渠道类,我们可以启动服务渠道处理客户端请求。这是我们服务端推送技术的关键所在,我们的业务逻辑代码也是主要放在这些服务渠道类里面。

接下来我们来看看这些服务渠道类的具体实现:

清单 12. Echo Service 实现
 @Service("echo")public static class EchoRPC{	@Session	private ServerSession _session;	@SuppressWarnings("unused")	@Configure("/service/echo")	private void configureEcho(ConfigurableServerChannel channel)	{		channel.addAuthorizer(GrantAuthorizer.GRANT_SUBSCRIBE_PUBLISH);	}	@Listener("/service/echo")	public void doEcho(ServerSession session, ServerMessage message)	{		Map data = message.getDataAsMap();		Log.info("ECHO from "+session+" "+data);				for(int i = 0; i < 50; i++){			session.deliver(_session, message.getChannel(), data, null);		}	}}

我们可以在“configureEcho”里面设定该服务渠道支持的权限。我们主要来看看“doEcho”方法,它被标识为“@Listener("/service/echo")”,所以它可以用于像客户端推送服务渠道为“echo”的消息,我们之前客户端代码示例里面的如下代码:“dojox.cometd.subscribe("/service/echo",echoRpcReturn)”就是专门用于处理这里服务渠道推送的消息,消息推送通过“deliver”方法,推送的消息信息放在“data”实参中。

再来看看 Monitor 类:

清单 13. Monitor Service 实现
 @Service("monitor")  public static class Monitor  {  @Listener("/meta/subscribe")  public void monitorSubscribe(ServerSession session, ServerMessage message)  {  Log.info("Monitored Subscribe from "+session+" for "           +message.get(Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/unsubscribe")  public void monitorUnsubscribe(ServerSession session, ServerMessage message)  {  Log.info("Monitored Unsubscribe from "+session+" for "           +message.get(Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/*")  public void monitorMeta(ServerSession session, ServerMessage message)  {  if (Log.isDebugEnabled())  Log.debug(message.toString());  }  }

Monitor 渠道类与之前的 Echo 服务渠道类比较类似,不过它主要用于处理 meta 渠道,与业务逻辑无关。

最后,我们来看看被注释掉的“ChatService”类,他也可以通过“processor.process(new ChatService())”来启用,但是我们这里用了一个更为简单的方法:直接配置在 web.xml 文件中:

清单 14. ChatService 的配置
   ...............    services  org.cometd.examples.ChatService    1  

细心的读者可能在之前的代码示例中已经看到,这里就是通过配置的方式加载服务渠道类。参考以下具体实现的代码:

清单 15. ChatService 实现
 @Service("chat")  public class ChatService  {  ..........................................     @Listener("/service/members")     public void handleMembership(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         final String room = ((String)data.get("room")).substring("/chat/".length());         Map roomMembers = _members.get(room);         if (roomMembers == null)         {             Map new_room = new ConcurrentHashMap();             roomMembers = _members.putIfAbsent(room, new_room);             if (roomMembers == null) roomMembers = new_room;         }         final Map members = roomMembers;         String userName = (String)data.get("user");         members.put(userName, client.getId());         client.addListener(new ServerSession.Remo veListener()         {             public void removed(ServerSession session, boolean timeout)             {                 members.values().remove(session.getId());                 broadcastMembers(room,members.keySet());             }         });         broadcastMembers(room,members.keySet());     }     private void broadcastMembers(String room, Set members)     {         // Broadcast the new members list         ClientSessionChannel channel =                  _session.getLocalSession().getChannel("/members/"+room);         channel.publish(members);     }  ..........................................     @Listener("/service/privatechat")     protected void privateChat(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         String room = ((String)data.get("room")).substring("/chat/".length());         Map membersMap = _members.get(room);         if (membersMap==null)         {             Mapnew_room=new ConcurrentHashMap();             membersMap=_members.putIfAbsent(room,new_room);             if (membersMap==null)                 membersMap=new_room;         }         String[] peerNames = ((String)data.get("peer")).split(",");         ArrayList peers = new ArrayList(peerNames.length);  .................     }  }

以上是摘录部分 ChatService 实现代码,它主要是实现一个在线的聊天室,包括公开发言和私有(1 对 1)聊天等等功能,它的实现方式与之前的 Echo 和 Monitor 类似,这里不做详述,有兴趣的读者可以参考一下他的实现,来构造自己的服务器推送应用。


服务器推送技术之比较

其实有很多种方式实现服务器推送,它们各有各的优缺点:

  1. 传统轮询:此方法是利用 HTML 里面 meta 标签的刷新功能,在一定时间间隔后进行页面的转载,以此循环往复。它的最大缺点就是页面刷性给人带来的体验很差,而且服务器的压力也会比较大。
  2. Ajax 轮询:异步响应机制,即通过不间断的客户端 Ajax 请求,去发现服务端的变化。这种方式由于是客户端主动连接的,所以会有一定程度的延时,并且服务器的压力也不小。
  3. 长连接:这也是我们之前所介绍的一种方式。由于它是利用客户端的现有连接实现服务器主动向客户端推送信息,所以延时的情况很少,并且由于服务端的可操控性使得服务器的压力也迅速减小。其实这种技术还有其他的实现方式,通过 Iframe,在页面上嵌入一个隐藏帧(Iframe),将其“src”属性指向一个长连接的请求,这样一来,服务端就能够源源不断的向客户端发送数据。这种方式的不足就在于:它会造成浏览器的进度栏一直显示没有加载完成,当然我们可以通过 Google 的一个称为“htmlfile”的 ActiveX 控件解决,但是毕竟他需要安装 ActiveX 控件,对于终端用户也是不合适的。
  4. 套接字:可以利用 Flash 的 XMLSocket 类或者 Java 的 Applet 来建立 Socket 连接,实现全双工的服务器推送,然后通过 Flash 或者 Applet 与 JavaScript 通信的接口来实现最终的数据推送。但是这种方式需要 Flash 或者 JVM 的支持,同样不太合适于终端用户。
  5. HTML5 的 WebSocket:这种方式其实与套接字一样,但是这里需要单独强调一下:它是不需要用户而外安装任何插件的。 HTML5 提供了一个 WebSocket 的 JavaScript 接口,可以直接与服务端建立 Socket 连接,实现全双工通信,这种方式的服务器推送就是完全意义上的服务器推送了,没有半点模拟的成分,只是现阶段支持 HTML5 的浏览器并不多,而且一般老版本的各种浏览器基本都不支持。不过 HTML5 是一套非常好的标准,在将来,当 HTML5 流行起来以后将是我们实现服务器推送技术的不二选择。

结束语

这篇文章介绍了 Dojo 中的服务器推送 Cometd 工具包。基于服务器推送的理念,介绍了 Bayeux 协议的核心思想,并结合一个简单示例介绍了服务器推送的基本实现。随后,本着快速建立服务器推送框架的想法,介绍了 Dojo 的 Cometd 工具包,并分别从客户端接口和服务端接口两个方面分别介绍了 Dojo 的服务器推送框架的搭建和实现原理。最后,通过一些简单的示例展示了基于服务端推送的业务逻辑的具体实现。服务端推送技术具有很强的实用性,希望广大读者在开发自己的项目的过程中多关注一下,以尽可能多的完善自己的 Web 应用。

服务器推送技术和 Bayeux 协议简介

服务器推送技术的基础思想是将浏览器主动查询信息改为服务器主动发送信息。服务器发送一批数据,浏览器显示这些数据,同时保证与服务器的连接。当服务器需要再次发送一批数据时,浏览器显示数据并保持连接。以后,服务器仍然可以发送批量数据,浏览器继续显示数据,依次类推。基于这种思想,这里我们要引出 Bayeux 协议。

Bayeux 是一套基于 Publish / Subscribe 模式,以 JSON 格式在浏览器与服务器之间传输事件的通信协议。该协议规定了浏览器与服务器之问的双向通信机制,克服了传统 Web 通信模式的缺点。

Bayeux 协议主要基于 HTTP 来传输低延迟的、异步的事件消息。这些消息通过频道 (Channels) 来投递,能够实现从服务器端到客户端、从客户端到服务器端或者通过服务器从一个客户端到另一个客户端的传送。 Bayeux 协议的主要目的是为使用了 Ajax 和 Comet 技术的 Web 客户端实现高响应的用户交互。 Bayeux 协议旨在通过允许执行者更容易的实现互操作性,来降低开发 Comet 应用程序的复杂性。它解决了共同的消息发布和路由问题,并提供了渐进式的改进和扩展机制。

一般情况下,在 HTTP 协议中,Client 要想获得 Server 的消息,必须先自己发送一个 Request,然后 Server 才会给予 Response。而 Bayeux 协议改变了这个情况,它允许 Server 端异步 Push 自己的消息到 Client 端。从而实现了 Client 和 Server 之间的双向操作模式。


服务器推送技术的一个简单实现

基于 Bayeux 协议实现服务器推送技术的方式有很多,可以通过 Flex 或者 Java 的 Applet。基于这两种技术,我们可以建立在客户端建立服务套接字接口,“双向操作模式”自然很容易实现,但是这些方式需要除浏览器以外的运行环境的支持。这里我们希望能采用一种纯脚本的方式,这种方式是不可能建立服务套接字接口的,那如何实现基于 Bayeux 协议的服务器推送呢?其实是可以模拟实现的,主要有两种方式:

1. 基于 HTTP 的长轮询来进行消息通信(基于 Ajax 的长轮询(long-polling)方式)。

2. 基于 Iframe 及 htmlfile 的流(streaming)方式。

这里我们采用第一种方式实现,即:客户端先向服务器端发送一个 HTTP Request,服务器端接收到后,阻塞在那边,等服务器有消息的时候,则返回一个 HTTP Response 给客户端,客户端收到后,断开连接,紧接着再发第二个 HTTP Request,以此反复进行,保持这个“长轮询”。期间,如果连接超时,那么会断开重连,以保持连接。

基于以上的思想,我们来看一下一个简单的实现,这个简单实现是基于 PHP 的。示例很简单,即便没用过 PHP 也能够很容易看明白,而且我们会在后面一一作出解释。

这个示例主要实现这样一个功能:

我们在浏览器里面分别打开三个窗口,并访问同一张页面。修改其中一个页面上的内容,另外两个页面上的内容也随即发生变化(注意:这里不用刷新页面)。这就会给我们一种:数据是服务器推送过来的感觉。

图 1. 简单服务器推送示例 -- 内容修改前

图 1. 简单服务器推送示例 -- 内容修改前

我们修改其中第一个窗口(左上)的内容(输入“222”,点击“Send”按钮,发送到后台)。此时不仅第一个窗口的内容变化了,其余两个窗口的内容也随即变化。

图 2. 简单服务器推送示例 -- 内容修改

图 2. 简单服务器推送示例 -- 内容修改

接下来我们来看看示例代码吧:

清单 1. 简单服务器推送 -- 前端代码 HTML
 

这个是我们所看到的输入框和提交按钮,大家可以注意一下它的“onsubmit”方法:当我们输入内容并点击提交时,它会执行“comet.doRequest($('word').value)”方法向后端发起请求(其实在这之前我们就已经建立了与服务端的长轮询并可随时开始服务器推送数据)。接下来我们来看看这个“comet”是什么样子的以及他的 Request 的具体实现:

清单 2. 简单服务器推送 -- 前端代码 JavaScript
 var Comet = Class.create();Comet.prototype = {	timestamp: 0,	url: './backend.php',	noerror: true,	initialize: function(){	},	connect: function(){		this.ajax = new Ajax.Request(this.url, {			method: 'get',			parameters: {				'timestamp': this.timestamp			},			onSuccess: function(transport){				var response = transport.responseText.evalJSON();				this.comet.timestamp = response['timestamp'];				this.comet.handleResponse(response);				this.comet.noerror = true;			},			onComplete: function(transport){				if (!this.comet.noerror) setTimeout(function(){					                     comet.connect()				                         }, 5000);				else 				this.comet.connect();				this.comet.noerror = false;			}		});		this.ajax.comet = this;	},	handleResponse: function(response){		$('content').innerHTML += '
' + response['msg'] + '
'; }, doReq uest: function(request){ new Ajax.Request(this.url, { method: 'get', parameters: { 'msg': request } }); }}var comet = new Comet();comet.connect();

我们先看最后两段代码,这里是页面初始化时会执行的代码,其实在这里,我们就建立了一服务端的长轮询,我们来看看“connect”方法的实现吧:

“connect”方法这里是发了一个 Ajax 请求,然后分别设定了成功时(onSuccess)的返回处理和请求完成时(onComplete)的处理(注意 onComplete 不论成功失败都会执行)。我们要挂住这里的 onComplete 方法。可以看到,当请求完成时,如果连接有问题,它会过 5 秒重新连接,;如果没有问题,他会立即重新连接。

相信大家看到这里应该会有点眉目了,这里其实没有什么所谓的恒定不断的连接(类似 TCP 方式),它的真正实现是通过不断的 Ajax 请求实现的。

所以,当我们开启 3 个窗口时,其实我们打开了 3 个模拟的不间断的客户端与服务端的连接,所以他们会即时解到服务端的信息,不需要刷新页面。

我们再来看看服务端的实现,看看他是如何推送的:

清单 3. 简单服务器推送 -- 后端代码 PHP
 $filename  = dirname(__FILE__).'/data.txt';  // 将新消息存入文件中 $msg = isset($_GET['msg']) ? $_GET['msg'] : '';  if ($msg != '')  {   file_put_contents($filename,$msg);   die();  }  // 这是一个无限循环,一旦发现文件被修改,便会跳出循环并返回文件修改数据。如果文件一直没有修改,则会一  // 直处于循环检测状态,此时的 Ajax 连接也会一直保留,直到文件被修改为止,这就是所谓的“长轮询”。 $lastmodif    = isset($_GET['timestamp']) ? $_GET['timestamp'] : 0;  $currentmodif = filemtime($filename);  while ($currentmodif <= $lastmodif) // 检测文件是否被修改 {   usleep(10000); // sleep 10ms to unload the CPU   clearstatcache();   $currentmodif = filemtime($filename);  }  // 返回 JSON 数组 $response = array();  $response['msg']       = file_get_contents($filename);  $response['timestamp'] = $currentmodif;  echo json_encode($response);  flush();

我们可以参照上面的注释理解该代码,其实并不需要多少 PHP 的知识。服务端推送技术不是一个开发用的控件库,而是一个思想。这里的 while 循环便说明了服务端推送是如何保留所谓的“长轮询”的。

现在大家应该明白为什么三个窗口会同步变化了。其主要的核心思想就是服务端“握住”长轮询,然后在适当的时候“放手”。


Dojo 的 Cometd 工具包简介

之前我们是基于 JavaScript 自己实现了一个简单的 Cometd 应用,我们花了大量的代码来建立一个 Cometd 框架,真正用于处理我们自己的业务逻辑的代码其实就是“handleResponse”里面的那一行。我们能不能吧这些通用的代码省掉呢? The answer is yes. Dojo 已经对 Cometd 做了封装,基于 Dojo 的 Cometd 包,我们不用再浪费大量的代码在搭建 Cometd 框架上。对于前端脚本代码,我们只需要加上一个 Cometd 包的简单接口代码,便可以开始加入我们自己的业务逻辑代码了。

当然,Dojo 的 Cometd 包还包括后端的代码,可以在 Dojo 的官网下载中找到,它不与 Dojo 包一起发布,是一个单独的服务端开源代码,基于 Java 和 Jetty 的,有兴趣的读者可以下载下来研究一下。

通过 Dojo 的这两部分代码,我们便可以迅速地搭建我们的 Cometd 框架,我们剩下需要做的就是加入我们的业务逻辑。


Dojo 的 Cometd 工具包之前端

接下来我们来看看 Dojo 的 Cometd 工具包的前端封装:

清单 4. Cometd 前端初始化
 dojox.cometd.init("http://www.xxx.com/cometd");

这个接口用于建立并初始化与服务端的握手连接(Bayeux handshake,初始化了“Bayeux communication” 消息通讯)。建立这个连接是基于 Bayeux 协议的,它主要有两个任务:

  1. 客户端与服务端协商传输的消息类型。
  2. 如果协商成功,服务端会通知客户端具体的请求参数配置。
  3. 如果协商失败,客户端重新发起协商流程。

我们深入 Dojo 的 init 方法内部可以看到握手连接的具体实现过程,它的实现也是不间断的重复发送客户端的 Ajax 请求,与我们之前的自制案类似,有兴趣的同学可以参考如下代码(摘取部分):

清单 5. Cometd 内部机制
 this.init = function(...){............	var bindArgs = {		url: this.url,		handleAs: this.handleAs,		content: { "message": dojo.toJson([props]) },		load: dojo.hitch(this,function(msg){			this._backon();			this._finishInit(msg);		}),		error: dojo.hitch(this,function(e){			this._backoff();			this._finishInit(e);		}),		timeout: this.expectedNetworkDelay	};..............	if(this._isXD){		r = dojo.io.script.get(bindArgs);	}else{		r = dojo.xhrPost(bindArgs);	}..............}this._finishInit = function(data){..................	if(successful){		........		//ajax request inside		this.tunnelInit = transport.tunnelInit && dojo.hitch(transport,        "tunnelInit");		this.tunnelCollapse = transport.tunnelCollapse && dojo.hitch(transport,		"tunnelCollapse");		transport.startup(data);	}else{		if (!this._advice || this._advice["reconnect"] != "none"){			setTimeout(dojo.hitch(this, "init", this.url, this._props),			this._interval());		}	}....................}

可见,它们的 callback 方法里面都带有对自己本身的调用,这里的”init“方法也不例外。细心的读者可能还会发现,其实从例子上可以看出:Dojo 的 Cometd 也支持跨域,它的跨域是通过“script”的方式实现的。这里有一点需要大家了解,我们默认的服务端推送实现方式是长轮询(long-polling)模式,遇到跨域时,“long-polling”便不再适用,转为基于“script”的返回调用(callback-polling)模式。

接下来我们再来看看 Cometd 中关于消息推送的一些接口,这些消息通讯主要是基于渠道:

清单 6. Cometd 前端发布消息
 dojox.cometd.publish("/service/echo", { msg: msg });

这里的所谓“发布消息”就是向后端发送消息,用于前端主动向后端推送。

这里的第一个参数是发送消息的渠道标识(channel),这种“channel”共有三种类型:

1. 元渠道(meta channels):示例“/meta/connect”(通常以“/meta/”为开头)。元渠道主要不是用来消息传输,而是用于客户端监听,如握手连接或者网络连接等等的错误。通常我们会在客户端调用“addListener()”来开启监听元渠道,它可以在握手连接的建立之前就开启监听,而且这种消息监听是同步的。

2. 服务渠道(service channels):示例“/service/connect”(通常以“/service/”为开头)。它主要用于私有消息通讯,主要是一对一的通讯。通常我们会在客户端调用“subscribe()”来订阅服务渠道消息。服务渠道只有等握手连接建立好后才能开启,而且它是异步通讯的。

3. 普通渠道(normal channels):示例“/foo/bar”(无限制)。这种渠道没有什么限制,主要用于广播消息,即:多个客户端订阅了一个服务,该服务可以通过普通渠道进行消息广播。

渠道是通信的基础模式,我们可以根据需要选择相应的渠道模式。

第二个参数则是消息对象,这里的“msg”则是消息内容。

有一点要注意:这里的“publish”是基于 Bayeux 协议的,采用的异步消息传输机制,所以它是在服务端(Bayeux 服务器)收到消息之前就返回的。所以 publish 的返回并不代表服务端收到你 publish 的消息了。

Dojo 的 Cometd 还支持批量发送消息,通过这个接口可以有效地避免不必要的网络消息传输的浪费:

清单 7. Cometd 前端批量发布消息
 // 方法 1  cometd.batch(function()  {     cometd.publish('/channel1', { product: 'foo' });     cometd.publish('/channel2', { notificationType: 'all' });     cometd.publish('/channel3', { update: false });  });  // 方法 2  cometd.startBatch()  cometd.publish('/channel1', { product: 'foo' });  cometd.publish('/channel2', { notificationType: 'all' });  cometd.publish('/channel3', { update: false });  cometd.endBatch()

上述两种方案都可以实现消息的批量发送,推荐使用方法 1。

接下来我们看看服务端的消息推送:

清单 8. Cometd 前端订阅消息
 dojox.cometd.subscribe("/service/echo",echoRpcReturn);  function echoRpcReturn(msg){  dojo.byId("responses").innerHTML += msg;  }

这里所谓的“订阅消息”,其实就是接收服务端推送的消息,是后端主动向前端推送。这也是服务端推送的精华所在,同样也是很简单的一行代码。

这里我们看到了一个熟悉的方法 --- “subscribe”,之前我们已经介绍过了,它主要用于订阅服务渠道私有消息,这里就是它用法的一个示例。对应的服务端 Service 向对应的前端订阅者推送消息,这里就是通过“echo”渠道向前端推送消息,他会回调“echoRpcReturn”方法,并传入推送的消息作为实参。对于后端的每次推送,都会调用前端的“echoRpcReturn”方法。


Dojo 的 Cometd 工具包之后端

Dojo 的 Cometd 工具包的后端实现是基于 Java 和 Jetty 组件的,通过 Dojo 的服务端 Cometd 组件,我们同样能极其迅速的构建 Cometd 框架。我们需要做的仅仅是加入我们的业务逻辑代码即可。

先来看看 web.xml 的配置参数:

清单 9. 基本配置参数(web.xml)
               cometd           org.cometd.server.continuation.ContinuationCometdServlet                        timeout             60000                            cometd         /cometd/*                   cross-origin         org.eclipse.jetty.servlets.CrossOriginFilter                   cross-origin         /cometd/*       

这里我们先来看看“ContinuationCometdServlet”,这个 Servlet 主要用于解释 Bayeux 协议,所以关于它的配置是必须的。基于“ContinuationCometdServlet”的其他配置参数还有很多,如:

Timeout:长轮询的过期时间。如果超过这个时间还没有客户端消息,服务端会推送一个空消息。

Interval:轮询间隔时间。客户端结束前一个请求到发送下一个请求之间的间隔时间。

maxInterval:服务端最长等待时间。即:建立连接时,如果超过这个时间仍没有接到一个新的长轮询连接请求,服务端就会认为该客户端无效或者关闭了。

logLevel:日志级别。 “0 = warn, 1 = info, 2 = debug”。

以上是主要的配置参数,其余的配置参数还有很多,这里不一一介绍,有需要的读者可以查阅 Dojo 的帮助文档。另外,最后几行我们还配置了一个“cross-origin”,对应着“CrossOriginFilter”类,他用于支持跨域的 JavaScript 请求,如果您的项目中要支持跨域的服务器推送,请加入该配置。

接下来我们再来看看一些高级配置参数:

清单 10. 高级配置参数(web.xml)
   cometd  org.cometd.java.annotation.AnnotationCometdServlet    logLevel  1      services  org.cometd.examples.ChatService    1      cometd  /cometd/*      cometdDemo  org.cometd.examples.CometdDemoServlet  2  

这里我们主要要注意三个地方:

1. “CometdDemoServlet”:它是用于启动服务端 Cometd 框架的 Servlet,我们在后面会介绍。由于他配置了“load-on-startup”参数,所以在服务容器启动的时候,我们的 Cometd 服务端就已经搭建好了,之后我们会着重介绍他的“init”方法中的行为。

2. “AnnotationCometdServlet”:这个 Servlet 配置在这里表示了我们在服务端代码是基于 annotation 的。这是一个非常实用的 Servlet,通过这个 Servlet,你会发现,我们要做的事情仅仅是定义几个 Service 类,实现其中的几个方法即可。连很多调用 Cometd 框架 API 接口的代码都省去了。

3. “ChatService”:这里声明了一个 Service 类,他的用途是处理服务渠道的消息。这里声明的作用等同于代码中的“processor.process(new ChatService())”。

配置完成后,我们接下来可以看看代码了。通过以上的配置之后,你会发现,我们接下来要写的代码非常简单精炼:

清单 11. 服务类初始化 init
 public void init() throws ServletException  {  final BayeuxServerImpl bayeux =  (BayeuxServerImpl)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);  if (bayeux==null)  throw new UnavailableException("No BayeuxServer!");  .................  // 创建扩展点 bayeux.addExtension(new TimesyncExtension());  bayeux.addExtension(new AcknowledgedMessagesExtension());  // 设定握手连接权限 bayeux.getChannel(ServerChannel.META_HANDSHAKE).addAuthorizer(   GrantAuthorizer.GRANT_PUBLISH);  // 启动服务渠道 ServerAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux);  processor.process(new EchoRPC());  processor.process(new Monitor());  //processor.process(new ChatService());  bayeux.createIfAbsent("/foo/bar/baz",new ConfigurableServerChannel.Initializer()  {  public void configureChannel(ConfigurableServerChannel channel)  {  channel.setPersistent(true);  }  });  if ( bayeux.getLogger().isDebugEnabled())  System.err.println(bayeux.dump());  .................  }

这里我们介绍三个知识点:

1. Extension:Extension 是一个函数,它会在消息发出之前或者收到之后被调用,专门用来修改消息内容,例如加入一些特殊属性(这些属性多在消息的 ext 属性中)。注意,这些属性大多是应用无关的,如记录长轮询的次数等等。这里的“TimesyncExtension”和“AcknowledgedMessagesExtension”是两个比较常用的 Extension:

  • 1) “Timesync Extension”用于计算客户端事件和服务端时间的偏差。客户端需要同时引入“dojox.cometd.timesync”类,该 Extension 使得客户端和服务端在每次握手或者连接的时候能够互相交换各自的时钟信息,这也是的客户端可以很精确的计算出他与服务端时钟的偏移量。消息格式如下:

    {ext:{timesync:{tc:12345567890,ts:1234567900,p:123,a:3},...},...}

    TC:客户端发消息的时间(距离 1970 年 1 月号的时长,单位为毫秒)

    TS:服务端收到消息的时间

  • 2) “Acknowledge Extension”用于提供可靠的顺序消息机制。一旦加入了“Acknowledge Extension”,服务端会阻截非长轮询的客户端请求,这样会使你的服务器更加的高效。注意:客户端需要同时引入“dojox.cometd.ack”类与其协同工作。

2. Authorizer:设定握手连接权限,这里设定值为“GrantAuthorizer.GRANT_PUBLISH”,表示允许所有客户端建立握手连接。

3. Process Service:启动服务渠道“processor.process(new EchoRPC())”。通过这些服务渠道类,我们可以启动服务渠道处理客户端请求。这是我们服务端推送技术的关键所在,我们的业务逻辑代码也是主要放在这些服务渠道类里面。

接下来我们来看看这些服务渠道类的具体实现:

清单 12. Echo Service 实现
 @Service("echo")public static class EchoRPC{	@Session	private ServerSession _session;	@SuppressWarnings("unused")	@Configure("/service/echo")	private void configureEcho(ConfigurableServerChannel channel)	{		channel.addAuthorizer(GrantAuthorizer.GRANT_SUBSCRIBE_PUBLISH);	}	@Listener("/service/echo")	public void doEcho(ServerSession session, ServerMessage message)	{		Map data = message.getDataAsMap();		Log.info("ECHO from "+session+" "+data);				for(int i = 0; i < 50; i++){			session.deliver(_session, message.getChannel(), data, null);		}	}}

我们可以在“configureEcho”里面设定该服务渠道支持的权限。我们主要来看看“doEcho”方法,它被标识为“@Listener("/service/echo")”,所以它可以用于像客户端推送服务渠道为“echo”的消息,我们之前客户端代码示例里面的如下代码:“dojox.cometd.subscribe("/service/echo",echoRpcReturn)”就是专门用于处理这里服务渠道推送的消息,消息推送通过“deliver”方法,推送的消息信息放在“data”实参中。

再来看看 Monitor 类:

清单 13. Monitor Service 实现
 @Service("monitor")  public static class Monitor  {  @Listener("/meta/subscribe")  public void monitorSubscribe(ServerSession session, ServerMessage message)  {  Log.info("Monitored Subscribe from "+session+" for "           +message.get(Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/unsubscribe")  public void monitorUnsubscribe(ServerSession session, ServerMessage message)  {  Log.info("Monitored Unsubscribe from "+session+" for "           +message.get(Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/*")  public void monitorMeta(ServerSession session, ServerMessage message)  {  if (Log.isDebugEnabled())  Log.debug(message.toString());  }  }

Monitor 渠道类与之前的 Echo 服务渠道类比较类似,不过它主要用于处理 meta 渠道,与业务逻辑无关。

最后,我们来看看被注释掉的“ChatService”类,他也可以通过“processor.process(new ChatService())”来启用,但是我们这里用了一个更为简单的方法:直接配置在 web.xml 文件中:

清单 14. ChatService 的配置
   ...............    services  org.cometd.examples.ChatService    1  

细心的读者可能在之前的代码示例中已经看到,这里就是通过配置的方式加载服务渠道类。参考以下具体实现的代码:

清单 15. ChatService 实现
 @Service("chat")  public class ChatService  {  ..........................................     @Listener("/service/members")     public void handleMembership(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         final String room = ((String)data.get("room")).substring("/chat/".length());         Map roomMembers = _members.get(room);         if (roomMembers == null)         {             Map new_room = new ConcurrentHashMap();             roomMembers = _members.putIfAbsent(room, new_room);             if (roomMembers == null) roomMembers = new_room;         }         final Map members = roomMembers;         String userName = (String)data.get("user");         members.put(userName, client.getId());         client.addListener(new ServerSession.RemoveL istener()         {             public void removed(ServerSession session, boolean timeout)             {                 members.values().remove(session.getId());                 broadcastMembers(room,members.keySet());             }         });         broadcastMembers(room,members.keySet());     }     private void broadcastMembers(String room, Set members)     {         // Broadcast the new members list         ClientSessionChannel channel =                  _session.getLocalSession().getChannel("/members/"+room);         channel.publish(members);     }  ..........................................     @Listener("/service/privatechat")     protected void privateChat(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         String room = ((String)data.get("room")).substring("/chat/".length());         Map membersMap = _members.get(room);         if (membersMap==null)         {             Mapnew_room=new ConcurrentHashMap();             membersMap=_members.putIfAbsent(room,new_room);             if (membersMap==null)                 membersMap=new_room;         }         String[] peerNames = ((String)data.get("peer")).split(",");         ArrayList peers = new ArrayList(peerNames.length);  .................     }  }

以上是摘录部分 ChatService 实现代码,它主要是实现一个在线的聊天室,包括公开发言和私有(1 对 1)聊天等等功能,它的实现方式与之前的 Echo 和 Monitor 类似,这里不做详述,有兴趣的读者可以参考一下他的实现,来构造自己的服务器推送应用。


服务器推送技术之比较

其实有很多种方式实现服务器推送,它们各有各的优缺点:

  1. 传统轮询:此方法是利用 HTML 里面 meta 标签的刷新功能,在一定时间间隔后进行页面的转载,以此循环往复。它的最大缺点就是页面刷性给人带来的体验很差,而且服务器的压力也会比较大。
  2. Ajax 轮询:异步响应机制,即通过不间断的客户端 Ajax 请求,去发现服务端的变化。这种方式由于是客户端主动连接的,所以会有一定程度的延时,并且服务器的压力也不小。
  3. 长连接:这也是我们之前所介绍的一种方式。由于它是利用客户端的现有连接实现服务器主动向客户端推送信息,所以延时的情况很少,并且由于服务端的可操控性使得服务器的压力也迅速减小。其实这种技术还有其他的实现方式,通过 Iframe,在页面上嵌入一个隐藏帧(Iframe),将其“src”属性指向一个长连接的请求,这样一来,服务端就能够源源不断的向客户端发送数据。这种方式的不足就在于:它会造成浏览器的进度栏一直显示没有加载完成,当然我们可以通过 Google 的一个称为“htmlfile”的 ActiveX 控件解决,但是毕竟他需要安装 ActiveX 控件,对于终端用户也是不合适的。
  4. 套接字:可以利用 Flash 的 XMLSocket 类或者 Java 的 Applet 来建立 Socket 连接,实现全双工的服务器推送,然后通过 Flash 或者 Applet 与 JavaScript 通信的接口来实现最终的数据推送。但是这种方式需要 Flash 或者 JVM 的支持,同样不太合适于终端用户。
  5. HTML5 的 WebSocket:这种方式其实与套接字一样,但是这里需要单独强调一下:它是不需要用户而外安装任何插件的。 HTML5 提供了一个 WebSocket 的 JavaScript 接口,可以直接与服务端建立 Socket 连接,实现全双工通信,这种方式的服务器推送就是完全意义上的服务器推送了,没有半点模拟的成分,只是现阶段支持 HTML5 的浏览器并不多,而且一般老版本的各种浏览器基本都不支持。不过 HTML5 是一套非常好的标准,在将来,当 HTML5 流行起来以后将是我们实现服务器推送技术的不二选择。

结束语

这篇文章介绍了 Dojo 中的服务器推送 Cometd 工具包。基于服务器推送的理念,介绍了 Bayeux 协议的核心思想,并结合一个简单示例介绍了服务器推送的基本实现。随后,本着快速建立服务器推送框架的想法,介绍了 Dojo 的 Cometd 工具包,并分别从客户端接口和服务端接口两个方面分别介绍了 Dojo 的服务器推送框架的搭建和实现原理。最后,通过一些简单的示例展示了基于服务端推送的业务逻辑的具体实现。服务端推送技术具有很强的实用性,希望广大读者在开发自己的项目的过程中多关注一下,以尽可能多的完善自己的 Web 应用。

在 IBM Bluemix 云平台上开发并部署您的下一个应用。


 

 var Comet = Class.create();Comet.prototype = {	timestamp: 0,	url: './backend.php',	noerror: true,	initialize: function(){	},	connect: function(){		this.ajax = new Ajax.Request(this.url, {			method: 'get',			parameters: {				'timestamp': this.timestamp			},			onSuccess: function(transport){				var response = transport.responseText.evalJSON();				this.comet.timestamp = response['timestamp'];				this.comet.handleResponse(response);				this.comet.noerror = true;			},			onComplete: function(transport){				if (!this.comet.noerror) setTimeout(function(){					                     comet.connect()				                         }, 5 000);				else 				this.comet.connect();				this.comet.noerror = false;			}		});		this.ajax.comet = this;	},	handleResponse: function(response){		$('content').innerHTML += '
' + response['msg'] + '
'; }, doRequest: function(request){ new Ajax.Request(this.url, { method: 'get', parameters: { 'msg': request } }); }}var comet = new Comet();comet.connect();

 $filename  = dirname(__FILE__).'/data.txt';  // 将新消息存入文件中 $msg = isset($_GET['msg']) ? $_GET['msg'] : '';  if ($msg != '')  {   file_put_contents($filename,$msg);   die();  }  // 这是一个无限循环,一旦发现文件被修改,便会跳出循环并返回文件修改数据。如果文件一直没有修改,则会一  // 直处于循环检测状态,此时的 Ajax 连接也会一直保留,直到文件被修改为止,这就是所谓的“长轮询”。 $lastmodif    = isset($_GET['timestamp']) ? $_GET['timestamp'] : 0;  $currentmodif = filemtime($filename);  while ($currentmodif <= $lastmodif) // 检测文件是否被修改 {   usleep(10000); // sleep 10ms to unload the CPU   clearstatcache();   $currentmodif = filemtime($filename);  }  // 返回 JSON 数组 $response = array();  $response['msg']       = file_get_contents($filename);  $response['timestamp'] = $currentmodif;  echo json_encode($response);  flush();



 dojox.cometd.init("http://www.xxx.com/cometd");

 this.init = function(...){............	var bindArgs = {		url: this.url,		handleAs: this.handleAs,		content: { "message": dojo.toJson([props]) },		load: dojo.hitch(this,function(msg){			this._backon();			this._finishInit(msg);		}),		error: dojo.hitch(this,function(e){			this._backoff();			this._finishInit(e);		}),		timeout: this.expectedNetworkDelay	};..............	if(this._isXD){		r = dojo.io.script.get(bi ndArgs);	}else{		r = dojo.xhrPost(bindArgs);	}..............}this._finishInit = function(data){..................	if(successful){		........		//ajax request inside		this.tunnelInit = transport.tunnelInit && dojo.hitch(transport,        "tunnelInit");		this.tunnelCollapse = transport.tunnelCollapse && dojo.hitch(transport,		"tunnelCollapse");		transport.startup(data);	}else{		if(!this._advice || this._advice["reconnect"] != "none"){			setTimeout(dojo.hitch(this, "init", this.url, this._props),			this._interval());		}	}....................}

 dojox.cometd.publish("/service/echo", { msg: msg });

 // 方法 1  cometd.batch(function()  {     cometd.publish('/channel1', { product: 'foo' });     cometd.publish('/channel2', { notificationType: 'all' });     cometd.publish('/channel3', { update: false });  });  // 方法 2  cometd.startBatch()  cometd.publish('/channel1', { product: 'foo' });  cometd.publish('/channel2', { notifica tionType: 'all' });  cometd.publish('/channel3', { update: false });  cometd.endBatch()

 dojox.cometd.subscribe("/service/echo",echoRpcReturn);  function echoRpcReturn(msg){  dojo.byId("responses").innerHTML += msg;  }


               cometd           org.cometd.server.continuation.ContinuationCometdServlet                        timeout             60000                            cometd         /cometd/*                   cross-origin         org.eclipse.jetty.servlets.CrossOriginFilter                   cross-origin         /cometd/*       

   cometd  org.cometd.java.annotation.AnnotationCometdServlet    logLevel  1      services  org.cometd.examples.ChatService    1      cometd  /cometd/*      cometdDemo  org.cometd.examples.CometdDemoServlet  2< /load-on-startup>  

 public void init() throws ServletException  {  final BayeuxServerImpl bayeux =  (BayeuxServerImpl)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);  if (bayeux==null)  throw new UnavailableException("No BayeuxServer!");  .................  // 创建扩展点 bayeux.addExtension(new TimesyncExtension());  bayeux.addExtension(new AcknowledgedMessagesExtension());  // 设定握手连接权限 bayeux.getChannel(ServerChannel.META_HANDSHAKE).addAuthorizer(   GrantAuthorizer.GRANT_PUBLISH);  // 启动服务渠道 ServerAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux);  processor.process(new EchoRPC());  processor.process(new Monitor());  //processor.process(new ChatService());  bayeux.createIfAbsent("/foo/bar/baz",new ConfigurableServerChannel.Initializer()  {  public void configureChannel(ConfigurableServerChannel channel)  {  channel.setPersistent(true);  }  });  if (bayeux.getLogger().isDebugEnabled())  System.err.println(bayeux.d ump());  .................  }

 @Service("echo")public static class EchoRPC{	@Session	private ServerSession _session;	@SuppressWarnings("unused")	@Configure("/service/echo")	private void configureEcho(ConfigurableServerChannel channel)	{		channel.addAuthorizer(GrantAuthorizer.GRANT_SUBSCRIBE_PUBLISH);	}	@Listener("/service/echo")	public void doEcho(ServerSession session, ServerMessage message)	{		Map data = message.getDataAsMap();		Log.info("ECHO from "+session+" "+data);				for(int i = 0; i < 50; i++){			session.deliver(_session, message.getChannel(), data, null);		}	}}

 @Service("monitor")  public static class Monitor  {  @Listener("/meta/subscribe")  public void monitorSubscribe(ServerSession session, ServerMessage message)  {  Log.info("Monitored Subscribe from "+session+" for "           +message.get(Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/unsubscribe")  public void monitorUnsubscrib e(ServerSession session, ServerMessage message)  {  Log.info("Monitored Unsubscribe from "+session+" for "           +message.get(Message.SUBSCRIPTION_FIELD));  }  @Listener("/meta/*")  public void monitorMeta(ServerSession session, ServerMessage message)  {  if (Log.isDebugEnabled())  Log.debug(message.toString());  }  }

   ...............    services  org.cometd.examples.ChatService    1  

 @Service("chat")  public class ChatService  {  ..........................................     @Listener("/service/members")     public void handleMembership(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         final String room = ((String)data.get("room")).substring("/chat/".length());         Map roomMembers = _memb ers.get(room);         if (roomMembers == null)         {             Map new_room = new ConcurrentHashMap();             roomMembers = _members.putIfAbsent(room, new_room);             if (roomMembers == null) roomMembers = new_room;         }         final Map members = roomMembers;         String userName = (String)data.get("user");         members.put(userName, client.getId());         client.addListener(new ServerSession.RemoveListener()         {             public void removed(ServerSession session, boolean timeout)             {                 members.values().remove(session.getId());                 broadcastMembers(room,members.keySet());             }         });         broadcastMembers(room,members.keySet());     }     private void broadcastMembers(String room, Set members)     {         // Broadcast the new members list         ClientSessionChannel channel =                  _session.getLocalSession().getChannel("/m embers/"+room);         channel.publish(members);     }  ..........................................     @Listener("/service/privatechat")     protected void privateChat(ServerSession client, ServerMessage message)     {         Map data = message.getDataAsMap();         String room = ((String)data.get("room")).substring("/chat/".length());         Map membersMap = _members.get(room);         if (membersMap==null)         {             Mapnew_room=new ConcurrentHashMap();             membersMap=_members.putIfAbsent(room,new_room);             if (membersMap==null)                 membersMap=new_room;         }         String[] peerNames = ((String)data.get("peer")).split(",");         ArrayList peers = new ArrayList(peerNames.length);  .................     }  }