nrtb-core team mailing list archive
-
nrtb-core team
-
Mailing list archive
-
Message #00303
[Merge] lp:~fpstovall/nrtb/circular_queue into lp:nrtb
Rick Stovall has proposed merging lp:~fpstovall/nrtb/circular_queue into lp:nrtb.
Requested reviews:
NRTB Core (nrtb-core): code
For more details, see:
https://code.launchpad.net/~fpstovall/nrtb/circular_queue/+merge/78527
Added the following to the nrtb c++ common library:
Circlar_queue; a fixed size, high speed buffer/queue designed for communications between threads.
linear_queue: an auto-expanding but somewhat slower buffer/queue designed for communications between threads.
--
https://code.launchpad.net/~fpstovall/nrtb/circular_queue/+merge/78527
Your team NRTB Core is requested to review the proposed merge of lp:~fpstovall/nrtb/circular_queue into lp:nrtb.
=== modified file 'common/Makefile'
--- common/Makefile 2011-08-15 03:47:28 +0000
+++ common/Makefile 2011-10-07 00:13:24 +0000
@@ -40,6 +40,8 @@
@cd point; make ${action}
@cd timer; make ${action}
@cd threads; make ${action}
+ @cd circular_queue; make ${action}
+ @cd linear_queue; make ${action}
@cd sockets; make ${action}
@cd serializer; make ${action}
@cd singleton; make ${action}
=== added directory 'common/circular_queue'
=== added file 'common/circular_queue/Makefile'
--- common/circular_queue/Makefile 1970-01-01 00:00:00 +0000
+++ common/circular_queue/Makefile 2011-10-07 00:13:24 +0000
@@ -0,0 +1,31 @@
+#***********************************************
+#This file is part of the NRTB project (https://launchpad.net/nrtb).
+#
+# NRTB is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# NRTB is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with NRTB. If not, see <http://www.gnu.org/licenses/>.
+#
+#***********************************************
+
+lib: circular_queue_test
+ @./circular_queue_test
+ @cp -v circular_queue.h ../include
+ @echo build complete
+
+circular_queue_test: circular_queue.h circular_queue_test.cpp
+ @rm -f circular_queue_test
+ g++ -c circular_queue_test.cpp -I../include
+ g++ -o circular_queue_test circular_queue_test.o ../obj/common.o ../obj/base_thread.o -lpthread
+
+clean:
+ @rm -rvf *.o circular_queue_test ../include/circular_queue.h *.log ../obj/circular_queue.o
+ @echo all objects and executables have been erased.
=== added file 'common/circular_queue/circular_queue.h'
--- common/circular_queue/circular_queue.h 1970-01-01 00:00:00 +0000
+++ common/circular_queue/circular_queue.h 2011-10-07 00:13:24 +0000
@@ -0,0 +1,174 @@
+/***********************************************
+ This file is part of the NRTB project (https://*launchpad.net/nrtb).
+
+ NRTB is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ NRTB is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with NRTB. If not, see <http://www.gnu.org/licenses/>.
+
+ **********************************************/
+
+#ifndef nrtb_circular_queue_h
+#define nrtb_circular_queue_h
+
+#include <iostream>
+#include <base_thread.h>
+#include <boost/circular_buffer.hpp>
+
+namespace nrtb
+{
+
+/********************************************************
+ * The circular_queue template is designed for use with
+ * the classic producer/consumer thread management model.
+ * The producer uses circular_queue::push() to put items
+ * in the queue as they become available, and the consumer
+ * thread calls circular_queue::park() when it is ready
+ * for the next item to work.
+ *
+ * Common uses would be for buffering outgoing or incomming
+ * messages from a communications channel, providing a feed
+ * queue for parallel threads to make full use of multi-core
+ * processors, or any case where one or more threads are
+ * passing data to another set of threads.
+********************************************************/
+template <class T>
+class circular_queue
+{
+public:
+ class queue_not_ready: public base_exception {};
+
+ /*********************************************
+ * creates the queue with the specified
+ * number of elements. All memory is allocated
+ * at construction to minimize delays at runtime.
+ *********************************************/
+ circular_queue(int size);
+
+ /*********************************************
+ * releases all items in the queue
+ *********************************************/
+ virtual ~circular_queue();
+
+ /*********************************************
+ * Puts an item in the queue.
+ *********************************************/
+ void push(T item);
+
+ /*********************************************
+ * Pops the next item off the queue, blocking
+ * if needed until an item becomes available.
+ *********************************************/
+ T pop();
+
+ /*********************************************
+ * puts the queue in shutdown mode.
+ *********************************************/
+ void shutdown();
+
+ // returns the number of items in the queue
+ int size();
+ // resizes the buffer, may cause data loss
+ void resize(int newsize);
+ // clears the buffer, data will be discarded.
+ void clear();
+
+protected:
+
+ boost::circular_buffer<T> buffer;
+ cond_variable buffer_lock;
+ bool ready;
+};
+
+template <class T>
+circular_queue<T>::circular_queue(int size)
+{
+ buffer.set_capacity(size);
+ ready = true;
+};
+
+// TODO: needed ... a queue stop method.
+
+template <class T>
+circular_queue<T>::~circular_queue()
+{
+};
+
+template <class T>
+void circular_queue<T>::push(T item)
+{
+ if (ready)
+ {
+ scope_lock lock(buffer_lock);
+ buffer.push_back(item);
+ buffer_lock.signal();
+ }
+ else
+ {
+ queue_not_ready e;
+ throw e;
+ }
+};
+
+template <class T>
+T circular_queue<T>::pop()
+{
+ scope_lock lock(buffer_lock);
+ while (buffer.empty() && ready)
+ buffer_lock.wait();
+ if (!ready)
+ {
+ queue_not_ready e;
+ throw e;
+ };
+ T returnme = buffer.front();
+ buffer.pop_front();
+ return returnme;
+};
+
+template <class T>
+void circular_queue<T>::shutdown()
+{
+ try
+ {
+ scope_lock lock(buffer_lock);
+ ready = false;
+ buffer_lock.broadcast_signal();
+ buffer.clear();
+ }
+ catch (...) {}
+}
+
+
+template <class T>
+int circular_queue<T>::size()
+{
+ scope_lock lock(buffer_lock);
+ return buffer.size();
+};
+
+template <class T>
+void circular_queue<T>::resize(int newsize)
+{
+ scope_lock lock(buffer_lock);
+ buffer.set_capacity(newsize);
+};
+
+template <class T>
+void circular_queue<T>::clear()
+{
+ scope_lock lock(buffer_lock);
+ buffer.clear();
+};
+
+} // namespace nrtb
+
+#endif //nrtb_circular_queue_h//
=== added file 'common/circular_queue/circular_queue_test.cpp'
--- common/circular_queue/circular_queue_test.cpp 1970-01-01 00:00:00 +0000
+++ common/circular_queue/circular_queue_test.cpp 2011-10-07 00:13:24 +0000
@@ -0,0 +1,152 @@
+/***********************************************
+ This file is part of the NRTB project (https://*launchpad.net/nrtb).
+
+ NRTB is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ NRTB is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with NRTB. If not, see <http://www.gnu.org/licenses/>.
+
+ **********************************************/
+
+#include <string>
+#include <iostream>
+#include "circular_queue.h"
+#include <boost/shared_ptr.hpp>
+
+using namespace nrtb;
+using namespace std;
+
+typedef circular_queue<int> test_queue;
+typedef boost::shared_ptr<test_queue> queue_p;
+
+class consumer_task: public thread
+{
+public:
+
+ consumer_task(string n, queue_p buffer)
+ {
+ name = n;
+ input = buffer;
+ count = 0;
+ };
+
+ ~consumer_task()
+ {
+ cout << ">> in " << name << "::~consumer_task()" << endl;
+ try
+ {
+ this->thread::~thread();
+ input.reset();
+ }
+ catch (...) {};
+ cout << "<< leaving " << name << "::~consumer_task()" << endl;
+ };
+
+ int get_count() { return count; };
+
+ void run()
+ {
+ try
+ {
+ while (true)
+ {
+ int num = input->pop();
+ {
+ static mutex console;
+ scope_lock lock(console);
+ cout << name << " picked up " << num
+ << endl;
+ };
+ count++;
+ lastnum = num;
+ yield();
+ }
+ }
+ catch (...) {};
+ };
+
+protected:
+ // link to the feed queue
+ queue_p input;
+ // a name to report
+ string name;
+ // number of items processed
+ int count;
+ // last number caught
+ int lastnum;
+};
+
+typedef boost::shared_ptr<consumer_task> task_p;
+
+
+int main()
+{
+ int er_count = 0;
+ /************************************************
+ * Load queue and then cook it down...
+ ***********************************************/
+ // make and load a queue
+ queue_p q1(new test_queue(50));
+ for (int i=0; i<100; i++)
+ {
+ q1->push(i);
+ };
+ // the queue should be loaded with 50-99
+ // attach a thread and process it.
+ task_p p1(new consumer_task("task 1",q1));
+ p1->start();
+ while (q1->size()) usleep(100);
+ cout << "cp 1 " << p1->get_count() << endl;
+ /************************************************
+ * now that the preload is exhasted, shove items
+ * in one at a time to make sure each is picked
+ * up correctly.
+ ***********************************************/
+ for (int i=200; i<225; i++)
+ {
+ q1->push(i);
+ usleep(100);
+ };
+ cout << "cp 2 " << p1->get_count() << endl;
+ /************************************************
+ * Last check; attach a second thread to the queue
+ * and make sure both are servicing it.
+ ***********************************************/
+ task_p p2(new consumer_task("task 2",q1));
+ p2->start();
+ for (int i=300; i<325; i++)
+ {
+ q1->push(i);
+ };
+ while (q1->size()) usleep(100);
+ // shut it all down
+ q1->shutdown();
+ p1->join();
+ p2->join();
+ // important numbers
+ int tot_items = p1->get_count() + p2->get_count();
+ int p1_items = p1->get_count() - 75;
+ int p2_items = p2->get_count();
+ // release she threads and queues.
+ p1.reset();
+ p2.reset();
+ q1.reset();
+ // do some reporting.
+ cout << "cp 3 "
+ << tot_items
+ << " [75 + (" << p1_items
+ << " + " << p2_items
+ << ")]" << endl;
+ bool passed = (tot_items == 100);
+ // inverted logic needed because 0 is good for
+ // return codes.
+ return !passed;
+};
\ No newline at end of file
=== added directory 'common/linear_queue'
=== added file 'common/linear_queue/Makefile'
--- common/linear_queue/Makefile 1970-01-01 00:00:00 +0000
+++ common/linear_queue/Makefile 2011-10-07 00:13:24 +0000
@@ -0,0 +1,31 @@
+#***********************************************
+#This file is part of the NRTB project (https://launchpad.net/nrtb).
+#
+# NRTB is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# NRTB is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with NRTB. If not, see <http://www.gnu.org/licenses/>.
+#
+#***********************************************
+
+lib: linear_queue_test
+ @./linear_queue_test
+ @cp -v linear_queue.h ../include
+ @echo build complete
+
+linear_queue_test: linear_queue.h linear_queue_test.cpp
+ @rm -f linear_queue_test
+ g++ -c linear_queue_test.cpp -I../include
+ g++ -o linear_queue_test linear_queue_test.o ../obj/common.o ../obj/base_thread.o -lpthread
+
+clean:
+ @rm -rvf *.o linear_queue_test ../include/linear_queue.h *.log ../obj/linear_queue.o
+ @echo all objects and executables have been erased.
=== added file 'common/linear_queue/linear_queue.h'
--- common/linear_queue/linear_queue.h 1970-01-01 00:00:00 +0000
+++ common/linear_queue/linear_queue.h 2011-10-07 00:13:24 +0000
@@ -0,0 +1,165 @@
+/***********************************************
+ This file is part of the NRTB project (https://*launchpad.net/nrtb).
+
+ NRTB is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ NRTB is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with NRTB. If not, see <http://www.gnu.org/licenses/>.
+
+ **********************************************/
+
+#ifndef nrtb_linear_queue_h
+#define nrtb_linear_queue_h
+
+#include <iostream>
+#include <base_thread.h>
+#include <list>
+
+namespace nrtb
+{
+
+/********************************************************
+ * The linear_queue template is designed for use with
+ * the classic producer/consumer thread management model.
+ * The producer uses linear_queue::push() to put items
+ * in the queue as they become available, and the consumer
+ * thread calls linear_queue::park() when it is ready
+ * for the next item to work.
+ *
+ * This queue will expand as needed. Constrast this with
+ * the circular_queue, which is of a fixed size and will
+ * drop older data when full
+ *
+ * Common uses would be for buffering outgoing or incomming
+ * messages from a communications channel, providing a feed
+ * queue for parallel threads to make full use of multi-core
+ * processors, or any case where one or more threads are
+ * passing data to another set of threads.
+********************************************************/
+template <class T>
+class linear_queue
+{
+public:
+ class queue_not_ready: public base_exception {};
+
+ /*********************************************
+ * creates the queue with the specified
+ * number of elements.
+ *********************************************/
+ linear_queue();
+
+ /*********************************************
+ * releases all items in the queue
+ *********************************************/
+ virtual ~linear_queue();
+
+ /*********************************************
+ * Puts an item in the queue.
+ *********************************************/
+ void push(T item);
+
+ /*********************************************
+ * Pops the next item off the queue, blocking
+ * if needed until an item becomes available.
+ *********************************************/
+ T pop();
+
+ /*********************************************
+ * puts the queue in shutdown mode.
+ *********************************************/
+ void shutdown();
+
+ // returns the number of items in the queue
+ int size();
+ // clears the buffer, data will be discarded.
+ void clear();
+
+protected:
+
+ std::list<T> buffer;
+ cond_variable buffer_lock;
+ bool ready;
+};
+
+template <class T>
+linear_queue<T>::linear_queue()
+{
+ ready = true;
+};
+
+template <class T>
+linear_queue<T>::~linear_queue()
+{
+};
+
+template <class T>
+void linear_queue<T>::push(T item)
+{
+ if (ready)
+ {
+ scope_lock lock(buffer_lock);
+ buffer.push_back(item);
+ buffer_lock.signal();
+ }
+ else
+ {
+ queue_not_ready e;
+ throw e;
+ }
+};
+
+template <class T>
+T linear_queue<T>::pop()
+{
+ scope_lock lock(buffer_lock);
+ while (buffer.empty() && ready)
+ buffer_lock.wait();
+ if (!ready)
+ {
+ queue_not_ready e;
+ throw e;
+ };
+ T returnme = buffer.front();
+ buffer.pop_front();
+ return returnme;
+};
+
+template <class T>
+void linear_queue<T>::shutdown()
+{
+ try
+ {
+ scope_lock lock(buffer_lock);
+ ready = false;
+ buffer_lock.broadcast_signal();
+ buffer.clear();
+ }
+ catch (...) {}
+}
+
+
+template <class T>
+int linear_queue<T>::size()
+{
+ scope_lock lock(buffer_lock);
+ return buffer.size();
+};
+
+template <class T>
+void linear_queue<T>::clear()
+{
+ scope_lock lock(buffer_lock);
+ buffer.clear();
+};
+
+} // namespace nrtb
+
+#endif //nrtb_linear_queue_h//
=== added file 'common/linear_queue/linear_queue_test.cpp'
--- common/linear_queue/linear_queue_test.cpp 1970-01-01 00:00:00 +0000
+++ common/linear_queue/linear_queue_test.cpp 2011-10-07 00:13:24 +0000
@@ -0,0 +1,152 @@
+/***********************************************
+ This file is part of the NRTB project (https://*launchpad.net/nrtb).
+
+ NRTB is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ NRTB is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with NRTB. If not, see <http://www.gnu.org/licenses/>.
+
+ **********************************************/
+
+#include <string>
+#include <iostream>
+#include "linear_queue.h"
+#include <boost/shared_ptr.hpp>
+
+using namespace nrtb;
+using namespace std;
+
+typedef linear_queue<int> test_queue;
+typedef boost::shared_ptr<test_queue> queue_p;
+
+class consumer_task: public thread
+{
+public:
+
+ consumer_task(string n, queue_p buffer)
+ {
+ name = n;
+ input = buffer;
+ count = 0;
+ };
+
+ ~consumer_task()
+ {
+ cout << ">> in " << name << "::~consumer_task()" << endl;
+ try
+ {
+ this->thread::~thread();
+ input.reset();
+ }
+ catch (...) {};
+ cout << "<< leaving " << name << "::~consumer_task()" << endl;
+ };
+
+ int get_count() { return count; };
+
+ void run()
+ {
+ try
+ {
+ while (true)
+ {
+ int num = input->pop();
+ {
+ static mutex console;
+ scope_lock lock(console);
+ cout << name << " picked up " << num
+ << endl;
+ };
+ count++;
+ lastnum = num;
+ yield();
+ }
+ }
+ catch (...) {};
+ };
+
+protected:
+ // link to the feed queue
+ queue_p input;
+ // a name to report
+ string name;
+ // number of items processed
+ int count;
+ // last number caught
+ int lastnum;
+};
+
+typedef boost::shared_ptr<consumer_task> task_p;
+
+
+int main()
+{
+ int er_count = 0;
+ /************************************************
+ * Load queue and then cook it down...
+ ***********************************************/
+ // make and load a queue
+ queue_p q1(new test_queue());
+ for (int i=0; i<100; i++)
+ {
+ q1->push(i);
+ };
+ // the queue should be loaded with 50-99
+ // attach a thread and process it.
+ task_p p1(new consumer_task("task 1",q1));
+ p1->start();
+ while (q1->size()) usleep(100);
+ cout << "cp 1 " << p1->get_count() << endl;
+ /************************************************
+ * now that the preload is exhasted, shove items
+ * in one at a time to make sure each is picked
+ * up correctly.
+ ***********************************************/
+ for (int i=200; i<225; i++)
+ {
+ q1->push(i);
+ usleep(100);
+ };
+ cout << "cp 2 " << p1->get_count() << endl;
+ /************************************************
+ * Last check; attach a second thread to the queue
+ * and make sure both are servicing it.
+ ***********************************************/
+ task_p p2(new consumer_task("task 2",q1));
+ p2->start();
+ for (int i=300; i<325; i++)
+ {
+ q1->push(i);
+ };
+ while (q1->size()) usleep(100);
+ // shut it all down
+ q1->shutdown();
+ p1->join();
+ p2->join();
+ // important numbers
+ int tot_items = p1->get_count() + p2->get_count();
+ int p1_items = p1->get_count() - 125;
+ int p2_items = p2->get_count();
+ // release she threads and queues.
+ p1.reset();
+ p2.reset();
+ q1.reset();
+ // do some reporting.
+ cout << "cp 3 "
+ << tot_items
+ << " [125 + (" << p1_items
+ << " + " << p2_items
+ << ")]" << endl;
+ bool passed = (tot_items == 150);
+ // inverted logic needed because 0 is good for
+ // return codes.
+ return !passed;
+};
\ No newline at end of file
=== modified file 'common/threads/base_thread.cpp'
--- common/threads/base_thread.cpp 2011-09-15 01:21:11 +0000
+++ common/threads/base_thread.cpp 2011-10-07 00:13:24 +0000
@@ -352,22 +352,15 @@
// is there anyone waiting on this cond_variable?
if (!try_lock() || (waiting > 0))
{
- // not good, there are others waiting on us or we are locked.
- unlock();
cerr << "WARNING: there were " << waiting <<
" threads queued in ~cond_variable." << endl;
- }
- else
- {
- pthread_cond_destroy(&mycv);
};
- // unlock before we leave.
- unlock();
}
catch (...)
{
cerr << "WARNING: there was an error in ~cond_variable." << endl;;
};
+ pthread_cond_destroy(&mycv);
};
void cond_variable::lock()
Follow ups