// DU4-Scheduler.cpp
// Ales Novak NPRG051 2011/2012

#ifndef __scheduler_h__
#define __scheduler_h__

#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>

#include <list>
#include <queue>
#include <stdexcept>

/** Class encapsulating POSIX semaphores */
class sem {
	private:
		/** the wrapped unnamed semaphore: sem_init'ed by the constructor,
		 * sem_destroy'ed by the destructor */
		sem_t s;

	public:
		/**
		 * initializes semaphore with value/0
		 * @param _val initial counter value
		 * @throws std::runtime_error when sem_init reports failure
		 */
		sem (unsigned int _val = 0) {
			int ret = sem_init (&s, 0, _val);
			if (ret) throw std::runtime_error ("Cannot create semaphore ");
		}

		/** releases the semaphore initialized by the constructor */
		~sem () {
			sem_destroy (&s);
		}

		/** the object owns its sem_t, so a bitwise copy would end up being
		 * destroyed twice; copying is therefore forbidden */
		sem (const sem &) = delete;
		sem & operator= (const sem &) = delete;

		/**
		 * waits (optionally [implicitly] blocking, till the semaphore is > 0,
		 * then decrements it by 1
		 * @param blocking when false, sem_trywait is used instead of sem_wait
		 * @return 0 on success, otherwise the non-zero result of the
		 *         underlying call
		 */
		int wait (bool blocking = true) {
			if (! blocking) return (sem_trywait (&s));

			int r;
			/* the callers in this file ignore the return value, so a wait
			 * interrupted by a signal is retried instead of being reported */
			do {
				r = sem_wait (&s);
			} while (r == -1 && errno == EINTR);
			return (r);
		}

		/** increments semaphore by 1 */
		void post () {
			sem_post (&s);
		}

		/**
		 * waits for semaphore till specified time passes, returning !0
		 * when timeout passed
		 * @param milli timeout in milliseconds, relative to now; negative
		 *              values are treated as 0
		 * @return 0 when the semaphore was decremented, non-zero otherwise
		 */
		int timedwait (int milli) {
			if (milli < 0) milli = 0;

			/* sem_timedwait expects an ABSOLUTE deadline on CLOCK_REALTIME,
			 * so the relative timeout is added to the current time */
			struct timespec ts;
			if (clock_gettime (CLOCK_REALTIME, &ts)) return (-1);
			ts.tv_sec += milli / 1000;
			ts.tv_nsec += (long) (milli % 1000) * 1000000L;
			if (ts.tv_nsec >= 1000000000L) {
				/* keep tv_nsec inside [0, 1e9) */
				ts.tv_sec += 1;
				ts.tv_nsec -= 1000000000L;
			}

			int r;
			do {
				r = sem_timedwait (&s, &ts);
			} while (r == -1 && errno == EINTR);
			return (r);
		}
};


/** Class representing any task, it's the only type the executing threads have
 * to know about */
class base_task {
		base_task (class scheduling_policy *polic) : done(false), sema (NULL), sched (polic) {
		 * if task is done */
		bool done;
		 * this is NULL or semaphore, if there's a process waiting for this task to 
		 * finish (calling "get_result") */
		sem *sema;
		class scheduling_policy *sched;

		 * method implemented by inherited template class */
		virtual void proceed () = 0;

class scheduling_policy {
		 * Encapsulation of thread */
		class threadinfo {
			friend class scheduling_policy;
				pthread_t pth;
				class base_task *what;
				class scheduling_policy *sched;

				threadinfo (scheduling_policy *p) {//: sched (p) {
					sched = p;
				void cancel () {
					pthread_cancel (pth);
				void create (void*(*fce)(void*) = NULL) {
					if (pthread_create (& pth, NULL, fce?fce:sched->getfunc(), sched)) {
						throw std::runtime_error ("Cannot create thread");

		virtual void *(*getfunc())(void*) = 0;
		base_task *get_request (int milli = -1) {
			if (milli == -1)
				s_reqs.wait ();
			else {
				if (s_reqs.timedwait (milli)) return (NULL);
			m_reqs.wait ();
			base_task *r = todo.front ();
			todo.pop ();
			m_reqs.post ();
			return (r);

		void set_response (base_task *t) {
			m_reqs.wait ();
			t->done = true;
			if (t->sema) {
				t->sema->post ();	
			m_reqs.post ();

		void remove_thread (pthread_t p) {
			m_reqs.wait ();
			for (auto it = threads.begin (); it != threads.end () ; it ++) {
				if ((*it)->pth == p) {
					threads.erase (it);
			m_reqs.post ();

		std::queue <base_task *> todo;
		scheduling_policy (int maxthreads) : s_max_threads (maxthreads), s_reqs (0), m_reqs (1) {

		virtual void task_inserted (base_task *t) {


		sem s_max_threads;
		sem s_reqs;
		sem m_reqs;
		std::list <threadinfo *> threads;
/** template representing a task, taking T as type of result of the functor
 * F type of the functor */
template <typename T, typename F> class task : public base_task {
		F & func;
		T result;
		task (F &f, scheduling_policy *p) : base_task (p), func (f) {
		void proceed () {
			result  = func ();

		 * Returns computed value */
		T get_result () {
			/** ensures we won't race */
			sched->m_reqs.wait ();
			if (! done) {
				/** we have to create new semaphore and wait for it
				 * it will be raised by thread, which does (will do)
				 * this task */
				sema = new sem ();
				sched->m_reqs.post ();
				sema->wait ();

				delete sema;
			} else {
				/** this is so easy, task was finished before we touched it */
				sched->m_reqs.post ();
			return (result);

/**
 * ************************************************************
 * ***********************************************************/
template <int N> class thread_pool : public scheduling_policy {
		static void *thrfunc (void *arg);

		virtual void *(*getfunc())(void*) {
			return (thrfunc);

		 * we prepare the pool of executors. They will do the rest */
		thread_pool () : scheduling_policy (N) {
			for (int i = 0; i < N; i ++) {
				threadinfo *t = new threadinfo (this);
				t->create ();
				threads.push_back (t);

/**
 * Body of a pool worker: takes a task, runs it, reports it as done, and
 * starts over. The loop has no exit.
 * @param arg the scheduling_policy pointer passed to pthread_create by
 *            threadinfo::create
 */
template <int N>
void *thread_pool<N>::thrfunc (void *arg)
{
	scheduling_policy *p = static_cast <scheduling_policy *> (arg);

	for (;;) {
		/* blocks until a task is queued */
		base_task *t = p->get_request ();

		t->proceed ();

		p->set_response (t);
	}

	return (NULL);
}

/**
 * ************************************************************
 * ***********************************************************/
template <int N> class one_thread_per_task : public scheduling_policy {
		static void *thrfunc (void *arg);
		static void *mainthrfunc (void *arg);

		virtual void *(*getfunc())(void*) {
			return (thrfunc);

		threadinfo *mainth;

		one_thread_per_task () : scheduling_policy (N) {
			mainth = new threadinfo (this);
			mainth->create (mainthrfunc);
			threads.push_back (mainth);

		~one_thread_per_task () {

/**
 * Body of the supervising thread: for every queued task it starts one worker
 * thread. The loop has no exit.
 * @param arg the scheduling_policy pointer passed to pthread_create by
 *            threadinfo::create
 */
template <int N>
void *one_thread_per_task<N>::mainthrfunc (void *arg)
{
	scheduling_policy *p = static_cast <scheduling_policy *> (arg);

	for (;;) {
		/* wait for a free thread slot, then for a queued task */
		p->s_max_threads.wait ();
		p->s_reqs.wait ();

		threadinfo *t = new threadinfo (p);
		/* m_reqs is held while the worker is started and registered: the
		 * worker needs m_reqs inside get_request, so it cannot reach
		 * remove_thread before its entry is in the list, and the list is
		 * never modified concurrently */
		p->m_reqs.wait ();
		t->create (thrfunc);
		p->threads.push_back (t);
		p->m_reqs.post ();

		/* hand the request token back; the new worker consumes it in
		 * get_request */
		p->s_reqs.post ();
	}
	return (NULL);
}

/**
 * Body of a worker thread: executes one task and ends.
 * @param arg the scheduling_policy pointer passed to pthread_create by
 *            threadinfo::create
 * @return always NULL
 */
template <int N>
void *one_thread_per_task<N>::thrfunc (void *arg)
{
	scheduling_policy *p = static_cast <scheduling_policy *> (arg);

	base_task *t = p->get_request ();
	t->proceed ();

	p->set_response (t);

	/* nobody joins this thread, so it detaches itself */
	pthread_detach (pthread_self ());
	/* give the thread slot back to the supervising thread */
	p->s_max_threads.post ();
	p->remove_thread (pthread_self ());

	/* returning from the thread function ends the thread, an explicit
	 * pthread_exit is not needed */
	return (NULL);
}

/**
 * ************************************************************
 * ***********************************************************/
template <int N, int M> class  one_thread_per_task_with_delay : public scheduling_policy {
		sem s_free_execs;

		static void *thrfunc (void *arg);
		static void *mainthrfunc (void *arg);

		virtual void *(*getfunc())(void*) {
			return (thrfunc);

		threadinfo *mainth;

		one_thread_per_task_with_delay () : scheduling_policy (N) {
			mainth = new threadinfo (this);
			mainth->create (mainthrfunc);
			threads.push_back (mainth);
/**
 * Body of the supervising thread. The loop has no exit.
 * @param arg the scheduling_policy pointer passed to pthread_create by
 *            threadinfo::create
 */
template <int N, int M>
void *one_thread_per_task_with_delay<N,M>::mainthrfunc (void *arg)
{
	/* the argument was passed as scheduling_policy *, so it is converted back
	 * to that type first and only then down to the concrete policy */
	one_thread_per_task_with_delay *p = static_cast <one_thread_per_task_with_delay *> (
			static_cast <scheduling_policy *> (arg));

	for (;;) {
		if (! p->s_free_execs.wait (false)) {
			/* we succeeded in locking free executor (which finished work
			 * but waits for further requests) */
			/* NOTE(review): this branch takes a free-executor token without
			 * waiting for a request, so the loop simply retries until the
			 * tokens run out -- confirm this is intended */
		} else {
			/* no free executor: wait for a thread slot and a queued task,
			 * then start a new worker for it */
			p->s_max_threads.wait ();

			p->s_reqs.wait ();
			threadinfo *t = new threadinfo (p);
			/* m_reqs is held while the worker is started and registered: the
			 * worker needs m_reqs inside get_request, so it cannot reach
			 * remove_thread before its entry is in the list */
			p->m_reqs.wait ();
			t->create (thrfunc);
			p->threads.push_back (t);
			p->m_reqs.post ();
			/* hand the request token back; a worker consumes it in
			 * get_request */
			p->s_reqs.post ();
		}
	}
	return (NULL);
}

/**
 * Body of a worker thread: executes tasks until no new task arrives within
 * M milliseconds, then ends.
 * @param arg the scheduling_policy pointer passed to pthread_create by
 *            threadinfo::create
 * @return always NULL
 */
template <int N, int M>
void *one_thread_per_task_with_delay <N,M>::thrfunc (void *arg)
{
	/* the argument was passed as scheduling_policy *, so it is converted back
	 * to that type first and only then down to the concrete policy */
	one_thread_per_task_with_delay *p = static_cast <one_thread_per_task_with_delay *> (
			static_cast <scheduling_policy *> (arg));

	/* the first task is awaited without a time limit */
	base_task *t = p->get_request ();

	while (t) {
		t->proceed ();

		p->set_response (t);

		p->s_free_execs.post ();

		/* NULL means we did not receive a new task until timeout passed */
		t = p->get_request (M);
	}

	/* nobody joins this thread, so it detaches itself */
	pthread_detach (pthread_self ());
	/* give the thread slot back to the supervising thread */
	p->s_max_threads.post ();
	/* must happen before the thread ends; placed after pthread_exit it would
	 * never be executed */
	p->remove_thread (pthread_self ());

	return (NULL);
}

template <typename S> class scheduler : protected S {
		using S::todo;
		using S::s_reqs;
		using S::m_reqs;
		using S::task_inserted;
		scheduler () {

		~scheduler () {

		template <typename F> auto run (F op) -> task<decltype(op()),F>* {
			task <decltype(op()),F> *t = new task <decltype(op()),F> (op, this);
			m_reqs.wait ();
			todo.push (t);
			s_reqs.post ();
			m_reqs.post ();
			task_inserted (t);
			return (t);
