1 module guillotine.providers.sequential; 2 3 import libsnooze; 4 import guillotine.provider; 5 import std.container.slist; 6 import core.sync.mutex : Mutex; 7 import core.thread : Thread; 8 9 version(unittest) 10 { 11 import std.stdio : writeln; 12 } 13 14 public final class Sequential : Provider 15 { 16 private Event event; 17 private SList!(Task) taskQueue; 18 private Mutex taskQueueLock; 19 private Thread runner; 20 private bool running; 21 22 this() 23 { 24 this.event = new Event(); 25 this.taskQueueLock = new Mutex(); 26 this.runner = new Thread(&worker); 27 this.event.ensure(runner); 28 } 29 30 public override void consumeTask(Task task) 31 { 32 version(unittest) 33 { 34 writeln("Sequential: Consuming task '", task, "'..."); 35 } 36 37 // Lock the queue 38 taskQueueLock.lock(); 39 40 // Append the task 41 taskQueue.insertAfter(taskQueue[], task); 42 43 // Unlock the queue 44 taskQueueLock.unlock(); 45 46 // Wake up the runner (just using all to avoid a catch for exception 47 // ... which would occur if wait() hasn't been called atleast once 48 // ... in `runner` 49 event.notifyAll(); 50 51 version(unittest) 52 { 53 writeln("Sequential: Task '", task, "' consumed"); 54 } 55 } 56 57 public void start() 58 { 59 // TODO: Set flag 60 this.running = true; 61 62 runner.start(); 63 } 64 65 public void worker() 66 { 67 // TODO: Add running condition here? 68 while(running) 69 { 70 try 71 { 72 // Sleep till awoken for an enqueue 73 event.wait(); 74 75 // Check if we are running, if not, exit 76 if(!running) 77 { 78 continue; 79 } 80 81 // Lock the queue 82 taskQueueLock.lock(); 83 84 // Potential item to process 85 Task potTask; 86 87 // If there are any items 88 import std.range; 89 if(walkLength(taskQueue[]) > 0) 90 { 91 // Get the front item 92 potTask = taskQueue.front(); 93 94 // Remove the item 95 taskQueue.linearRemoveElement(potTask); 96 } 97 98 // Unlock the queue 99 taskQueueLock.unlock(); 100 101 // If there was an item, then run it now 102 if(potTask !is null) 103 { 104 potTask.run(); 105 } 106 107 // TODO: After wait check if boolean 108 // ... was flipped to stop (no longer run) 109 } 110 catch(InterruptedException e) 111 { 112 // TODO: Handle by doing nothing 113 continue; 114 } 115 catch(SnoozeError e) 116 { 117 // TODO: Stop and handle this 118 } 119 } 120 } 121 122 /** 123 * Stops the provider. 124 * 125 * If there is a task executing currently 126 * then it will finish, any other submitted 127 * tasks will not be run. 128 * 129 * This method will hang till said task 130 * has finished executing. 131 */ 132 public void stop() 133 { 134 // TODO: set flag 135 this.running = false; 136 137 // TODO: notify 138 this.event.notify(runner); 139 140 // TODO: Should we hang here till finished? 141 this.runner.join(); 142 } 143 }