1 /** 
2  * Serial execution strategy
3  */
4 module guillotine.providers.sequential;
5 
6 import libsnooze;
7 import guillotine.provider;
8 import std.container.slist;
9 import std.range : walkLength;
10 import core.sync.mutex : Mutex;
11 import core.thread : Thread;
12 import guillotine.exceptions;
13 
14 version(unittest)
15 {
16     import std.stdio : writeln;
17 }
18 
19 /** 
20  * Provides sequential or "serial" execution
21  * on a singular thread of the consumed tasks
22  */
23 public final class Sequential : Provider
24 {
25     private Event event;
26     private SList!(Task) taskQueue;
27     private Mutex taskQueueLock;
28     private Thread runner;
29     private bool running;
30 
31     /** 
32      * Constricts a new `Sequential` provider
33      */
34     this()
35     {
36         this.event = new Event();
37         this.taskQueueLock = new Mutex();
38         this.runner = new Thread(&worker);
39         this.event.ensure(runner);
40     }
41 
42     /** 
43      * Consumes the provided task for
44      * execution in the near future
45      *
46      * Params:
47      *   task = the `Task` to consume
48      */
49     public override void consumeTask(Task task)
50     {
51         version(unittest)
52         {
53             writeln("Sequential: Consuming task '", task, "'...");
54         }
55 
56         // Lock the queue
57         taskQueueLock.lock();
58 
59         // Append the task
60         taskQueue.insertAfter(taskQueue[], task);
61 
62         // Unlock the queue
63         taskQueueLock.unlock();
64 
65         // Wake up the runner (just using all to avoid a catch for exception
66         // ... which would occur if wait() hasn't been called atleast once
67         // ... in `runner`
68         event.notifyAll();
69 
70         version(unittest)
71         {
72             writeln("Sequential: Task '", task, "' consumed");
73         }
74     }
75 
76     /** 
77      * Starts the provider's execution thread
78      */
79     public void start()
80     {
81         // Set the running flag to true
82         this.running = true;
83 
84         // Start the runner thread
85         runner.start();
86     }   
87 
88     public void worker()
89     {
90         // TODO: Add running condition here?
91         while(running)
92         {
93             try
94             {
95                 // Sleep till awoken for an enqueue
96                 event.wait();
97 
98                 // Check if we are running, if not, exit
99                 if(!running)
100                 {
101                     continue;
102                 }
103 
104                 // Lock the queue
105                 taskQueueLock.lock();
106 
107                 // Potential item to process
108                 Task potTask;
109 
110                 // If there are any items
111                 if(walkLength(taskQueue[]) > 0)
112                 {
113                     // Get the front item
114                     potTask = taskQueue.front();
115 
116                     // Remove the item
117                     taskQueue.linearRemoveElement(potTask);
118                 }
119 
120                 // Unlock the queue
121                 taskQueueLock.unlock();
122 
123                 // If there was an item, then run it now
124                 if(potTask !is null)
125                 {
126                     potTask.run();
127                 }
128 
129             }
130             catch(InterruptedException e)
131             {
132                 // Handle by doing nothing, retry wait()
133                 continue;
134             }
135             catch(SnoozeError e)
136             {
137                 // TODO: Stop and handle this
138             }
139         }
140     }
141 
142     /** 
143      * Stops the provider.
144      *
145      * If there is a task executing currently
146      * then it will finish, any other submitted
147      * tasks will not be run.
148      *
149      * This method will hang till said task
150      * has finished executing.
151      *
152      * Throws:
153      *   `GuillotineException` on error stopping
154      * the worker
155      */
156     public void stop()
157     {
158         // Set running flag to false
159         this.running = false;
160 
161         try
162         {
163             // Notify the sleeping worker to wake up
164             this.event.notify(runner);
165         }
166         catch(SnoozeError e)
167         {
168             throw new GuillotineException("Error notifying() sleeping worker in stop()");
169         }
170 
171         // Wait for the runner thread to fully exit
172         this.runner.join();
173 
174         // TODO: Destroy the libsnooze event here
175     }
176 }