// DU4-Scheduler.cpp
// Ales Novak NPRG051 2011/2012

#ifndef __scheduler_h__
#define __scheduler_h__

#include <pthread.h>
#include <semaphore.h>
#include <queue>
#include <list>
#include <stdexcept>

/**
 * Class encapsulating POSIX semaphores */
class sem {
	protected:
		sem_t s;
	public:
		/**
		 * initializes semaphore with value/0 */
		sem (unsigned int _val = 0) {
			int ret = sem_init (&s, 0, _val);
			if (ret) throw std::runtime_error ("Cannot create semaphore ");
		}

		/**
		 * waits (optionally [implicitly] blocking, till the semaphore is > 0,
		 * then decrements it by 1 */
		int wait (bool blocking = true) {
			int r;
			if (blocking) {
				r = sem_wait (&s);
				return (r);
			} else {
				return (sem_trywait (&s));
			}
		}
		
		/**
		 * increments semaphore by 1 */
		void post ()
		{
			sem_post (&s);
		}

		/**
		 * waits for semaphore till specified time passes, returning !0 
		 * when timeout passed */
		int timedwait (int milli) {
			struct timespec ts;
			ts.tv_sec = milli/1000;
			ts.tv_sec = (milli*1000)%1000000;

			return (sem_timedwait (&s, &ts));
		}

};

/**
 * Class representing any task, it's the only type the executing threads have 
 */
class base_task {
	public:
		base_task (class scheduling_policy *polic) : done(false), sema (NULL), sched (polic) {
		}
		/**
		 * if task is done */
		bool done;
		/**
		 * this is NULL or semaphore, if there's a process waiting for this task to 
		 * finish (calling "get_result") */
		sem *sema;
		class scheduling_policy *sched;

		/**
		 * method implemented by inherited template class */
		virtual void proceed () = 0;
};


class scheduling_policy {
	public:
		/**
		 * Encapsulation of thread */
		class threadinfo {
			friend class scheduling_policy;
			protected:
				pthread_t pth;
				class base_task *what;
				class scheduling_policy *sched;
			public:

				threadinfo (scheduling_policy *p) {//: sched (p) {
					sched = p;
				}
				void cancel () {
					pthread_cancel (pth);
				}
				void create (void*(*fce)(void*) = NULL) {
					if (pthread_create (& pth, NULL, fce?fce:sched->getfunc(), sched)) {
						throw std::runtime_error ("Cannot create thread");
					}
				}
				 
		};

		virtual void *(*getfunc())(void*) = 0;
		base_task *get_request (int milli = -1) {
			if (milli == -1)
				s_reqs.wait ();
			else {
				if (s_reqs.timedwait (milli)) return (NULL);
			}
			m_reqs.wait ();
			base_task *r = todo.front ();
			todo.pop ();
			m_reqs.post ();
			return (r);
		}

		void set_response (base_task *t) {
			m_reqs.wait ();
			t->done = true;
			if (t->sema) {
				t->sema->post ();	
			}
			m_reqs.post ();
		}

		void remove_thread (pthread_t p) {
			m_reqs.wait ();
			for (auto it = threads.begin (); it != threads.end () ; it ++) {
				if ((*it)->pth == p) {
					threads.erase (it);
					break;
				}
			}
			m_reqs.post ();
		}

	public:
		std::queue <base_task *> todo;
		scheduling_policy (int maxthreads) : s_max_threads (maxthreads), s_reqs (0), m_reqs (1) {
		}

		virtual void task_inserted (base_task *t) {

		}

		sem s_max_threads;
		sem s_reqs;
		sem m_reqs;
		std::list <threadinfo *> threads;
		
};
/**
 * template representing  taking T as type of result of the functor
 * F type of the functor
 */
template <typename T, typename F> class task : public base_task {
	protected:
		F & func;
		T result;
	public:
		task (F &f, scheduling_policy *p) : base_task (p), func (f) {
		}
		void proceed () {
			result  = func ();
		}

		/** 
		 * Returns computed value */
		T get_result () {
			/** ensures we won't race */
			sched->m_reqs.wait ();
			
			if (! done) {
				/** we have to create new semaphore and wait for it
				 * it will be raised by thread, which does (will do)
				 * this task */
				sema = new sem ();
				sched->m_reqs.post ();
				sema->wait ();

				delete sema;
			} else {
				/** this is so easy, task was finished before we touched it */
				sched->m_reqs.post ();
			}
			return (result);
		}
};

/**************************************************************
 *************************************************************
 * THREAD_POOL
 * ************************************************************
 * ***********************************************************/
template <int N> class thread_pool : public scheduling_policy {
	public:
		static void *thrfunc (void *arg);

		virtual void *(*getfunc())(void*) {
			return (thrfunc);
		}

		/**
		 * we prepare the pool of executors. They will do the rest */
		thread_pool () : scheduling_policy (N) {
			for (int i = 0; i < N; i ++) {
				threadinfo *t = new threadinfo (this);
				t->create ();
				threads.push_back (t);
			}
		}
};

template <int N>
void *thread_pool<N>::thrfunc (void *arg)
{
	scheduling_policy *p = (scheduling_policy*) arg;

	do {
		base_task *t = p->get_request ();

		t->proceed ();


		p->set_response (t);
	} while (1);

	return (NULL);
}


/**************************************************************
 *************************************************************
 * ONE_THREAD_PER_TASK
 * ************************************************************
 * ***********************************************************/
template <int N> class one_thread_per_task : public scheduling_policy {
	public:
		static void *thrfunc (void *arg);
		static void *mainthrfunc (void *arg);

		virtual void *(*getfunc())(void*) {
			return (thrfunc);
		}

		threadinfo *mainth;

		one_thread_per_task () : scheduling_policy (N) {
			mainth = new threadinfo (this);
			mainth->create (mainthrfunc);
			threads.push_back (mainth);
		}

		~one_thread_per_task () {
			
		}
};

template <int N>
void *one_thread_per_task<N>::mainthrfunc (void *arg)
{
	scheduling_policy *p = (scheduling_policy*) arg;

	do {
		p->s_max_threads.wait ();
		p->s_reqs.wait ();

		threadinfo *t = new threadinfo (p);
		t->create (thrfunc);
		p->threads.push_back (t);
		p->s_reqs.post ();

	} while (1);
	return (NULL);
}

template <int N>
void *one_thread_per_task<N>::thrfunc (void *arg)
{
	scheduling_policy *p = (scheduling_policy*) arg;

		base_task *t = p->get_request ();
		t->proceed ();


		p->set_response (t);

		pthread_detach (pthread_self ());
		p->s_max_threads.post ();
		p->remove_thread (pthread_self ());
		pthread_exit (NULL);

		return (NULL);
	}



/**************************************************************
 *************************************************************
 * ONE_THREAD_PER_TASK_WITH_DELAY
 * ************************************************************
 * ***********************************************************/
template <int N, int M> class  one_thread_per_task_with_delay : public scheduling_policy {
	public:
		sem s_free_execs;

		static void *thrfunc (void *arg);
		static void *mainthrfunc (void *arg);

		virtual void *(*getfunc())(void*) {
			return (thrfunc);
		}

		threadinfo *mainth;

		one_thread_per_task_with_delay () : scheduling_policy (N) {
			mainth = new threadinfo (this);
			mainth->create (mainthrfunc);
			threads.push_back (mainth);
		}
};
template <int N, int M>
void *one_thread_per_task_with_delay<N,M>::mainthrfunc (void *arg)
{
	one_thread_per_task_with_delay *p = (one_thread_per_task_with_delay*) arg;

	do {
		if (! p->s_free_execs.wait (false)) {
			/* we succeeded in locking free executor (which finished work
			 * but waits for further requests) */
		} else {
			p->s_max_threads.wait ();

			p->s_reqs.wait ();
			threadinfo *t = new threadinfo (p);
			t->create (thrfunc);
			p->threads.push_back (t);
			p->s_reqs.post ();

		}


	} while (1);
	return (NULL);
}


template <int N, int M>
void *one_thread_per_task_with_delay <N,M>::thrfunc (void *arg)
{
	one_thread_per_task_with_delay *p = (one_thread_per_task_with_delay*) arg;

	base_task *t = p->get_request ();

	do {
		t->proceed ();

		p->set_response (t);

		p->s_free_execs.post ();

		t = p->get_request (M);

		/* we did not received new task until timeout passed */
		if (! t) break;
	} while (1);

	pthread_detach (pthread_self ());
	p->s_max_threads.post ();
	pthread_exit (NULL);
	p->remove_thread (pthread_self ());

	return (NULL);
}

template <typename S> class scheduler : protected S {
	protected:
	public:
		using S::todo;
		using S::s_reqs;
		using S::m_reqs;
		using S::task_inserted;
		scheduler () {
		}

		~scheduler () {
		}

		template <typename F> auto run (F op) -> task<decltype(op()),F>* {
			task <decltype(op()),F> *t = new task <decltype(op()),F> (op, this);
			m_reqs.wait ();
			todo.push (t);
			s_reqs.post ();
			m_reqs.post ();
			task_inserted (t);
			return (t);
		}
};

#endif