1 module guillotine.providers.sequential;
2 
3 import libsnooze;
4 import guillotine.provider;
5 import std.container.slist;
6 import core.sync.mutex : Mutex;
7 import core.thread : Thread;
8 
9 version(unittest)
10 {
11     import std.stdio : writeln;
12 }
13 
/**
 * A provider which runs the tasks submitted to it one after
 * the other, in submission order, on a single runner thread.
 *
 * Submitted tasks are appended to a mutex-guarded queue and
 * the runner thread is then woken up via a libsnooze `Event`.
 * Nothing is executed until `start()` has been called.
 */
public final class Sequential : Provider
{
    /// Event the runner sleeps on and is woken up through
    private Event event;

    /// Pending tasks, oldest one at the front
    private SList!(Task) taskQueue;

    /// Guards every access to `taskQueue`
    private Mutex taskQueueLock;

    /// The thread executing `worker()`
    private Thread runner;

    /// Set by `start()`, cleared by `stop()`; polled by `worker()`
    private bool running;

    /**
     * Constructs a new provider with an empty queue.
     *
     * The runner thread is created here but only begins
     * executing once `start()` is called.
     */
    this()
    {
        this.event = new Event();
        this.taskQueueLock = new Mutex();
        this.runner = new Thread(&worker);

        // Make the runner known to the event before it first waits
        this.event.ensure(runner);
    }

    /**
     * Enqueues the given task and wakes up the runner.
     *
     * Params:
     *   task = the task to append to the queue
     */
    public override void consumeTask(Task task)
    {
        version(unittest)
        {
            writeln("Sequential: Consuming task '", task, "'...");
        }

        // Append the task whilst holding the queue lock; the
        // scope guard releases the lock even if inserting throws
        {
            taskQueueLock.lock();
            scope(exit) taskQueueLock.unlock();

            taskQueue.insertAfter(taskQueue[], task);
        }

        // Wake up the runner (just using all to avoid a catch for exception
        // ... which would occur if wait() hasn't been called at least once
        // ... in `runner`)
        event.notifyAll();

        version(unittest)
        {
            writeln("Sequential: Task '", task, "' consumed");
        }
    }

    /**
     * Starts the provider by marking it as running and
     * launching the runner thread.
     */
    public void start()
    {
        // The flag must be set before the thread starts, as
        // `worker()` checks it on entry
        this.running = true;

        runner.start();
    }

    /**
     * Body of the runner thread.
     *
     * Sleeps until woken up, then runs every queued task in
     * order. The `running` flag is re-checked after each wake-up
     * and between tasks, so once `stop()` has been called no
     * further task is started.
     */
    public void worker()
    {
        while(running)
        {
            try
            {
                // Sleep till awoken for an enqueue (or by `stop()`)
                event.wait();

                // Drain the queue rather than taking a single item,
                // so that emptying it does not rely on receiving
                // exactly one wake-up per submitted task
                while(running)
                {
                    Task nextTask = dequeue();

                    // Queue is empty, go back to sleep
                    if(nextTask is null)
                    {
                        break;
                    }

                    // The queue lock is NOT held here, so tasks
                    // may be submitted whilst this one executes
                    nextTask.run();
                }
            }
            catch(InterruptedException e)
            {
                // The wait was interrupted, simply try again
                continue;
            }
            catch(SnoozeError e)
            {
                // Ignoring this and looping would likely spin on a
                // wait() that keeps failing, so stop the worker
                running = false;
            }
        }
    }

    /**
     * Removes and returns the task at the front of the queue.
     *
     * Returns: the oldest queued task, or `null` if the queue
     * is empty
     */
    private Task dequeue()
    {
        taskQueueLock.lock();
        scope(exit) taskQueueLock.unlock();

        if(taskQueue.empty)
        {
            return null;
        }

        // Both of these are O(1) on a singly-linked list
        Task head = taskQueue.front;
        taskQueue.removeFront();

        return head;
    }

    /**
     * Stops the provider.
     *
     * If there is a task executing currently
     * then it will finish, any other submitted
     * tasks will not be run.
     *
     * This method will hang till said task
     * has finished executing.
     */
    public void stop()
    {
        // Clear the flag first so the runner sees it when it wakes
        this.running = false;

        // Wake the runner up in case it is sleeping in wait()
        this.event.notify(runner);

        // Block until the runner thread has exited
        this.runner.join();
    }
}