← Back to team overview

nrtb-core team mailing list archive

[Branch ~fpstovall/nrtb/fps-sprint-003] Rev 29: Completed the new ipc channel manager library. It consists of the following:

 

------------------------------------------------------------
revno: 29
committer: Rick Stovall <fpstovall>
branch nick: ricks-sprint-003
timestamp: Fri 2013-08-30 10:05:13 -0400
message:
  Completed the new ipc channel manager library. It consists of the following:
   
   * abs_ipc_record : a record designed to be the superclass for all ipc messages.
   * ipc_record_p : a std::unique_ptr used to queue and catch ipc messages.
   * ipc_queue : a linear_queue of ipc_record_p elements.
   * ipc_channel_manager: a class which provides a "clearing house" for named ipc queues.
   * global_ipc_channel_manager: a singleton which provides a universally accessible ipc_channel_manager.
  
  Unit tests have been written and completed successfully. The ipc library should be ready to use.
modified:
  cpp/common/Makefile
  cpp/common/ipc_channel/ipc_channel.cpp
  cpp/common/ipc_channel/ipc_channel.h
  cpp/common/ipc_channel/ipc_channel_test.cpp


--
lp:~fpstovall/nrtb/fps-sprint-003
https://code.launchpad.net/~fpstovall/nrtb/fps-sprint-003

Your team NRTB Core is subscribed to branch lp:~fpstovall/nrtb/fps-sprint-003.
To unsubscribe from this branch go to https://code.launchpad.net/~fpstovall/nrtb/fps-sprint-003/+edit-subscription
=== modified file 'cpp/common/Makefile'
--- cpp/common/Makefile	2013-08-09 16:30:23 +0000
+++ cpp/common/Makefile	2013-08-30 14:05:13 +0000
@@ -47,6 +47,7 @@
 	@cd singleton; make ${action}
 	@cd logger; make ${action}
 	@cd confreader; make ${action}
+	@cd ipc_channel; make ${action}
 
 # --- the following are obsolete and may be removed later.
 #	@cd threads; make ${action}

=== modified file 'cpp/common/ipc_channel/ipc_channel.cpp'
--- cpp/common/ipc_channel/ipc_channel.cpp	2013-08-27 02:17:50 +0000
+++ cpp/common/ipc_channel/ipc_channel.cpp	2013-08-30 14:05:13 +0000
@@ -40,7 +40,5 @@
   return channels.end();
 }
 
-
-  
 } // namespace nrtb
 

=== modified file 'cpp/common/ipc_channel/ipc_channel.h'
--- cpp/common/ipc_channel/ipc_channel.h	2013-08-27 02:17:50 +0000
+++ cpp/common/ipc_channel/ipc_channel.h	2013-08-30 14:05:13 +0000
@@ -22,8 +22,6 @@
 #include <common.h>
 #include <linear_queue.h>
 #include <memory>
-#include <boost/concept_check.hpp>
-#include <serializer.h>
 #include <singleton.h>
 
 
@@ -32,7 +30,7 @@
   
 class abs_ipc_record;
 
-typedef abs_ipc_record* ipc_record_p;
+typedef std::unique_ptr<abs_ipc_record> ipc_record_p;
 
 typedef linear_queue<ipc_record_p> ipc_queue;
 

=== modified file 'cpp/common/ipc_channel/ipc_channel_test.cpp'
--- cpp/common/ipc_channel/ipc_channel_test.cpp	2013-08-27 02:17:50 +0000
+++ cpp/common/ipc_channel/ipc_channel_test.cpp	2013-08-30 14:05:13 +0000
@@ -18,6 +18,7 @@
 
 #include "ipc_channel.h"
 #include <iostream>
+#include <future>
 
 using namespace nrtb;
 using namespace std;
@@ -29,6 +30,8 @@
   int msg_num;
 };
 
+typedef main_msg * main_msg_p;
+
 class worker_msg: public abs_ipc_record
 {
 public:
@@ -36,21 +39,27 @@
   int ret_num;
 };
 
+typedef worker_msg * worker_msg_p;
+
 int worker(int limit)
 {
-  global_ipc_channel_manager & ipc 
+  cout << "worker started" << endl;
+  global_ipc_channel_manager & ipc
     = global_ipc_channel_manager::get_reference();
   ipc_queue & in = ipc.get("worker");
   ipc_queue & out = ipc.get("main");
   int total(0);
   while (total < limit)
   {
-    main_msg & msg = static_cast<main_msg>(in.pop());
+    ipc_record_p raw_msg = in.pop();
+    main_msg_p msg = static_cast<main_msg_p>(raw_msg.get());
     worker_msg_p outmsg(new worker_msg(in));
     outmsg->ret_num = msg->msg_num;
-    out.push(outmsg);
+    msg->return_to.push(ipc_record_p(outmsg));
     total++;
   };
+  cout << "worker ended" << endl;
+  return total;
 };
 
 int main()
@@ -64,17 +73,36 @@
   ipc_queue & out = ipc.get("worker");
   int limit = 100;
   // start the worker here.
+  auto worktask = async(launch::async,worker,limit);
   
   for (int i(0); i<limit; i++)
   {
     main_msg_p msg(new main_msg(in));
     msg->msg_num = i;
-    out.push(msg);
-  };
+    out.push(ipc_record_p(msg));
+  };
+  
+  cout << "processed " << worktask.get() << endl;
+
+  while (in.size())
+  {
+    ipc_record_p raw = in.pop();
+    worker_msg_p reply = static_cast<worker_msg_p>(raw.get());
+  };
+  
+  auto b = ipc.begin();
+  auto e = ipc.end();
+  for (auto i = b; i != e; i++)
+  {
+    cout << i->first << ": "
+      << i->second.in_count << " | "
+      << i->second.out_count << endl;
+  }
 
   cout << "=========== IPC Channel test complete ============="
     << endl;
-  
+
+  return !(out.in_count == in.out_count);
 };