Consume from the Disruptor. Calls del with the slice of elements that have been produced but not yet consumed. The argument firstIndex is the index of the first element in the slice.
Generate a new consumer token.
Undocumented in source. Be warned that the author may not have intended to support it.
Iff there is more room in the Disruptor, calls del with a reference to the free slot. The second argument index is the index of the slot.
Returns the last slot the producer has written to or 0 if nothing has been produced yet.
The number of registered consumers.
counters[0] is the producer's counter,
Where the data is actually stored.
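The members above (a consumer count, a counters array whose first entry belongs to the producer, and a data buffer) can be pictured with a small single-threaded model. This is only a sketch under stated assumptions, not the module's implementation: `RingModel`, its `capacity` and `maxConsumers` parameters, `createConsumer`, and `slowestConsumer` are hypothetical names, consumers are identified by plain indices rather than a ConsumerToken, and there are no atomics or `shared` qualifiers.

```d
import std.algorithm.comparison : min;

/// Hypothetical single-threaded model of the layout described above.
struct RingModel(T, size_t capacity = 4, size_t maxConsumers = 4)
{
    T[capacity] buffer;               // where the data is stored
    ulong[maxConsumers + 1] counters; // counters[0]: producer; the rest: consumers (assumed)
    size_t consumerCount;             // number of registered consumers

    /// Registers a consumer and returns its index into `counters`.
    size_t createConsumer()
    {
        assert(consumerCount < maxConsumers, "too many consumers");
        return ++consumerCount;
    }

    /// Lowest consumer counter; the producer must not overwrite past it.
    ulong slowestConsumer() const
    {
        ulong lowest = counters[0];
        foreach (c; counters[1 .. consumerCount + 1])
            lowest = min(lowest, c);
        return lowest;
    }

    /// Calls `del` with a free slot if there is one; returns whether it did.
    bool produce(scope void delegate(ref T slot, ulong index) del)
    {
        if (counters[0] - slowestConsumer() >= capacity)
            return false; // buffer is full
        del(buffer[cast(size_t)(counters[0] % capacity)], counters[0]);
        ++counters[0];
        return true;
    }

    /// Calls `del` with unread elements; returns false if there were none.
    bool consume(size_t consumer, scope void delegate(T[] values, ulong firstIndex) del)
    {
        immutable first = counters[consumer];
        immutable available = counters[0] - first;
        if (available == 0)
            return false;
        immutable start = cast(size_t)(first % capacity);
        // Stop at the end of the buffer so the slice stays contiguous.
        immutable count = cast(size_t) min(available, capacity - start);
        del(buffer[start .. start + count], first);
        counters[consumer] += count;
        return true;
    }
}

unittest
{
    RingModel!int ring;
    immutable first = ring.createConsumer();
    immutable second = ring.createConsumer();

    int seen = -1;
    auto record = delegate(int[] values, ulong firstIndex) { seen = values[0]; };

    assert(!ring.consume(first, record)); // nothing produced yet
    assert(ring.produce(delegate(ref int v, ulong index) { v = 1; }));

    assert(ring.consume(first, record));
    assert(seen == 1);
    assert(!ring.consume(first, record)); // already caught up
    seen = 2;
    assert(ring.consume(second, record)); // the second consumer sees the element too
    assert(seen == 1);
}
```

Each consumer keeps its own counter, so in this model every consumer observes every produced element, and the producer is blocked only by the slowest consumer.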
    // Two independent consumers: each one sees every produced element.
    import std.functional : toDelegate;
    import std.range.primitives : empty;
    alias D = Disruptor!int;

    int testInt = ubyte.max;
    // Despite its name, this callback records the first consumed value.
    auto doNothing = (int[] values, ulong idx) {
        if (!values.empty)
            testInt = values[0];
    };

    shared D d;
    ConsumerToken consumer1 = d.createConsumerToken();
    ConsumerToken consumer2 = d.createConsumerToken();

    // Nothing has been produced yet, so there is nothing to consume.
    assert (!d.consume(consumer1, doNothing));
    assert (!d.consume(consumer2, doNothing));

    d.produce((ref int v, ulong _) {
        v = 1;
    }.toDelegate());

    assert (d.consume(consumer1, doNothing));
    assert (testInt == 1);
    assert (!d.consume(consumer1, doNothing));
    testInt = 2;
    // consumer2 still receives the element that consumer1 already read.
    assert (d.consume(consumer2, doNothing));
    assert (testInt == 1);
    assert (!d.consume(consumer2, doNothing));

    // Dependent consumers: consumer2 waits for consumer1.
    import std.functional : toDelegate;
    import std.range.primitives : empty;
    alias D = Disruptor!int;

    int testInt = ubyte.max;
    // Despite its name, this callback records the first consumed value.
    auto doNothing = (int[] values, ulong idx) {
        if (!values.empty)
            testInt = values[0];
    };

    shared D d;
    ConsumerToken consumer1 = d.createConsumerToken();
    ConsumerToken consumer2 = d.createConsumerToken();
    consumer2.waitFor(consumer1);

    assert (!d.consume(consumer1, doNothing));
    assert (!d.consume(consumer2, doNothing));

    d.produce((ref int v, ulong _) {
        v = 1;
    }.toDelegate());

    testInt = ubyte.max;
    // consumer2 cannot read the element until consumer1 has consumed it.
    assert (!d.consume(consumer2, doNothing));
    assert (testInt == ubyte.max);
    assert (d.consume(consumer1, doNothing));
    assert (testInt == 1);
    assert (!d.consume(consumer1, doNothing));
    assert (d.consume(consumer2, doNothing));
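
The second example suggests that a consumer which waits for another may only read up to the position the other consumer has already reached. One way to picture that rule is as a minimum over counters. The helper below is a hypothetical illustration (the name `readableUpTo` and its parameters are assumptions, not part of the module), and the module's real gating logic may differ.

```d
import std.algorithm.comparison : min;

/// Hypothetical helper: the position up to which a consumer may read,
/// given the producer's counter and the counters of the consumers it waits for.
ulong readableUpTo(ulong producerCounter, const ulong[] dependencyCounters)
{
    ulong limit = producerCounter;
    foreach (c; dependencyCounters)
        limit = min(limit, c);
    return limit;
}

unittest
{
    // One element has been produced; consumer1 has not read it yet.
    ulong producer = 1, consumer1 = 0, consumer2 = 0;

    // consumer2 waits for consumer1, so nothing is readable for it yet.
    assert(readableUpTo(producer, [consumer1]) == consumer2);

    // Once consumer1 has caught up, consumer2 may read the element.
    consumer1 = 1;
    assert(readableUpTo(producer, [consumer1]) == 1);

    // A consumer without dependencies is limited only by the producer.
    assert(readableUpTo(producer, []) == 1);
}
```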
A single-producer, multiple-consumer Disruptor implementation.