nrtb-core team mailing list archive
-
nrtb-core team
-
Mailing list archive
-
Message #00530
[Branch ~fpstovall/nrtb/fps-sprint-003] Rev 29: Completed the new ipc channel manager library. It consists of the following:
------------------------------------------------------------
revno: 29
committer: Rick Stovall <fpstovall>
branch nick: ricks-sprint-003
timestamp: Fri 2013-08-30 10:05:13 -0400
message:
Completed the new ipc channel manager library. It consists of the following:
* abs_ipc_record : a record designed to be the superclass for all ipc messages.
* ipc_record_p : a std::unique_ptr used to queue and catch ipc messages.
* ipc_queue : a linear_queue of ipc_record_p elements.
* ipc_channel_manager: a class which provides a "clearing house" for named ipc queues.
* global_ipc_channel_manager: a singleton which provides a universally accessible ipc_channel_manager.
Unit tests have been written and completed successfully. The ipc library should be ready to use.
modified:
cpp/common/Makefile
cpp/common/ipc_channel/ipc_channel.cpp
cpp/common/ipc_channel/ipc_channel.h
cpp/common/ipc_channel/ipc_channel_test.cpp
--
lp:~fpstovall/nrtb/fps-sprint-003
https://code.launchpad.net/~fpstovall/nrtb/fps-sprint-003
Your team NRTB Core is subscribed to branch lp:~fpstovall/nrtb/fps-sprint-003.
To unsubscribe from this branch go to https://code.launchpad.net/~fpstovall/nrtb/fps-sprint-003/+edit-subscription
=== modified file 'cpp/common/Makefile'
--- cpp/common/Makefile 2013-08-09 16:30:23 +0000
+++ cpp/common/Makefile 2013-08-30 14:05:13 +0000
@@ -47,6 +47,7 @@
@cd singleton; make ${action}
@cd logger; make ${action}
@cd confreader; make ${action}
+ @cd ipc_channel; make ${action}
# --- the following are obsolete and may be removed later.
# @cd threads; make ${action}
=== modified file 'cpp/common/ipc_channel/ipc_channel.cpp'
--- cpp/common/ipc_channel/ipc_channel.cpp 2013-08-27 02:17:50 +0000
+++ cpp/common/ipc_channel/ipc_channel.cpp 2013-08-30 14:05:13 +0000
@@ -40,7 +40,5 @@
return channels.end();
}
-
-
} // namespace nrtb
=== modified file 'cpp/common/ipc_channel/ipc_channel.h'
--- cpp/common/ipc_channel/ipc_channel.h 2013-08-27 02:17:50 +0000
+++ cpp/common/ipc_channel/ipc_channel.h 2013-08-30 14:05:13 +0000
@@ -22,8 +22,6 @@
#include <common.h>
#include <linear_queue.h>
#include <memory>
-#include <boost/concept_check.hpp>
-#include <serializer.h>
#include <singleton.h>
@@ -32,7 +30,7 @@
class abs_ipc_record;
-typedef abs_ipc_record* ipc_record_p;
+typedef std::unique_ptr<abs_ipc_record> ipc_record_p;
typedef linear_queue<ipc_record_p> ipc_queue;
=== modified file 'cpp/common/ipc_channel/ipc_channel_test.cpp'
--- cpp/common/ipc_channel/ipc_channel_test.cpp 2013-08-27 02:17:50 +0000
+++ cpp/common/ipc_channel/ipc_channel_test.cpp 2013-08-30 14:05:13 +0000
@@ -18,6 +18,7 @@
#include "ipc_channel.h"
#include <iostream>
+#include <future>
using namespace nrtb;
using namespace std;
@@ -29,6 +30,8 @@
int msg_num;
};
+typedef main_msg * main_msg_p;
+
class worker_msg: public abs_ipc_record
{
public:
@@ -36,21 +39,27 @@
int ret_num;
};
+typedef worker_msg * worker_msg_p;
+
int worker(int limit)
{
- global_ipc_channel_manager & ipc
+ cout << "worker started" << endl;
+ global_ipc_channel_manager & ipc
= global_ipc_channel_manager::get_reference();
ipc_queue & in = ipc.get("worker");
ipc_queue & out = ipc.get("main");
int total(0);
while (total < limit)
{
- main_msg & msg = static_cast<main_msg>(in.pop());
+ ipc_record_p raw_msg = in.pop();
+ main_msg_p msg = static_cast<main_msg_p>(raw_msg.get());
worker_msg_p outmsg(new worker_msg(in));
outmsg->ret_num = msg->msg_num;
- out.push(outmsg);
+ msg->return_to.push(ipc_record_p(outmsg));
total++;
};
+ cout << "worker ended" << endl;
+ return total;
};
int main()
@@ -64,17 +73,36 @@
ipc_queue & out = ipc.get("worker");
int limit = 100;
// start the worker here.
+ auto worktask = async(launch::async,worker,limit);
for (int i(0); i<limit; i++)
{
main_msg_p msg(new main_msg(in));
msg->msg_num = i;
- out.push(msg);
- };
+ out.push(ipc_record_p(msg));
+ };
+
+ cout << "processed " << worktask.get() << endl;
+
+ while (in.size())
+ {
+ ipc_record_p raw = in.pop();
+ worker_msg_p reply = static_cast<worker_msg_p>(raw.get());
+ };
+
+ auto b = ipc.begin();
+ auto e = ipc.end();
+ for (auto i = b; i != e; i++)
+ {
+ cout << i->first << ": "
+ << i->second.in_count << " | "
+ << i->second.out_count << endl;
+ }
cout << "=========== IPC Channel test complete ============="
<< endl;
-
+
+ return !(out.in_count == in.out_count);
};