1 /** 2 * Executor service for task submission and tracking 3 */ 4 module guillotine.executor; 5 6 import guillotine.future : Future; 7 import guillotine.value; 8 import guillotine.result : Result; 9 import guillotine.provider; 10 import std.traits : isFunction, isDelegate, FunctionTypeOf, ReturnType, arity; 11 import std.traits : isAssignable; 12 import std.functional : toDelegate; 13 14 // TODO: Document 15 private bool isSupportedReturn(alias FuncSymbol)() 16 { 17 return __traits(isSame, ReturnType!(FuncSymbol), int) || 18 __traits(isSame, ReturnType!(FuncSymbol), bool) || 19 isAssignable!(Object, ReturnType!(FuncSymbol)) || 20 __traits(isSame, ReturnType!(FuncSymbol), float) || 21 __traits(isSame, ReturnType!(FuncSymbol), void); 22 } 23 24 // TODO: Document 25 private bool isSupportedFunction(alias FuncSymbol)() 26 { 27 // Arity must be 0 28 return arity!(FuncSymbol) == 0 && isSupportedReturn!(FuncSymbol); 29 } 30 31 /** 32 * This wraps a provided function 33 * and `Future` toghether in a way 34 * such that one can submit this 35 * to a provider as a kind-of `Task` 36 * which when completed will wake up 37 * those waiting on the `Future` 38 */ 39 private class FutureTask : Task 40 { 41 /** 42 * Related future for this task 43 */ 44 private Future future; 45 46 /** 47 * Function to call 48 */ 49 private Value delegate() func; 50 51 /** 52 * Constructs a new `FutureTask` with the 53 * provided future and function to run 54 * 55 * Params: 56 * future = the `Future` to associate with 57 * func = the function to run 58 */ 59 this(Future future, Value delegate() func) 60 { 61 this.future = future; 62 this.func = func; 63 } 64 65 /** 66 * The wrapper task that will run and extract 67 * any values/errors and report these back 68 * to the associated `Future` 69 */ 70 public void run() 71 { 72 // Return value (in non-error case) 73 Value retVal; 74 75 // The exception (in error case) 76 Exception errVal; 77 78 try 79 { 80 // Set the future's state to running now 81 import guillotine.future : State; 82 future.state = State.RUNNING; 83 84 // Run the function and obtain its return value (on success) 85 retVal = func(); 86 } 87 catch(Exception e) 88 { 89 errVal = e; 90 } 91 92 if(errVal is null) 93 { 94 future.complete(Result(retVal)); 95 } 96 else 97 { 98 future.error(errVal); 99 } 100 } 101 } 102 103 /** 104 * Generates a `Value`-returning function with 105 * a 0 parameter arity based on an input symbol 106 * of an existing function which has 0 arity 107 * and return a supported type 108 * 109 * Params: 110 * FuncIn = The name of the function to wrap 111 */ 112 private template WorkerFunction(alias FuncIn) 113 { 114 /** 115 * Generated "wrapper" function 116 * 117 * Returns: the `Value` of the result 118 */ 119 Value workerFunc() 120 { 121 alias funcInReturn = ReturnType!(FuncIn); 122 123 Value value; 124 ValueUnion valUnion; 125 126 static if(__traits(isSame, funcInReturn, int)) 127 { 128 valUnion.integer = FuncIn(); 129 } 130 else static if(__traits(isSame, funcInReturn, uint)) 131 { 132 valUnion.uinteger = FuncIn(); 133 } 134 else static if(__traits(isSame, funcInReturn, float)) 135 { 136 valUnion.floating = FuncIn(); 137 } 138 else static if(__traits(isSame, funcInReturn, bool)) 139 { 140 valUnion.boolean = FuncIn(); 141 } 142 else static if(__traits(isSame, funcInReturn, string)) 143 { 144 valUnion.str = FuncIn(); 145 } 146 else static if(isAssignable!(Object, funcInReturn)) 147 { 148 valUnion.object = FuncIn(); 149 } 150 else static if(__traits(isSame, funcInReturn, void)) 151 { 152 // With no return type you just call it 153 FuncIn(); 154 155 // We create an emoty struct 156 valUnion.none = Empty(); 157 } 158 159 160 161 162 value.value = valUnion; 163 return value; 164 } 165 } 166 167 import guillotine.providers.sequential : Sequential; 168 169 /** 170 * The default `Provider` 171 */ 172 public alias DefaultProvider = Sequential; 173 174 /** 175 * Provides a task submission service upon 176 * which tasks can be submitted and a `Future` 177 * is returned as handle to that submitted task. 178 * 179 * The underlying scheduling/threading mechanism 180 * is provided in the form of a `Provider` 181 */ 182 public class Executor 183 { 184 /** 185 * Underlying thread provider 186 * whereby the tasks will be 187 * pushed into for running 188 */ 189 private Provider provider; 190 191 /** 192 * Constructs a new `Executor` with the given 193 * provider which provides us with some sort 194 * of underlying thread execution mechanism 195 * where the tasks will actually be executed 196 * 197 * Params: 198 * provider = the `Provider` to use 199 */ 200 this(Provider provider) 201 { 202 this.provider = provider; 203 } 204 205 /** 206 * Constructs a new `Executor` wuth the 207 * default provider 208 */ 209 this() 210 { 211 this(new DefaultProvider()); 212 } 213 214 /** 215 * Submits the provided function as a task 216 * and returns a handle to it in the form 217 * of a future 218 * 219 * Returns: the task's `Future` 220 */ 221 public Future submitTask(alias Symbol)() 222 if ((isFunction!(Symbol) || isDelegate!(Symbol)) && isSupportedFunction!(Symbol)) 223 { 224 FutureTask task; 225 226 Value delegate() ptr; 227 alias func = Symbol; 228 229 static if(isDelegate!(Symbol)) 230 { 231 ptr = &WorkerFunction!(func).workerFunc; 232 } 233 else 234 { 235 ptr = toDelegate(&WorkerFunction!(func).workerFunc); 236 } 237 238 version(unittest) 239 { 240 import std.stdio; 241 writeln("Generated worker function: ", ptr); 242 } 243 244 // Create the Future 245 Future future = new Future(); 246 247 // Create the Future task 248 task = new FutureTask(future, ptr); 249 250 // Submit the task 251 provider.consumeTask(task); 252 253 version(unittest) 254 { 255 writeln("Just submitted future task: ", future); 256 } 257 258 return future; 259 } 260 } 261 262 version(unittest) 263 { 264 import std.stdio : writeln; 265 import core.thread : Thread, dur; 266 } 267 268 /** 269 * Tests the submission of three tasks using 270 * the `Sequential` provider and with the first 271 * two tasks having values to return and the last 272 * having nothing 273 */ 274 unittest 275 { 276 import guillotine.providers.sequential; 277 import guillotine.executor; 278 import guillotine.future; 279 import guillotine.result; 280 import guillotine.provider; 281 282 Provider provider = new Sequential(); 283 284 // Start the provider so it can execute 285 // submitted tasks 286 provider.start(); 287 288 // Create an executor with this provider 289 Executor t = new Executor(provider); 290 291 292 // Submit a few tasks 293 Future fut1 = t.submitTask!(hi); 294 Future fut2 = t.submitTask!(hiFloat); 295 Future fut3 = t.submitTask!(hiVoid); 296 297 // Await on the first task 298 writeln("Fut1 waiting..."); 299 Result res1 = fut1.await(); 300 writeln("Fut1 done with: '", res1.getValue().value.integer, "'"); 301 assert(res1.getValue().value.integer == 69); 302 303 // Stops the internal task runner thread 304 provider.stop(); 305 } 306 307 version(unittest) 308 { 309 private int hi() 310 { 311 writeln("Let's go hi()!"); 312 313 // Pretend to do some work 314 Thread.sleep(dur!("seconds")(2)); 315 316 return 69; 317 } 318 319 private float hiFloat() 320 { 321 writeln("Let's go hiFloat()!"); 322 323 // Pretend to do some work 324 Thread.sleep(dur!("seconds")(10)); 325 326 return 69.420; 327 } 328 329 private void hiVoid() 330 { 331 writeln("Let's go hiVoid()!"); 332 333 // Pretend to do some work 334 Thread.sleep(dur!("seconds")(10)); 335 } 336 }