/**
 * Executor service for task submission and tracking
 */
module guillotine.executor;

import guillotine.future : Future;
import guillotine.value;
import guillotine.result : Result;
import guillotine.provider;
import std.traits : isFunction, FunctionTypeOf, ReturnType, arity;

/**
 * Determines, at compile time, whether the return
 * type of the given function is one which can be
 * stored by the wrapper generated in `WorkerFunction`.
 *
 * The accepted types mirror, one-to-one, the branches
 * handled by `WorkerFunction`: `int`, `uint`, `float`,
 * `bool`, `string`, `Object` and `void`. The match is
 * exact (no implicit conversions, no subclasses).
 *
 * Params:
 *   FuncSymbol = the function to inspect
 * Returns: `true` if the return type is supported,
 * `false` otherwise
 */
private bool isSupportedReturn(alias FuncSymbol)()
{
    alias RetType = ReturnType!(FuncSymbol);

    return is(RetType == int) ||
           is(RetType == uint) ||
           is(RetType == float) ||
           is(RetType == bool) ||
           is(RetType == string) ||
           is(RetType == Object) ||
           is(RetType == void);
}

/**
 * Determines, at compile time, whether the given
 * function can be submitted as a task: it must take
 * no parameters and have a supported return type
 * (see `isSupportedReturn`).
 *
 * Params:
 *   FuncSymbol = the function to inspect
 * Returns: `true` if the function is supported,
 * `false` otherwise
 */
private bool isSupportedFunction(alias FuncSymbol)()
{
    // Arity must be 0
    return arity!(FuncSymbol) == 0 && isSupportedReturn!(FuncSymbol);
}

/**
 * This wraps a provided function
 * and `Future` together in a way
 * such that one can submit this
 * to a provider as a kind-of `Task`
 * which, when run, reports the
 * function's outcome to the `Future`
 */
private class FutureTask : Task
{
    /**
     * Related future for this task
     */
    private Future future;

    /**
     * Function to call
     */
    private Value function() func;

    /**
     * Constructs a new `FutureTask` with the
     * provided future and function to run
     *
     * Params:
     *   future = the `Future` to associate with
     *   func = the function to run
     */
    this(Future future, Value function() func)
    {
        this.future = future;
        this.func = func;
    }

    /**
     * The wrapper task that will run and extract
     * any values/errors and report these back
     * to the associated `Future`.
     *
     * The future is first marked as running. If the
     * wrapped function throws an `Exception` it is
     * handed to `Future.error`, otherwise the returned
     * `Value` is wrapped in a `Result` and handed to
     * `Future.complete`.
     */
    public void run()
    {
        import guillotine.future : State;

        // Return value (in non-error case)
        Value retVal;

        try
        {
            // Set the future's state to running now
            future.state = State.RUNNING;

            // Run the function and obtain its return value (on success)
            retVal = func();
        }
        catch(Exception e)
        {
            // Error case: report the exception and stop here
            future.error(e);
            return;
        }

        // Success case: report the value
        future.complete(Result(retVal));
    }
}

/**
 * Generates a `Value`-returning function with
 * a 0 parameter arity based on an input symbol
 * of an existing function which has 0 arity
 * and returns a supported type
 *
 * Params:
 *   FuncIn = The name of the function to wrap
 */
private template WorkerFunction(alias FuncIn)
{
    /**
     * Generated "wrapper" function. Calls `FuncIn`
     * and stores whatever it returns in the matching
     * member of a `ValueUnion`.
     *
     * Returns: the `Value` of the result
     */
    Value workerFunc()
    {
        alias funcInReturn = ReturnType!(FuncIn);

        Value value;
        ValueUnion valUnion;

        static if(is(funcInReturn == int))
        {
            valUnion.integer = FuncIn();
        }
        else static if(is(funcInReturn == uint))
        {
            valUnion.uinteger = FuncIn();
        }
        else static if(is(funcInReturn == float))
        {
            valUnion.floating = FuncIn();
        }
        else static if(is(funcInReturn == bool))
        {
            valUnion.boolean = FuncIn();
        }
        else static if(is(funcInReturn == string))
        {
            valUnion.str = FuncIn();
        }
        else static if(is(funcInReturn == Object))
        {
            valUnion.object = FuncIn();
        }
        else static if(is(funcInReturn == void))
        {
            // With no return type you just call it
            FuncIn();

            // We create an empty struct
            valUnion.none = Empty();
        }
        else
        {
            // Fail loudly rather than silently returning a default
            // `Value` without ever having called the function
            static assert(false, "WorkerFunction: unsupported return type '"
                                 ~ funcInReturn.stringof ~ "'");
        }

        value.value = valUnion;
        return value;
    }
}

import guillotine.providers.sequential : Sequential;

/**
 * The default `Provider`
 */
public alias DefaultProvider = Sequential;

/**
 * Provides a task submission service upon
 * which tasks can be submitted and a `Future`
 * is returned as handle to that submitted task.
 *
 * The underlying scheduling/threading mechanism
 * is provided in the form of a `Provider`
 */
public class Executor
{
    /**
     * Underlying thread provider
     * whereby the tasks will be
     * pushed into for running
     */
    private Provider provider;

    /**
     * Constructs a new `Executor` with the given
     * provider which provides us with some sort
     * of underlying thread execution mechanism
     * where the tasks will actually be executed
     *
     * Params:
     *   provider = the `Provider` to use
     */
    this(Provider provider)
    {
        this.provider = provider;
    }

    /**
     * Constructs a new `Executor` with the
     * default provider
     */
    this()
    {
        this(new DefaultProvider());
    }

    /**
     * Submits the provided function as a task
     * and returns a handle to it in the form
     * of a future.
     *
     * The function must take no parameters and
     * return one of the supported types (see
     * `isSupportedFunction`); anything else is
     * rejected at compile time by the template
     * constraint.
     *
     * Params:
     *   Symbol = the function to submit
     * Returns: the task's `Future`
     */
    public Future submitTask(alias Symbol)()
    if (isFunction!(Symbol) && isSupportedFunction!(Symbol))
    {
        // Generate the `Value`-returning wrapper around the function
        Value function() ptr = &WorkerFunction!(Symbol).workerFunc;

        version(unittest)
        {
            import std.stdio : writeln;
            writeln("Generated worker function: ", ptr);
        }

        // Create the Future
        Future future = new Future();

        // Create the Future task and submit it
        provider.consumeTask(new FutureTask(future, ptr));

        version(unittest)
        {
            writeln("Just submitted future task: ", future);
        }

        return future;
    }
}

version(unittest)
{
    import std.stdio : writeln;
    import core.thread : Thread, dur;
}

/**
 * Tests the submission of three tasks using
 * the `Sequential` provider and with the first
 * two tasks having values to return and the last
 * having nothing
 */
unittest
{
    import guillotine.providers.sequential;
    import guillotine.executor;
    import guillotine.future;
    import guillotine.result;
    import guillotine.provider;

    Provider provider = new Sequential();

    // Start the provider so it can execute
    // submitted tasks
    provider.start();

    // Stops the internal task runner thread
    // (also when an assertion below fails)
    scope(exit) provider.stop();

    // Create an executor with this provider
    Executor t = new Executor(provider);

    // Submit a few tasks
    Future fut1 = t.submitTask!(hi);
    Future fut2 = t.submitTask!(hiFloat);
    Future fut3 = t.submitTask!(hiVoid);

    // Await on the first task
    writeln("Fut1 waiting...");
    Result res1 = fut1.await();
    writeln("Fut1 done with: '", res1.getValue().value.integer, "'");
    assert(res1.getValue().value.integer == 69);

    // Await on the second task
    writeln("Fut2 waiting...");
    Result res2 = fut2.await();
    writeln("Fut2 done with: '", res2.getValue().value.floating, "'");
    assert(res2.getValue().value.floating == 69.420f);

    // Await on the third task (nothing to inspect, it returns `void`)
    writeln("Fut3 waiting...");
    fut3.await();
    writeln("Fut3 done");
}

version(unittest)
{
    private int hi()
    {
        writeln("Let's go hi()!");

        // Pretend to do some work
        Thread.sleep(dur!("seconds")(2));

        return 69;
    }

    private float hiFloat()
    {
        writeln("Let's go hiFloat()!");

        // Pretend to do some work
        Thread.sleep(dur!("seconds")(10));

        return 69.420f;
    }

    private void hiVoid()
    {
        writeln("Let's go hiVoid()!");

        // Pretend to do some work
        Thread.sleep(dur!("seconds")(10));
    }
}