00001
00002 #ifndef MPS_Transport_Fastmsg_Message_H
00003 #define MPS_Transport_Fastmsg_Message_H
00004
00005 #include <mps/mps.h>
00006 #include <fastmsg/Mailbox.h>
00007
00008 class FastmsgOutputStream: public MPS::OutputStream {
00009 private:
00010 ref<Mailbox> me;
00011 ref<Mailbox> other;
00012 int corrId;
00013
00014 protected:
00015 virtual string referenceTransport() const { return "fastmsg"; }
00016
00017 public:
00018 FastmsgOutputStream(ref<Mailbox> const &_me,
00019 ref<Mailbox> const &_other,
00020 int _corrId)
00021 : me(_me),
00022 other(_other),
00023 corrId(_corrId)
00024 {}
00025
00026 virtual void flush() {
00027 static int messageType = FastmsgRoot::getMessageType("MPS2");
00028
00029 int len = getLength();
00030 ref<Message> msg = Message::create(me, other, messageType,
00031 corrId, len);
00032 memcpy(msg->getBody(), getBody(), len);
00033 msg->deliver();
00034 MPS::OutputStream::flush();
00035 }
00036 };
00037
00038 class FastmsgInputStream: public MPS::InputStream {
00039 private:
00040 ref<Message> msg;
00041 int pos;
00042 int len;
00043
00044 public:
00045 FastmsgInputStream(ref<Message> const &_msg)
00046 : msg(_msg),
00047 pos(0),
00048 len(_msg->getLength())
00049 {}
00050
00051 virtual int read() {
00052 if (pos >= len) {
00053 throw MPS::MPSConnectionClosedException("reading from a fastmsg message");
00054 }
00055
00056 return ((char *) msg->getBody())[pos++];
00057 }
00058 };
00059
00060 #endif