Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Namespace Members   Compound Members   File Members  

InetTransport.cc

Go to the documentation of this file.
00001 /**************************************************************************
00002  Copyright (c) 2000-2001, Tony Garnock-Jones
00003  All rights reserved.
00004 
00005  Redistribution and use in source and binary forms, with or without
00006  modification, are permitted provided that the following conditions are
00007  met:
00008 
00009      * Redistributions of source code must retain the above copyright
00010        notice, this list of conditions and the following disclaimer.
00011 
00012      * Redistributions in binary form must reproduce the above
00013        copyright notice, this list of conditions and the following
00014        disclaimer in the documentation and/or other materials provided
00015        with the distribution.
00016 
00017      * Neither the names of the copyright holders nor the names of this
00018        software's contributors may be used to endorse or promote
00019        products derived from this software without specific prior
00020        written permission.
00021 
00022  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
00025  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
00026  CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00027  EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00028  PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00029  PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00030  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
00031  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00032  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00033 **************************************************************************/
00034 
00035 #include <stdlib.h>
00036 #include <stdio.h>
00037 #include <string.h>
00038 
00039 #include <string>
00040 
00041 #include <mps/mps.h>
00042 #include <mps/transport_inet.h>
00043 
00044 #include <FileDescriptor.h>
00045 #include <SocketDescriptor.h>
00046 
00047 namespace MPS {
00048 
00049 InetTransport *InetTransport::instance = 0;
00050 
00051 ///////////////////////////////////////////////////////////////////////////
00052 
/// Formats a signed integer as decimal text.
///
/// @param x  value to format
/// @return   the decimal digits of x, preceded by '-' when x is negative
static string toString(int x) {
  // Three decimal digits per byte is an upper bound on what an int can need;
  // one extra for the sign and one for the terminating NUL. Sizing from
  // sizeof(int) keeps the buffer large enough whatever the width of int is.
  char buf[3 * sizeof(int) + 2];
  // Bounded write: never runs past the end of buf.
  snprintf(buf, sizeof(buf), "%d", x);
  return buf;
}
00058 
00059 ///////////////////////////////////////////////////////////////////////////
00060 
00061 class InetTransport::InetIOStream: public MPS::InputStream, public MPS::OutputStream {
00062 private:
00063   ref<FileDescriptor> fd;
00064 
00065 protected:
00066   virtual string referenceTransport() const;
00067 
00068 public:
00069   InetIOStream(ref<FileDescriptor> const &_fd)
00070     : fd(_fd)
00071   {}
00072 
00073   virtual int read() {
00074     int ch;
00075     try {
00076       while (!fd->read(ch))
00077         fd->block(true, false);
00078     } catch (FileDescriptor::Exception &fe) {
00079       throw MPS::MPSConnectionClosedException(fe.what());
00080     }
00081     return ch;
00082   }
00083 
00084   virtual void flush() {
00085     try {
00086       fd->write(getBody(), getLength(), true);
00087     } catch (FileDescriptor::Exception &fe) {
00088       throw MPS::MPSConnectionClosedException(fe.what());
00089     }
00090     MPS::OutputStream::flush();
00091   }
00092 };
00093 
00094 string InetTransport::InetIOStream::referenceTransport() const {
00095   return "inet";
00096 }
00097 
00098 ///////////////////////////////////////////////////////////////////////////
00099 
00100 class InetTransport::InetConnection: public MPS::Connection {
00101 private:
00102   int oid;
00103   ref<FileDescriptor> fd;
00104 
00105   void ensureConnection() {
00106     if (!fd.valid() || !fd->isOpen()) {
00107       throw MPS::MPSConnectionClosedException("InetConnection has no active fd");
00108     }
00109   }
00110 
00111 public:
00112   InetConnection(int _oid, MPS::Address const &a)
00113     : MPS::Connection(a),
00114            oid(_oid),
00115            fd(0)
00116   {}
00117 
00118   virtual MPS::OutputStream *getOutputStream() {
00119     ensureConnection();
00120 
00121     MPS::OutputStream *stream = new InetIOStream(fd);
00122     stream->writeint(oid);
00123     return stream;
00124   }
00125 
00126   virtual MPS::InputStream *getInputStream() {
00127     ensureConnection();
00128 
00129     MPS::InputStream *stream = new InetIOStream(fd);
00130     int inputOid = stream->readint();
00131 
00132     if (inputOid != oid) {
00133       throw MPS::MPSException("OID mismatch in InetTransport::InetConnection, got " +
00134                               toString(inputOid) + " expected " + toString(oid));
00135     }
00136 
00137     return stream;
00138   }
00139 
00140   virtual void releaseOutputStream(MPS::OutputStream *stream) { delete stream; }
00141   virtual void releaseInputStream(MPS::InputStream *stream) { delete stream; }
00142 
00143   void setActiveFd(ref<FileDescriptor> const &_fd) {
00144     fd = _fd;
00145   }
00146 };
00147 
00148 ///////////////////////////////////////////////////////////////////////////
00149 
00150 class InetTransport::InetDemux: public FileDescriptor::Callback {
00151 public:
00152   typedef map< int, ref<InetConnection> > connectionMap_t;      ///< Map of OID -> Connection
00153 
00154 private:
00155   connectionMap_t connectionMap;
00156   string canonicalAddress;
00157   ref<ClientSocketDescriptor> clientSock;
00158 
00159   void processEvent(ref<FileDescriptor> const &desc) {
00160     MPS::InputStream *input = 0;
00161 
00162     try {
00163 
00164       try {
00165 
00166         while (desc->inputReady()) {
00167           input = new InetIOStream(desc);
00168 
00169           int targetOid = input->readint();
00170 
00171           connectionMap_t::iterator i = connectionMap.find(targetOid);
00172           if (i == connectionMap.end()) {
00173             throw MPS::MPSException("Invalid OID in processRequest: " + toString(targetOid));
00174           } else {
00175             ref<InetConnection> &conn((*i).second);
00176 
00177             conn->setActiveFd(desc);
00178             conn->fireCallback(*input);
00179           }
00180 
00181           delete input;
00182           input = 0;
00183         }
00184 
00185       } catch (FileDescriptor::Exception &fe) {
00186         //      fprintf(stderr, "InetDemux::processEvent: %s (%d: %s)\n",
00187         //              fe.what(), fe.getErrno(), strerror(fe.getErrno()));
00188         throw;
00189       } catch (MPS::MPSException &mpse) {
00190         //      fprintf(stderr, "InetDemux::processEvent: %s\n", mpse.getMessage().c_str());
00191         throw;
00192       }
00193 
00194     } catch (...) {
00195       if (input)
00196         delete input;
00197 
00198       desc->close();
00199     }
00200   }
00201 
00202   void acceptConnection(ref<FileDescriptor> const &desc) {
00203     try {
00204       ref<ServerSocketDescriptor> server = desc.downcast<ServerSocketDescriptor>();
00205       registerFd(server->accept());
00206     } catch (InvalidDowncastException &ide) {
00207       throw MPS::MPSException("WARNING: InetDemux::acceptConnection was given "
00208                               "a non-server socket");
00209     }
00210   }
00211 
00212   void registerFd(ref<FileDescriptor> const &fd) {
00213     fd->setNonBlocking(true);
00214     fd->setReadCallback(this,
00215                         (FileDescriptor::Callback::Method) &InetDemux::processEvent);
00216   }
00217 
00218 public:
00219   /// Client demux
00220   InetDemux(ref<ClientSocketDescriptor> const &_clientSock)
00221     : connectionMap(),
00222       canonicalAddress(),
00223       clientSock(_clientSock)
00224   {
00225 //      fprintf(stderr, "New client connection: %s:%d ==> %s:%d\n",
00226 //          clientSock->getLocalHostname().c_str(), clientSock->getLocalPort(),
00227 //          clientSock->getRemoteHostname().c_str(), clientSock->getRemotePort());
00228     registerFd(clientSock);
00229   }
00230 
00231   /// Server demux
00232   InetDemux(ref<ServerSocketDescriptor> const &serverSock)
00233     : connectionMap(),
00234       canonicalAddress(string("mps:inet:") + serverSock->getCanonicalAddress()),
00235       clientSock(0)
00236   {
00237 //      fprintf(stderr, "New server connection: %s:%d ==> %s:%d\n",
00238 //          serverSock->getLocalHostname().c_str(), serverSock->getLocalPort(),
00239 //          serverSock->getRemoteHostname().c_str(), serverSock->getRemotePort());
00240     serverSock->setReadCallback(this,
00241                                 (FileDescriptor::Callback::Method) &InetDemux::acceptConnection);
00242   }
00243 
00244   string const &getAddress() const { return canonicalAddress; }
00245 
00246   ref<InetConnection> connectionForOid(int oid, Address const &address) {
00247     ref<InetConnection> result;
00248 
00249     connectionMap_t::iterator i = connectionMap.find(oid);
00250     if (i == connectionMap.end()) {
00251       result = new InetConnection(oid, address);
00252       connectionMap[oid] = result;
00253 
00254       if (clientSock.valid())
00255         result->setActiveFd(clientSock);
00256 
00257     } else {
00258       result = (*i).second;
00259     }
00260 
00261     return result;
00262   }
00263 };
00264 
00265 ///////////////////////////////////////////////////////////////////////////
00266 
00267 class InetTransport::InetDispatcher: public Connection::Callback {
00268 private:
00269   ref<InetConnection> connection;
00270   MPS::Server *server;
00271 
00272   void setupCallback() {
00273     connection->setCallback(this,
00274                             (Connection::Callback::Method) &InetDispatcher::dispatchRequest);
00275   }
00276 
00277 public:
00278   InetDispatcher(ref<InetConnection> const &_connection,
00279                  MPS::Server *_server)
00280     : connection(_connection),
00281       server(_server)
00282   {
00283     setupCallback();
00284   }
00285 
00286   void dispatchRequest(MPS::InputStream &input) {
00287     setupCallback();
00288 
00289     MPS::OutputStream *output = connection->getOutputStream();
00290     int pos = output->getLength();
00291     server->dispatch(input, *output);
00292     if (output->getLength() > pos) {
00293       // Only flush if the dispatch ended up writing anything.
00294       // Otherwise we end up sending a header with no body :-)
00295       output->flush();
00296     }
00297     connection->releaseOutputStream(output);
00298   }
00299 };
00300 
00301 ///////////////////////////////////////////////////////////////////////////
00302 
00303 InetTransport::InetTransport(string const &hostName,
00304                              int portNumber)
00305   : Transport("inet"),
00306     fdMgr(new FileDescriptorManager()),
00307     clientMap(),
00308     serverMap(),
00309     defaultDemux(registerServerSocket(new ServerSocketDescriptor(fdMgr, portNumber, hostName))),
00310     nextOid(1)
00311 {}
00312 
00313 ref<InetTransport::InetDemux>
00314 InetTransport::registerServerSocket(ref<ServerSocketDescriptor> const &serverSock)
00315 {
00316   ref<InetDemux> demux = new InetDemux(serverSock);
00317   serverMap[serverSock->getCanonicalAddress()] = demux;
00318   return demux;
00319 }
00320 
00321 ///////////////////////////////////////////////////////////////////////////
00322 
00323 ref<Connection> InetTransport::connectTo(Address const &connectionSpec)
00324 {
00325   if (connectionSpec.getParamCount() != 3)
00326     throw MPS::MPSException("Inet transport requires 3 Address parameters");
00327 
00328   string hostName = connectionSpec.getParam(0);
00329   int portNumber = atoi(connectionSpec.getParam(1).c_str());
00330   string address = hostName + ":" + connectionSpec.getParam(1);
00331 
00332   ref<InetDemux> demux;
00333 
00334   demuxMap_t::iterator i = clientMap.find(address);
00335   if (i == clientMap.end()) {
00336     try {
00337       demux = new InetDemux(new ClientSocketDescriptor(fdMgr, hostName, portNumber));
00338       clientMap[address] = demux;
00339     } catch (FileDescriptor::Exception &fe) {
00340       throw MPS::MPSException(fe.what());
00341     }
00342   } else {
00343     demux = (*i).second;
00344   }
00345 
00346   return demux->connectionForOid(atoi(connectionSpec.getParam(2).c_str()), connectionSpec);
00347 }
00348 
00349 string InetTransport::registerServer(Server *server, Address const &spec)
00350 {
00351   ref<InetDemux> demux;
00352 
00353   if (spec.getParamCount() > 0) {
00354     string hostName = spec.getParam(0);
00355     int portNumber = atoi(spec.getParam(1).c_str());
00356     string address = hostName + ":" + spec.getParam(1);
00357 
00358     demuxMap_t::iterator i = serverMap.find(address);
00359     if (i == serverMap.end()) {
00360       demux = registerServerSocket(new ServerSocketDescriptor(fdMgr, portNumber, hostName));
00361     } else {
00362       demux = (*i).second;
00363     }
00364   } else {
00365     demux = defaultDemux;
00366     if (!demux.valid()) {
00367       demux = registerServerSocket(new ServerSocketDescriptor(fdMgr));
00368     }
00369   }
00370 
00371   if (!defaultDemux.valid()) {
00372     defaultDemux = demux;
00373   }
00374 
00375   int oid = nextOid++;
00376 
00377   Address newSpec(demux->getAddress());
00378   newSpec.setParam(2, toString(oid));
00379 
00380   ref<InetConnection> conn = demux->connectionForOid(oid, newSpec);
00381   conn->setCallback(new InetDispatcher(conn, server),
00382                     (Connection::Callback::Method) &InetDispatcher::dispatchRequest);
00383 
00384   return newSpec.getResolvedName();
00385 }
00386 
00387 string InetTransport::deregisterServer(Server *server, Address const &spec) {
00388   throw MPSException("InetTransport::deregisterServer not yet implemented"); // how embarrassing.
00389 }
00390 
00391 }

Generated at Wed Aug 15 01:05:15 2001 for mps-cpp by doxygen1.2.6 written by Dimitri van Heesch, © 1997-2001