/**
 * Serial execution strategy
 */
module guillotine.providers.sequential;

import libsnooze;
import guillotine.provider;
import std.container.slist;
import std.range : walkLength;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import guillotine.exceptions;

version(unittest)
{
    import std.stdio : writeln;
}

/**
 * Provides sequential or "serial" execution
 * on a singular thread of the consumed tasks
 *
 * Tasks are appended to an internal queue by
 * `consumeTask` and are taken off the front
 * of that queue, one per wake-up, by the
 * runner thread executing `worker`
 */
public final class Sequential : Provider
{
    /// Event the runner thread sleeps on between tasks
    private Event event;

    /// Pending tasks; new tasks are appended at the end
    private SList!(Task) taskQueue;

    /// Guards every access to `taskQueue`
    private Mutex taskQueueLock;

    /// Thread which executes `worker`
    private Thread runner;

    /// Loop condition of `worker`; set by `start`, cleared by `stop`
    private bool running;

    /**
     * Constructs a new `Sequential` provider
     *
     * The runner thread is created here but is
     * only started by a call to `start`
     */
    this()
    {
        this.event = new Event();
        this.taskQueueLock = new Mutex();
        this.runner = new Thread(&worker);
        this.event.ensure(runner);
    }

    /**
     * Consumes the provided task for
     * execution in the near future
     *
     * Params:
     *   task = the `Task` to consume
     */
    public override void consumeTask(Task task)
    {
        version(unittest)
        {
            writeln("Sequential: Consuming task '", task, "'...");
        }

        // Append the task whilst holding the queue lock; the scope
        // guard releases the lock even if the insertion throws
        {
            taskQueueLock.lock();
            scope(exit) taskQueueLock.unlock();

            taskQueue.insertAfter(taskQueue[], task);
        }

        // Wake up the runner (just using all to avoid a catch for exception
        // ... which would occur if wait() hasn't been called at least once
        // ... in `runner`)
        event.notifyAll();

        version(unittest)
        {
            writeln("Sequential: Task '", task, "' consumed");
        }
    }

    /**
     * Starts the provider's execution thread
     */
    public void start()
    {
        // Set the running flag to true
        this.running = true;

        // Start the runner thread
        runner.start();
    }

    /**
     * Body of the runner thread
     *
     * Whilst the running flag is set this waits
     * on the event and, once awoken, dequeues at
     * most one task and runs it. The queue lock
     * is not held whilst the task runs.
     */
    public void worker()
    {
        while(running)
        {
            try
            {
                // Sleep till awoken for an enqueue (or by `stop`)
                event.wait();

                // Check if we are running, if not, exit
                // (the loop condition fails on the next pass)
                if(!running)
                {
                    continue;
                }

                // Potential item to process (null if queue was empty)
                Task potTask = dequeueTask();

                // If there was an item, then run it now
                if(potTask !is null)
                {
                    potTask.run();
                }
            }
            catch(InterruptedException e)
            {
                // Handle by doing nothing, retry wait()
                continue;
            }
            catch(SnoozeError e)
            {
                // TODO: Stop and handle this
                // NOTE(review): the error is currently swallowed and
                // the loop simply continues to the next iteration
            }
        }
    }

    /**
     * Removes and returns the task at the front
     * of the queue, holding the queue lock for
     * the duration of the operation
     *
     * Returns: the dequeued `Task`, or `null` if
     * the queue was empty
     */
    private Task dequeueTask()
    {
        taskQueueLock.lock();
        scope(exit) taskQueueLock.unlock();

        if(taskQueue.empty)
        {
            return null;
        }

        // Take the front item and unlink exactly that node
        Task head = taskQueue.front;
        taskQueue.removeFront();

        return head;
    }

    /**
     * Stops the provider.
     *
     * If there is a task executing currently
     * then it will finish, any other submitted
     * tasks will not be run.
     *
     * This method will hang till said task
     * has finished executing.
     *
     * Throws:
     *   `GuillotineException` on error stopping
     * the worker
     */
    public void stop()
    {
        // Set running flag to false
        this.running = false;

        try
        {
            // Notify the sleeping worker to wake up
            this.event.notify(runner);
        }
        catch(SnoozeError e)
        {
            throw new GuillotineException("Error notifying() sleeping worker in stop()");
        }

        // Wait for the runner thread to fully exit
        this.runner.join();

        // TODO: Destroy the libsnooze event here
    }
}