00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include <stdlib.h>
00036 #include <stdio.h>
00037 #include <string.h>
00038
00039 #include <string>
00040
00041 #include <mps/mps.h>
00042 #include <mps/transport_inet.h>
00043
00044 #include <FileDescriptor.h>
00045 #include <SocketDescriptor.h>
00046
00047 namespace MPS {
00048
00049 InetTransport *InetTransport::instance = 0;
00050
00051
00052
00053 static string toString(int x) {
00054 char buf[20];
00055 sprintf(buf, "%d", x);
00056 return buf;
00057 }
00058
00059
00060
00061 class InetTransport::InetIOStream: public MPS::InputStream, public MPS::OutputStream {
00062 private:
00063 ref<FileDescriptor> fd;
00064
00065 protected:
00066 virtual string referenceTransport() const;
00067
00068 public:
00069 InetIOStream(ref<FileDescriptor> const &_fd)
00070 : fd(_fd)
00071 {}
00072
00073 virtual int read() {
00074 int ch;
00075 try {
00076 while (!fd->read(ch))
00077 fd->block(true, false);
00078 } catch (FileDescriptor::Exception &fe) {
00079 throw MPS::MPSConnectionClosedException(fe.what());
00080 }
00081 return ch;
00082 }
00083
00084 virtual void flush() {
00085 try {
00086 fd->write(getBody(), getLength(), true);
00087 } catch (FileDescriptor::Exception &fe) {
00088 throw MPS::MPSConnectionClosedException(fe.what());
00089 }
00090 MPS::OutputStream::flush();
00091 }
00092 };
00093
00094 string InetTransport::InetIOStream::referenceTransport() const {
00095 return "inet";
00096 }
00097
00098
00099
00100 class InetTransport::InetConnection: public MPS::Connection {
00101 private:
00102 int oid;
00103 ref<FileDescriptor> fd;
00104
00105 void ensureConnection() {
00106 if (!fd.valid() || !fd->isOpen()) {
00107 throw MPS::MPSConnectionClosedException("InetConnection has no active fd");
00108 }
00109 }
00110
00111 public:
00112 InetConnection(int _oid, MPS::Address const &a)
00113 : MPS::Connection(a),
00114 oid(_oid),
00115 fd(0)
00116 {}
00117
00118 virtual MPS::OutputStream *getOutputStream() {
00119 ensureConnection();
00120
00121 MPS::OutputStream *stream = new InetIOStream(fd);
00122 stream->writeint(oid);
00123 return stream;
00124 }
00125
00126 virtual MPS::InputStream *getInputStream() {
00127 ensureConnection();
00128
00129 MPS::InputStream *stream = new InetIOStream(fd);
00130 int inputOid = stream->readint();
00131
00132 if (inputOid != oid) {
00133 throw MPS::MPSException("OID mismatch in InetTransport::InetConnection, got " +
00134 toString(inputOid) + " expected " + toString(oid));
00135 }
00136
00137 return stream;
00138 }
00139
00140 virtual void releaseOutputStream(MPS::OutputStream *stream) { delete stream; }
00141 virtual void releaseInputStream(MPS::InputStream *stream) { delete stream; }
00142
00143 void setActiveFd(ref<FileDescriptor> const &_fd) {
00144 fd = _fd;
00145 }
00146 };
00147
00148
00149
00150 class InetTransport::InetDemux: public FileDescriptor::Callback {
00151 public:
00152 typedef map< int, ref<InetConnection> > connectionMap_t;
00153
00154 private:
00155 connectionMap_t connectionMap;
00156 string canonicalAddress;
00157 ref<ClientSocketDescriptor> clientSock;
00158
00159 void processEvent(ref<FileDescriptor> const &desc) {
00160 MPS::InputStream *input = 0;
00161
00162 try {
00163
00164 try {
00165
00166 while (desc->inputReady()) {
00167 input = new InetIOStream(desc);
00168
00169 int targetOid = input->readint();
00170
00171 connectionMap_t::iterator i = connectionMap.find(targetOid);
00172 if (i == connectionMap.end()) {
00173 throw MPS::MPSException("Invalid OID in processRequest: " + toString(targetOid));
00174 } else {
00175 ref<InetConnection> &conn((*i).second);
00176
00177 conn->setActiveFd(desc);
00178 conn->fireCallback(*input);
00179 }
00180
00181 delete input;
00182 input = 0;
00183 }
00184
00185 } catch (FileDescriptor::Exception &fe) {
00186
00187
00188 throw;
00189 } catch (MPS::MPSException &mpse) {
00190
00191 throw;
00192 }
00193
00194 } catch (...) {
00195 if (input)
00196 delete input;
00197
00198 desc->close();
00199 }
00200 }
00201
00202 void acceptConnection(ref<FileDescriptor> const &desc) {
00203 try {
00204 ref<ServerSocketDescriptor> server = desc.downcast<ServerSocketDescriptor>();
00205 registerFd(server->accept());
00206 } catch (InvalidDowncastException &ide) {
00207 throw MPS::MPSException("WARNING: InetDemux::acceptConnection was given "
00208 "a non-server socket");
00209 }
00210 }
00211
00212 void registerFd(ref<FileDescriptor> const &fd) {
00213 fd->setNonBlocking(true);
00214 fd->setReadCallback(this,
00215 (FileDescriptor::Callback::Method) &InetDemux::processEvent);
00216 }
00217
00218 public:
00219
00220 InetDemux(ref<ClientSocketDescriptor> const &_clientSock)
00221 : connectionMap(),
00222 canonicalAddress(),
00223 clientSock(_clientSock)
00224 {
00225
00226
00227
00228 registerFd(clientSock);
00229 }
00230
00231
00232 InetDemux(ref<ServerSocketDescriptor> const &serverSock)
00233 : connectionMap(),
00234 canonicalAddress(string("mps:inet:") + serverSock->getCanonicalAddress()),
00235 clientSock(0)
00236 {
00237
00238
00239
00240 serverSock->setReadCallback(this,
00241 (FileDescriptor::Callback::Method) &InetDemux::acceptConnection);
00242 }
00243
00244 string const &getAddress() const { return canonicalAddress; }
00245
00246 ref<InetConnection> connectionForOid(int oid, Address const &address) {
00247 ref<InetConnection> result;
00248
00249 connectionMap_t::iterator i = connectionMap.find(oid);
00250 if (i == connectionMap.end()) {
00251 result = new InetConnection(oid, address);
00252 connectionMap[oid] = result;
00253
00254 if (clientSock.valid())
00255 result->setActiveFd(clientSock);
00256
00257 } else {
00258 result = (*i).second;
00259 }
00260
00261 return result;
00262 }
00263 };
00264
00265
00266
00267 class InetTransport::InetDispatcher: public Connection::Callback {
00268 private:
00269 ref<InetConnection> connection;
00270 MPS::Server *server;
00271
00272 void setupCallback() {
00273 connection->setCallback(this,
00274 (Connection::Callback::Method) &InetDispatcher::dispatchRequest);
00275 }
00276
00277 public:
00278 InetDispatcher(ref<InetConnection> const &_connection,
00279 MPS::Server *_server)
00280 : connection(_connection),
00281 server(_server)
00282 {
00283 setupCallback();
00284 }
00285
00286 void dispatchRequest(MPS::InputStream &input) {
00287 setupCallback();
00288
00289 MPS::OutputStream *output = connection->getOutputStream();
00290 int pos = output->getLength();
00291 server->dispatch(input, *output);
00292 if (output->getLength() > pos) {
00293
00294
00295 output->flush();
00296 }
00297 connection->releaseOutputStream(output);
00298 }
00299 };
00300
00301
00302
00303 InetTransport::InetTransport(string const &hostName,
00304 int portNumber)
00305 : Transport("inet"),
00306 fdMgr(new FileDescriptorManager()),
00307 clientMap(),
00308 serverMap(),
00309 defaultDemux(registerServerSocket(new ServerSocketDescriptor(fdMgr, portNumber, hostName))),
00310 nextOid(1)
00311 {}
00312
00313 ref<InetTransport::InetDemux>
00314 InetTransport::registerServerSocket(ref<ServerSocketDescriptor> const &serverSock)
00315 {
00316 ref<InetDemux> demux = new InetDemux(serverSock);
00317 serverMap[serverSock->getCanonicalAddress()] = demux;
00318 return demux;
00319 }
00320
00321
00322
00323 ref<Connection> InetTransport::connectTo(Address const &connectionSpec)
00324 {
00325 if (connectionSpec.getParamCount() != 3)
00326 throw MPS::MPSException("Inet transport requires 3 Address parameters");
00327
00328 string hostName = connectionSpec.getParam(0);
00329 int portNumber = atoi(connectionSpec.getParam(1).c_str());
00330 string address = hostName + ":" + connectionSpec.getParam(1);
00331
00332 ref<InetDemux> demux;
00333
00334 demuxMap_t::iterator i = clientMap.find(address);
00335 if (i == clientMap.end()) {
00336 try {
00337 demux = new InetDemux(new ClientSocketDescriptor(fdMgr, hostName, portNumber));
00338 clientMap[address] = demux;
00339 } catch (FileDescriptor::Exception &fe) {
00340 throw MPS::MPSException(fe.what());
00341 }
00342 } else {
00343 demux = (*i).second;
00344 }
00345
00346 return demux->connectionForOid(atoi(connectionSpec.getParam(2).c_str()), connectionSpec);
00347 }
00348
00349 string InetTransport::registerServer(Server *server, Address const &spec)
00350 {
00351 ref<InetDemux> demux;
00352
00353 if (spec.getParamCount() > 0) {
00354 string hostName = spec.getParam(0);
00355 int portNumber = atoi(spec.getParam(1).c_str());
00356 string address = hostName + ":" + spec.getParam(1);
00357
00358 demuxMap_t::iterator i = serverMap.find(address);
00359 if (i == serverMap.end()) {
00360 demux = registerServerSocket(new ServerSocketDescriptor(fdMgr, portNumber, hostName));
00361 } else {
00362 demux = (*i).second;
00363 }
00364 } else {
00365 demux = defaultDemux;
00366 if (!demux.valid()) {
00367 demux = registerServerSocket(new ServerSocketDescriptor(fdMgr));
00368 }
00369 }
00370
00371 if (!defaultDemux.valid()) {
00372 defaultDemux = demux;
00373 }
00374
00375 int oid = nextOid++;
00376
00377 Address newSpec(demux->getAddress());
00378 newSpec.setParam(2, toString(oid));
00379
00380 ref<InetConnection> conn = demux->connectionForOid(oid, newSpec);
00381 conn->setCallback(new InetDispatcher(conn, server),
00382 (Connection::Callback::Method) &InetDispatcher::dispatchRequest);
00383
00384 return newSpec.getResolvedName();
00385 }
00386
00387 string InetTransport::deregisterServer(Server *server, Address const &spec) {
00388 throw MPSException("InetTransport::deregisterServer not yet implemented");
00389 }
00390
00391 }