Disruptor

A single-producer, multiple consumer disruptor implementation

Members

Functions

consume
bool consume(ConsumerToken token, void delegate(T[] slice, ulong firstIndex) del)

Consume from the Disruptor. Calls del with the slice of produced but not consumed elements. The argument firstIndex is the index of the first element in slice.

createConsumerToken
ConsumerToken createConsumerToken()

Generate a new consumer token.

minConsumerCount
ulong minConsumerCount()

Undocumented in source. Be warned that the author may not have intended to support it.

produce
bool produce(void delegate(ref T slot, ulong index) del)

Iff there is more room in the Disruptor, calls del with a reference to the free slot. The second argument index is the index of the slot.

producerCount
ulong producerCount()

Returns the last slot the producer has written to or 0 if nothing has been produced yet.

Variables

consumerCount
ubyte consumerCount;

the number of registered consumers

counters
ulong[Consumers + 1] counters;

counters[0] is the producers counter,

ringBuffer
T[Size] ringBuffer;

where the data is actually stored

Examples

1 import std.functional : toDelegate;
2 alias D = Disruptor!int;
3 
4 int testInt = ubyte.max;
5 auto doNothing = (int[] values, ulong idx) {
6     if (!values.empty)
7         testInt = values[0];
8 };
9 
10 shared D d;
11 ConsumerToken consumer1 = d.createConsumerToken();
12 ConsumerToken consumer2 = d.createConsumerToken();
13 
14 assert (!d.consume(consumer1, doNothing));
15 assert (!d.consume(consumer2, doNothing));
16 
17 d.produce((ref int v, ulong _) {
18     v = 1;
19 }.toDelegate());
20 
21 assert (d.consume(consumer1, doNothing));
22 assert (testInt == 1);
23 assert (!d.consume(consumer1, doNothing));
24 testInt = 2;
25 assert (d.consume(consumer2, doNothing));
26 assert (testInt == 1);
27 assert (!d.consume(consumer2, doNothing));
1 import std.functional : toDelegate;
2 alias D = Disruptor!int;
3 
4 int testInt = ubyte.max;
5 auto doNothing = (int[] values, ulong idx) {
6     if (!values.empty)
7         testInt = values[0];
8 };
9 
10 shared D d;
11 ConsumerToken consumer1 = d.createConsumerToken();
12 ConsumerToken consumer2 = d.createConsumerToken();
13 consumer2.waitFor(consumer1);
14 
15 assert (!d.consume(consumer1, doNothing));
16 assert (!d.consume(consumer2, doNothing));
17 
18 d.produce((ref int v, ulong _) {
19     v = 1;
20 }.toDelegate());
21 
22 testInt = ubyte.max;
23 assert (!d.consume(consumer2, doNothing));
24 assert (testInt == ubyte.max);
25 assert (d.consume(consumer1, doNothing));
26 assert (testInt == 1);
27 assert (!d.consume(consumer1, doNothing));
28 assert (d.consume(consumer2, doNothing));

Meta