00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include <stdlib.h>
00036 #include <stdio.h>
00037 #include <string.h>
00038
00039 #include <string>
00040 #include <deque>
00041
00042 #include <mps/mps.h>
00043 #include <mps/transport_simpl.h>
00044
00045 extern "C" {
00046 #include <simpl.h>
00047 }
00048
00049 #define MPS_MESSAGE_MAXLEN 8192
00050
00051 namespace MPS {
00052
00053 SimplTransport *SimplTransport::instance = 0;
00054
00055
00056
/**
 * Formats an integer as decimal text.
 *
 * @param x the value to format
 * @return the decimal representation of x, with a leading '-' when negative
 */
static std::string toString(int x) {
    // snprintf is bounded by the buffer size, so the write can never run
    // past the end of buf regardless of how wide int is on the platform.
    char buf[32];
    snprintf(buf, sizeof(buf), "%d", x);
    return std::string(buf);
}
00062
00063
00064
00065 class SimplTransport::SimplIOStream: public MPS::InputStream, public MPS::OutputStream {
00066 private:
00067 bool useReply;
00068 FCID other;
00069 char msg[MPS_MESSAGE_MAXLEN];
00070 int pos;
00071 int msglen;
00072
00073 protected:
00074 virtual string referenceTransport() const { return "simpl"; }
00075
00076 public:
00077
00078
00079
00080
00081
00082
00083
00084 SimplIOStream(FCID const &target, bool _u)
00085 : useReply(_u),
00086 other(target),
00087 pos(0),
00088 msglen(0)
00089 {}
00090
00091
00092
00093
00094
00095 SimplIOStream()
00096 : useReply(true),
00097 pos(0)
00098 {
00099 other = * (FCID *) SimplTransport::popMessage(msg, msglen);
00100
00101 if (other.slot == -1)
00102 throw MPS::MPSException("SIMPL Receive failed");
00103 }
00104
00105
00106 FCID getOther() const { return other; }
00107
00108
00109 virtual int read() {
00110 if (pos >= msglen) {
00111 throw MPS::MPSConnectionClosedException("reading from a SIMPL message");
00112 }
00113
00114 return msg[pos++];
00115 }
00116
00117
00118 virtual void flush() {
00119 if (useReply) {
00120 if (Reply(&other, (void *) getBody(), getLength()) == -1)
00121 throw MPS::MPSException("SIMPL Reply failed");
00122 } else {
00123 int replylen = Send(&other, const_cast<char *>(getBody()), msg, getLength(), sizeof(msg));
00124
00125 if (replylen == -1)
00126 throw MPS::MPSException("SIMPL Send failed");
00127 if (replylen > 0)
00128 SimplTransport::pushMessage(&other, msg, replylen);
00129 }
00130
00131 MPS::OutputStream::flush();
00132 }
00133 };
00134
00135
00136
00137 class SimplTransport::SimplConnection: public MPS::Connection {
00138 private:
00139 int oid;
00140 FCID other;
00141
00142 public:
00143
00144
00145
00146
00147
00148
00149
00150 SimplConnection(int _oid, FCID const &_other, MPS::Address const &a)
00151 : MPS::Connection(a),
00152 oid(_oid),
00153 other(_other)
00154 {}
00155
00156
00157
00158
00159 virtual MPS::OutputStream *getOutputStream() {
00160 MPS::OutputStream *stream = new SimplIOStream(other, false);
00161 stream->writeint(oid);
00162 return stream;
00163 }
00164
00165
00166
00167
00168 virtual MPS::InputStream *getInputStream() {
00169 MPS::InputStream *stream = new SimplIOStream();
00170 int inputOid = stream->readint();
00171
00172 if (inputOid != oid) {
00173 throw MPS::MPSException("OID mismatch in SimplTransport::SimplConnection, got " +
00174 toString(inputOid) + " expected " + toString(oid));
00175 }
00176
00177 return stream;
00178 }
00179
00180 virtual void releaseOutputStream(MPS::OutputStream *stream) { delete stream; }
00181 virtual void releaseInputStream(MPS::InputStream *stream) { delete stream; }
00182 };
00183
00184
00185
00186 SimplTransport::SimplTransport(string const &_simplName)
00187 : Transport("simpl"),
00188 nextOid(1),
00189 simplName(_simplName),
00190 serverMap()
00191 {
00192 if (name_attach(const_cast<char *>(simplName.c_str()), MPS_MESSAGE_MAXLEN, 0) == -1)
00193 throw MPSException("Could not attach to SIMPL name " + simplName);
00194 }
00195
00196
00197
00198 ref<Connection> SimplTransport::connectTo(Address const &connectionSpec)
00199 {
00200 if (connectionSpec.getParamCount() != 2)
00201 throw MPS::MPSException("Simpl transport requires 2 Address parameters");
00202
00203 string remoteName = connectionSpec.getParam(0);
00204 int oid = atoi(connectionSpec.getParam(1).c_str());
00205
00206 FCID other = name_locate(const_cast<char *>(remoteName.c_str()));
00207 if (other.slot == -1)
00208 throw MPS::MPSException("Simpl name_locate of " + remoteName + " failed");
00209
00210 return new SimplConnection(oid, other, connectionSpec);
00211 }
00212
00213 string SimplTransport::registerServer(Server *server, Address const &spec)
00214 {
00215 if (spec.getParamCount() > 0) {
00216 throw MPS::MPSException("Simpl transport does not allow self-selection of simplName or OID");
00217 }
00218
00219 int thisOid = nextOid++;
00220
00221 serverMap[thisOid] = server;
00222
00223 Address newSpec("mps:simpl");
00224 newSpec.setParam(0, instance->simplName);
00225 newSpec.setParam(1, toString(thisOid));
00226
00227 return newSpec.getResolvedName();
00228 }
00229
00230 string SimplTransport::deregisterServer(Server *server, Address const &spec) {
00231
00232 throw MPSException("SimplTransport::deregisterServer not yet implemented");
00233 }
00234
00235
00236
00237 struct QueuedMessage_t {
00238 FCID sender;
00239 char *msg;
00240 int msglen;
00241
00242 QueuedMessage_t()
00243 : msg(0),
00244 msglen(0)
00245 {}
00246
00247 QueuedMessage_t(FCID const &s, char const *m, int len)
00248 : sender(s),
00249 msg(new char[len]),
00250 msglen(len)
00251 {
00252 memcpy(msg, m, len);
00253 }
00254
00255 ~QueuedMessage_t() {
00256 clean();
00257 }
00258
00259 private:
00260 QueuedMessage_t(QueuedMessage_t const &other);
00261 QueuedMessage_t const &operator=(QueuedMessage_t const &other);
00262
00263 void clean() {
00264 if (msg) {
00265 delete [] msg;
00266 msg = 0;
00267 }
00268 }
00269 };
00270
00271 static deque<QueuedMessage_t *> msgQueue;
00272
00273 void SimplTransport::pushMessage(void *sender, char const *msg, int msglen) {
00274 msgQueue.push_back(new QueuedMessage_t(* (FCID *) sender, msg, msglen));
00275 }
00276
00277 void *SimplTransport::popMessage(char *msg, int &msglen) {
00278 static FCID sender;
00279
00280 if (msgQueue.empty()) {
00281 sender = Receive(msg, &msglen);
00282 } else {
00283 QueuedMessage_t *qm = msgQueue.front();
00284 msgQueue.pop_front();
00285
00286 msglen = qm->msglen;
00287 memcpy(msg, qm->msg, msglen);
00288 sender = qm->sender;
00289
00290 delete qm;
00291 }
00292
00293 return &sender;
00294 }
00295
00296 void SimplTransport::mainloop() {
00297 while (1) {
00298 SimplIOStream input;
00299 int oid = input.readint();
00300
00301 serverMap_t::iterator i = instance->serverMap.find(oid);
00302 if (i == instance->serverMap.end()) {
00303 throw MPSException("Unknown OID " + toString(oid) + " received");
00304 }
00305
00306 Server *server = (*i).second;
00307 FCID sender = input.getOther();
00308
00309 SimplIOStream output(sender, true);
00310 output.writeint(oid);
00311 int pos = output.getLength();
00312
00313 server->dispatch(input, output);
00314
00315 if (output.getLength() > pos) {
00316 output.flush();
00317 } else {
00318 if (Reply(&sender, 0, 0) == -1)
00319 throw MPS::MPSException("SIMPL Reply failed in mainloop");
00320 }
00321 }
00322 }
00323
00324 }