Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Namespace Members   Compound Members   File Members  

SimplTransport.cc

Go to the documentation of this file.
00001 /**************************************************************************
00002  Copyright (c) 2000-2001, Tony Garnock-Jones
00003  All rights reserved.
00004 
00005  Redistribution and use in source and binary forms, with or without
00006  modification, are permitted provided that the following conditions are
00007  met:
00008 
00009      * Redistributions of source code must retain the above copyright
00010        notice, this list of conditions and the following disclaimer.
00011 
00012      * Redistributions in binary form must reproduce the above
00013        copyright notice, this list of conditions and the following
00014        disclaimer in the documentation and/or other materials provided
00015        with the distribution.
00016 
00017      * Neither the names of the copyright holders nor the names of this
00018        software's contributors may be used to endorse or promote
00019        products derived from this software without specific prior
00020        written permission.
00021 
00022  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
00025  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
00026  CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00027  EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00028  PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00029  PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00030  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00031  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00032  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00033 **************************************************************************/
00034 
00035 #include <stdlib.h>
00036 #include <stdio.h>
00037 #include <string.h>
00038 
00039 #include <string>
00040 #include <deque>
00041 
00042 #include <mps/mps.h>
00043 #include <mps/transport_simpl.h>
00044 
00045 extern "C" {
00046 #include <simpl.h>
00047 }
00048 
00049 #define MPS_MESSAGE_MAXLEN      8192    ///< arbitrary - passed in to simpl
00050 
00051 namespace MPS {
00052 
00053 SimplTransport *SimplTransport::instance = 0;
00054 
00055 ///////////////////////////////////////////////////////////////////////////
00056 
00057 static string toString(int x) {
00058   char buf[20];
00059   sprintf(buf, "%d", x);
00060   return buf;
00061 }
00062 
00063 ///////////////////////////////////////////////////////////////////////////
00064 
00065 class SimplTransport::SimplIOStream: public MPS::InputStream, public MPS::OutputStream {
00066 private:
00067   bool useReply;                        ///< Determines if Reply() or Send() will be used to send
00068   FCID other;                           ///< Handle on our SIMPL interlocutor
00069   char msg[MPS_MESSAGE_MAXLEN];         ///< Message buffer space. Note yucky fixed size!
00070   int pos;                              ///< Current read-position
00071   int msglen;                           ///< Number of valid bytes in msg
00072 
00073 protected:
00074   virtual string referenceTransport() const { return "simpl"; }
00075 
00076 public:
00077   /**
00078    * Construct a SimplIOStream which will communicate with the
00079    * nominated target, using Reply() or Send() as per the passed-in
00080    * boolean.
00081    *
00082    * @param target the SIMPL process we are to communicate with
00083    * @param _u initial setting of useReply */
00084   SimplIOStream(FCID const &target, bool _u)
00085     : useReply(_u),
00086       other(target),
00087       pos(0),
00088       msglen(0)
00089   {}
00090 
00091   /**
00092    * Wait for a message to arrive from some other SIMPL process, and
00093    * when it does, construct a SimplIOStream around it. Automatically
00094    * sets useReply to true, given the nature of the task. */
00095   SimplIOStream()
00096     : useReply(true),
00097       pos(0)
00098   {
00099     other = * (FCID *) SimplTransport::popMessage(msg, msglen);
00100 
00101     if (other.slot == -1)
00102       throw MPS::MPSException("SIMPL Receive failed");
00103   }
00104 
00105   /// Returns the handle on the SIMPL process at the other end of this link.
00106   FCID getOther() const { return other; }
00107 
00108   /// Reads a byte from the message this SimplIOStream has read from our interlocutor
00109   virtual int read() {
00110     if (pos >= msglen) {
00111       throw MPS::MPSConnectionClosedException("reading from a SIMPL message");
00112     }
00113 
00114     return msg[pos++];
00115   }
00116 
00117   /// Sends our outbound message to our interlocutor
00118   virtual void flush() {
00119     if (useReply) {
00120       if (Reply(&other, (void *) getBody(), getLength()) == -1)
00121         throw MPS::MPSException("SIMPL Reply failed");
00122     } else {
00123       int replylen = Send(&other, const_cast<char *>(getBody()), msg, getLength(), sizeof(msg));
00124 
00125       if (replylen == -1)
00126         throw MPS::MPSException("SIMPL Send failed");
00127       if (replylen > 0)
00128         SimplTransport::pushMessage(&other, msg, replylen);
00129     }
00130 
00131     MPS::OutputStream::flush();
00132   }
00133 };
00134 
00135 ///////////////////////////////////////////////////////////////////////////
00136 
00137 class SimplTransport::SimplConnection: public MPS::Connection {
00138 private:
00139   int oid;                      ///< OID this connection is connected to
00140   FCID other;                   ///< SIMPL process that holds the object we are connected to
00141 
00142 public:
00143   /**
00144    * Constructs a connection to the given OID at the named SIMPL process.
00145    *
00146    * @param _oid the OID to connect to
00147    * @param _other the SIMPL process to connect to
00148    * @param a the MPS::Address to pass on to our superclass, MPS::Connection
00149    */
00150   SimplConnection(int _oid, FCID const &_other, MPS::Address const &a)
00151     : MPS::Connection(a),
00152            oid(_oid),
00153            other(_other)
00154   {}
00155 
00156   /**
00157    * Returns a writeable-stream that will send a message to the object
00158    * we are connected to */
00159   virtual MPS::OutputStream *getOutputStream() {
00160     MPS::OutputStream *stream = new SimplIOStream(other, false);
00161     stream->writeint(oid);
00162     return stream;
00163   }
00164 
00165   /**
00166    * Returns a readable-stream that returns bytes from a message we
00167    * received from the object we are connected to */
00168   virtual MPS::InputStream *getInputStream() {
00169     MPS::InputStream *stream = new SimplIOStream();
00170     int inputOid = stream->readint();
00171 
00172     if (inputOid != oid) {
00173       throw MPS::MPSException("OID mismatch in SimplTransport::SimplConnection, got " +
00174                               toString(inputOid) + " expected " + toString(oid));
00175     }
00176 
00177     return stream;
00178   }
00179 
00180   virtual void releaseOutputStream(MPS::OutputStream *stream) { delete stream; }
00181   virtual void releaseInputStream(MPS::InputStream *stream) { delete stream; }
00182 };
00183 
00184 ///////////////////////////////////////////////////////////////////////////
00185 
00186 SimplTransport::SimplTransport(string const &_simplName)
00187   : Transport("simpl"),
00188     nextOid(1),
00189     simplName(_simplName),
00190     serverMap()
00191 {
00192   if (name_attach(const_cast<char *>(simplName.c_str()), MPS_MESSAGE_MAXLEN, 0) == -1)
00193     throw MPSException("Could not attach to SIMPL name " + simplName);
00194 }
00195 
00196 ///////////////////////////////////////////////////////////////////////////
00197 
00198 ref<Connection> SimplTransport::connectTo(Address const &connectionSpec)
00199 {
00200   if (connectionSpec.getParamCount() != 2)
00201     throw MPS::MPSException("Simpl transport requires 2 Address parameters");
00202 
00203   string remoteName = connectionSpec.getParam(0);
00204   int oid = atoi(connectionSpec.getParam(1).c_str());
00205 
00206   FCID other = name_locate(const_cast<char *>(remoteName.c_str()));
00207   if (other.slot == -1)
00208     throw MPS::MPSException("Simpl name_locate of " + remoteName + " failed");
00209 
00210   return new SimplConnection(oid, other, connectionSpec);
00211 }
00212 
00213 string SimplTransport::registerServer(Server *server, Address const &spec)
00214 {
00215   if (spec.getParamCount() > 0) {
00216     throw MPS::MPSException("Simpl transport does not allow self-selection of simplName or OID");
00217   }
00218 
00219   int thisOid = nextOid++;
00220 
00221   serverMap[thisOid] = server;
00222 
00223   Address newSpec("mps:simpl");
00224   newSpec.setParam(0, instance->simplName);
00225   newSpec.setParam(1, toString(thisOid));
00226 
00227   return newSpec.getResolvedName();
00228 }
00229 
00230 string SimplTransport::deregisterServer(Server *server, Address const &spec) {
00231   // how embarrassing.
00232   throw MPSException("SimplTransport::deregisterServer not yet implemented");
00233 }
00234 
00235 ///////////////////////////////////////////////////////////////////////////
00236 
00237 struct QueuedMessage_t {
00238   FCID sender;
00239   char *msg;
00240   int msglen;
00241 
00242   QueuedMessage_t()
00243     : msg(0),
00244       msglen(0)
00245   {}
00246 
00247   QueuedMessage_t(FCID const &s, char const *m, int len)
00248     : sender(s),
00249       msg(new char[len]),
00250       msglen(len)
00251   {
00252     memcpy(msg, m, len);
00253   }
00254 
00255   ~QueuedMessage_t() {
00256     clean();
00257   }
00258 
00259 private:
00260   QueuedMessage_t(QueuedMessage_t const &other);
00261   QueuedMessage_t const &operator=(QueuedMessage_t const &other);
00262 
00263   void clean() {
00264     if (msg) {
00265       delete [] msg;
00266       msg = 0;
00267     }
00268   }
00269 };
00270 
00271 static deque<QueuedMessage_t *> msgQueue;
00272 
00273 void SimplTransport::pushMessage(void *sender, char const *msg, int msglen) {
00274   msgQueue.push_back(new QueuedMessage_t(* (FCID *) sender, msg, msglen));
00275 }
00276 
00277 void *SimplTransport::popMessage(char *msg, int &msglen) {
00278   static FCID sender;
00279 
00280   if (msgQueue.empty()) {
00281     sender = Receive(msg, &msglen);
00282   } else {
00283     QueuedMessage_t *qm = msgQueue.front();
00284     msgQueue.pop_front();
00285 
00286     msglen = qm->msglen;
00287     memcpy(msg, qm->msg, msglen);
00288     sender = qm->sender;
00289 
00290     delete qm;
00291   }
00292 
00293   return &sender;
00294 }
00295 
00296 void SimplTransport::mainloop() {
00297   while (1) {
00298     SimplIOStream input;
00299     int oid = input.readint();
00300 
00301     serverMap_t::iterator i = instance->serverMap.find(oid);
00302     if (i == instance->serverMap.end()) {
00303       throw MPSException("Unknown OID " + toString(oid) + " received");
00304     }
00305 
00306     Server *server = (*i).second;
00307     FCID sender = input.getOther();
00308 
00309     SimplIOStream output(sender, true);
00310     output.writeint(oid);
00311     int pos = output.getLength();
00312 
00313     server->dispatch(input, output);
00314 
00315     if (output.getLength() > pos) {
00316       output.flush();
00317     } else {
00318       if (Reply(&sender, 0, 0) == -1)
00319         throw MPS::MPSException("SIMPL Reply failed in mainloop");
00320     }
00321   }
00322 }
00323 
00324 }

Generated at Wed Aug 15 01:05:16 2001 for mps-cpp by doxygen1.2.6 written by Dimitri van Heesch, © 1997-2001