// DU4-Scheduler.cpp
// Ales Novak NPRG051 2011/2012

#ifndef __scheduler_h__
#define __scheduler_h__

#include <pthread.h>
#include <semaphore.h>
#include <time.h>
#include <queue>
#include <list>
#include <stdexcept>

/**
 * Class encapsulating POSIX semaphores
 */
class sem {
protected:
    sem_t s;

public:
    /**
     * initializes the semaphore with the given value (0 by default)
     */
    sem (unsigned int _val = 0) {
        int ret = sem_init (&s, 0, _val);
        if (ret)
            throw std::runtime_error ("Cannot create semaphore");
    }

    /**
     * waits (blocking by default) until the semaphore is > 0,
     * then decrements it by 1
     */
    int wait (bool blocking = true) {
        if (blocking)
            return (sem_wait (&s));
        else
            return (sem_trywait (&s));
    }

    /**
     * increments the semaphore by 1
     */
    void post () {
        sem_post (&s);
    }

    /**
     * waits for the semaphore for at most the given number of milliseconds,
     * returning non-zero when the timeout passed
     */
    int timedwait (int milli) {
        struct timespec ts;
        /* sem_timedwait () expects an absolute CLOCK_REALTIME deadline */
        clock_gettime (CLOCK_REALTIME, &ts);
        ts.tv_sec += milli / 1000;
        ts.tv_nsec += (long) (milli % 1000) * 1000000L;
        if (ts.tv_nsec >= 1000000000L) {
            ts.tv_sec += 1;
            ts.tv_nsec -= 1000000000L;
        }
        return (sem_timedwait (&s, &ts));
    }
};

/**
 * Base class representing any task; it is the only type the executing
 * threads work with
 */
class base_task {
public:
    base_task (class scheduling_policy *polic) : done (false), sema (NULL), sched (polic) { }
    virtual ~base_task () { }

    /**
     * set to true once the task has finished
     */
    bool done;

    /**
     * NULL, or a semaphore when somebody is already waiting for this task
     * to finish (inside get_result ())
     */
    sem *sema;

    class scheduling_policy *sched;

    /**
     * implemented by the derived task template: executes the functor
     */
    virtual void proceed () = 0;
};

class scheduling_policy {
public:
    /**
     * Encapsulation of a single worker thread
     */
    class threadinfo {
        friend class scheduling_policy;
    protected:
        pthread_t pth;
        class base_task *what;
        class scheduling_policy *sched;

    public:
        threadinfo (scheduling_policy *p) : what (NULL), sched (p) { }

        void cancel () {
            pthread_cancel (pth);
        }

        void create (void *(*fce)(void *) = NULL) {
            if (pthread_create (&pth, NULL, fce ? fce : sched->getfunc (), sched)) {
                throw std::runtime_error ("Cannot create thread");
            }
        }
    };

    /**
     * returns the thread routine used by this policy
     */
    virtual void *(*getfunc ())(void *) = 0;

    /**
     * blocks until a task is available (or at most milli milliseconds when
     * milli != -1) and removes it from the queue; returns NULL on timeout
     */
    base_task *get_request (int milli = -1) {
        if (milli == -1) {
            s_reqs.wait ();
        } else {
            if (s_reqs.timedwait (milli))
                return (NULL);
        }
        m_reqs.wait ();
        base_task *r = todo.front ();
        todo.pop ();
        m_reqs.post ();
        return (r);
    }

    /**
     * marks the task as finished and wakes up a waiter, if there is one
     */
    void set_response (base_task *t) {
        m_reqs.wait ();
        t->done = true;
        if (t->sema) {
            t->sema->post ();
        }
        m_reqs.post ();
    }

    void remove_thread (pthread_t p) {
        m_reqs.wait ();
        for (auto it = threads.begin (); it != threads.end (); it++) {
            if (pthread_equal ((*it)->pth, p)) {
                threads.erase (it);
                break;
            }
        }
        m_reqs.post ();
    }

public:
    std::queue<base_task *> todo;

    scheduling_policy (int maxthreads) : s_max_threads (maxthreads), s_reqs (0), m_reqs (1) { }

    virtual void task_inserted (base_task *t) { }

    sem s_max_threads;  /* limits the number of executor threads */
    sem s_reqs;         /* counts pending requests in the queue */
    sem m_reqs;         /* binary semaphore guarding the shared state */

    std::list<threadinfo *> threads;
};

/**
 * task template: T is the result type, F the type of the functor
 */
template <typename T, typename F>
class task : public base_task {
protected:
    F func;     /* stored by value so it outlives the caller of scheduler::run () */
    T result;

public:
    task (F &f, scheduling_policy *p) : base_task (p), func (f) { }

    void proceed () {
        result = func ();
    }

    /**
     * Returns the computed value, waiting for the task if necessary
     */
    T get_result () {
        /* make sure we do not race with the executing thread */
        sched->m_reqs.wait ();
        if (! done) {
            /* the task is not finished yet: create a semaphore and wait on it;
             * it will be posted by the thread that executes (will execute)
             * this task */
            sema = new sem ();
            sched->m_reqs.post ();
            sema->wait ();
            delete sema;
            sema = NULL;
        } else {
            /* the task finished before we asked; nothing to wait for */
            sched->m_reqs.post ();
        }
        return (result);
    }
};

/**************************************************************
 *                        THREAD_POOL                         *
 **************************************************************/

template <int N>
class thread_pool : public scheduling_policy {
public:
    static void *thrfunc (void *arg);

    virtual void *(*getfunc ())(void *) {
        return (thrfunc);
    }

    /**
     * we prepare the pool of N executors; they will do the rest
     */
    thread_pool () : scheduling_policy (N) {
        for (int i = 0; i < N; i++) {
            threadinfo *t = new threadinfo (this);
            t->create ();
            threads.push_back (t);
        }
    }
};

template <int N>
void *thread_pool<N>::thrfunc (void *arg) {
    scheduling_policy *p = (scheduling_policy *) arg;
    /* each executor keeps taking tasks from the queue forever */
    do {
        base_task *t = p->get_request ();
        t->proceed ();
        p->set_response (t);
    } while (1);
    return (NULL);
}

/**************************************************************
 *                    ONE_THREAD_PER_TASK                     *
 **************************************************************/

template <int N>
class one_thread_per_task : public scheduling_policy {
public:
    static void *thrfunc (void *arg);
    static void *mainthrfunc (void *arg);

    virtual void *(*getfunc ())(void *) {
        return (thrfunc);
    }

    threadinfo *mainth;

    one_thread_per_task () : scheduling_policy (N) {
        mainth = new threadinfo (this);
        mainth->create (mainthrfunc);
        threads.push_back (mainth);
    }

    ~one_thread_per_task () { }
};

template <int N>
void *one_thread_per_task<N>::mainthrfunc (void *arg) {
    scheduling_policy *p = (scheduling_policy *) arg;
    /* the dispatcher: spawn one executor per incoming request,
     * but never more than N at a time */
    do {
        p->s_max_threads.wait ();
        p->s_reqs.wait ();
        threadinfo *t = new threadinfo (p);
        t->create (thrfunc);
        p->m_reqs.wait ();          /* the thread list is shared with remove_thread () */
        p->threads.push_back (t);
        p->m_reqs.post ();
        p->s_reqs.post ();
    } while (1);
    return (NULL);
}

template <int N>
void *one_thread_per_task<N>::thrfunc (void *arg) {
    scheduling_policy *p = (scheduling_policy *) arg;
    base_task *t = p->get_request ();
    t->proceed ();
    p->set_response (t);
    pthread_detach (pthread_self ());
    p->s_max_threads.post ();
    p->remove_thread (pthread_self ());
    pthread_exit (NULL);
    return (NULL);
}

/**************************************************************
 *              ONE_THREAD_PER_TASK_WITH_DELAY                *
 **************************************************************/

template <int N, int M>
class one_thread_per_task_with_delay : public scheduling_policy {
public:
    sem s_free_execs;   /* counts executors that are idle but still alive */

    static void *thrfunc (void *arg);
    static void *mainthrfunc (void *arg);

    virtual void *(*getfunc ())(void *) {
        return (thrfunc);
    }

    threadinfo *mainth;

    one_thread_per_task_with_delay () : scheduling_policy (N) {
        mainth = new threadinfo (this);
        mainth->create (mainthrfunc);
        threads.push_back (mainth);
    }
};

template <int N, int M>
void *one_thread_per_task_with_delay<N, M>::mainthrfunc (void *arg) {
    one_thread_per_task_with_delay *p = (one_thread_per_task_with_delay *) arg;
    do {
        if (! p->s_free_execs.wait (false)) {
            /* we managed to grab a free executor (one that finished its work
             * and still waits for further requests): it will pick the task
             * up itself, so there is nothing to do here */
        } else {
            /* no idle executor: spawn a new one, respecting the limit of N */
            p->s_max_threads.wait ();
            p->s_reqs.wait ();
            threadinfo *t = new threadinfo (p);
            t->create (thrfunc);
            p->m_reqs.wait ();          /* the thread list is shared with remove_thread () */
            p->threads.push_back (t);
            p->m_reqs.post ();
            p->s_reqs.post ();
        }
    } while (1);
    return (NULL);
}

template <int N, int M>
void *one_thread_per_task_with_delay<N, M>::thrfunc (void *arg) {
    one_thread_per_task_with_delay *p = (one_thread_per_task_with_delay *) arg;
    base_task *t = p->get_request ();
    do {
        t->proceed ();
        p->set_response (t);
        p->s_free_execs.post ();
        t = p->get_request (M);
        /* no new task arrived before the timeout passed: let the thread die */
        if (! t)
            break;
    } while (1);
    pthread_detach (pthread_self ());
    p->s_max_threads.post ();
    p->remove_thread (pthread_self ());     /* must happen before pthread_exit () */
    pthread_exit (NULL);
    return (NULL);
}

template <typename S>
class scheduler : protected S {
public:
    using S::todo;
    using S::s_reqs;
    using S::m_reqs;
    using S::task_inserted;

    scheduler () { }
    ~scheduler () { }

    /**
     * wraps the functor in a task, enqueues it and wakes up the policy;
     * the returned task can later be asked for its result
     */
    template <typename F>
    auto run (F op) -> task<decltype (op ()), F> * {
        task<decltype (op ()), F> *t = new task<decltype (op ()), F> (op, this);
        m_reqs.wait ();
        todo.push (t);
        s_reqs.post ();
        m_reqs.post ();
        task_inserted (t);
        return (t);
    }
};

#endif
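
/*
 * A minimal usage sketch (not part of the original assignment): it shows how
 * the scheduler template is combined with one of the policies above. The
 * SCHEDULER_DEMO macro, the pool size and the lambda are illustrative
 * assumptions; the demo can be built e.g. with
 *   g++ -std=c++0x -DSCHEDULER_DEMO DU4-Scheduler.cpp -o demo -pthread
 */
#ifdef SCHEDULER_DEMO
#include <cstdio>

int main () {
    /* a pool of 4 executor threads behind the generic scheduler interface */
    scheduler<thread_pool<4> > *s = new scheduler<thread_pool<4> > ();

    auto work = [] () { return 6 * 7; };    /* any functor returning a value */
    auto *t = s->run (work);                /* enqueue the task */

    /* get_result () blocks until one of the pool threads has run the functor */
    std::printf ("result = %d\n", t->get_result ());

    delete t;                               /* the caller owns the task object */

    /* s is deliberately not deleted: the pool threads still reference it and
     * are simply terminated when the process exits */
    return (0);
}
#endif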