1 module crybot.concurrency.disruptor;
2 
3 import std.math : nextPow2;
4 import std.algorithm : map;
5 import std.range;
6 
/**
Identifies one consumer of a Disruptor and records which other consumers
it has to stay behind.
*/
struct ConsumerToken
{
    /// Slots of the consumers this consumer waits for.
    /// An entry equal to ubyte.max is unused.
    ubyte[7] dependencies = [ ubyte.max, ubyte.max, ubyte.max,
        ubyte.max, ubyte.max, ubyte.max, ubyte.max ];

    /// Slot assigned to this consumer.
    ubyte ownSlot;

    /// Creates a token for the consumer occupying `slot`, without dependencies.
    this(ubyte slot) { ownSlot = slot; }

    /**
    Registers `other` as a dependency of this consumer.

    Registering the same consumer more than once has no additional effect.

    Params:
        other = token of the consumer to wait for

    Throws: Exception if every entry of `dependencies` is already in use.
    */
    void waitFor(const ConsumerToken other)
    {
        import std.algorithm.searching : canFind, countUntil;
        import std.exception : enforce;

        // Already registered: do not burn a second entry on the same consumer.
        if (dependencies[].canFind(other.ownSlot))
            return;

        // countUntil yields -1 when no unused entry is left; taking .front of
        // an empty find() result here used to fail without a useful message.
        const freeEntry = dependencies[].countUntil(ubyte.max);
        enforce(freeEntry >= 0,
            "ConsumerToken: all dependency entries are already in use");
        dependencies[freeEntry] = other.ownSlot;
    }
}
22 
/**
A single-producer, multiple-consumer disruptor implementation.

Elements live in a fixed ring buffer of `Size` slots. The producer and every
consumer own a monotonically increasing counter holding the index of the last
element they produced or consumed; element `i` is stored at `i % Size`.

NOTE(review): a freshly created token starts with a counter of 0 regardless of
how far the producer has advanced, so tokens are presumably meant to be created
before production starts - confirm with callers.

Params:
    T         = element type
    Size      = number of slots in the ring buffer
    Consumers = maximum number of consumer tokens that can be created
*/
struct Disruptor(T, ulong Size=nextPow2(10_000), ulong Consumers=63)
{
    import core.atomic : atomicLoad, atomicStore, cas, MemoryOrder;

    static assert(Size > 0, "Disruptor needs at least one slot");
    // Consumer slots are stored in a ubyte and ubyte.max marks an unused
    // dependency entry in ConsumerToken, so slot numbers must stay below it.
    static assert(Consumers < ubyte.max,
        "Disruptor supports at most 254 consumers");

    /// counters[0] is the producer's counter,
    /// counters[1 .. $] are the consumers' counters, indexed by token slot.
    ulong[Consumers+1] counters;

    /// the number of registered consumers
    shared ubyte consumerCount = 0;

    /// where the data is actually stored
    T[Size] ringBuffer;

    /**
    Returns the last slot the producer has written to
    or 0 if nothing has been produced yet.

    The first slot the producer writes to is 1.
    */
    ulong producerCount() const shared
    {
        // Acquire pairs with the release store in produce(), so the element
        // written before the counter was published is visible to the caller.
        return counters[0].atomicLoad!(MemoryOrder.acq);
    }

    /**
    Returns the smallest counter among all registered consumers,
    or 0 if no consumer has been registered.
    */
    ulong minConsumerCount() const shared
    {
        const registered = consumerCount.atomicLoad!(MemoryOrder.acq);
        if (registered == 0)
            return 0;

        ulong lowest = ulong.max;
        foreach (ref counter; counters[1 .. registered + 1])
        {
            const value = counter.atomicLoad!(MemoryOrder.acq);
            if (value < lowest)
                lowest = value;
        }
        return lowest;
    }

    /**
    Iff there is more room in the Disruptor, calls del
    with a reference to the free slot. The second argument index
    is the index of the slot.

    Returns:
        true, if the delegate was called
        false, otherwise (Disruptor is full)
    */
    bool produce(void delegate(ref T slot, ulong index) del) shared
    {
        // Only the producer writes counters[0], so a relaxed load suffices.
        ulong nextSlot = counters[0].atomicLoad!(MemoryOrder.raw) + 1;

        // Writing nextSlot reuses the storage of element nextSlot - Size,
        // which is only safe once every consumer has moved past it.
        if (minConsumerCount() + Size < nextSlot)
            return false;

        T[] frame = cast(T[])(ringBuffer);
        del(frame[cast(size_t)(nextSlot % Size)], nextSlot);
        // Publish the element only after del has finished writing it.
        counters[0].atomicStore!(MemoryOrder.rel)(nextSlot);
        return true;
    }

    /**
    Consume from the Disruptor. Calls del with a slice of produced but not
    yet consumed elements. The argument firstIndex is the index of the
    first element in slice.

    The slice is contiguous in the ring buffer: when the unconsumed elements
    wrap around the end of the buffer, only the part up to the end is passed
    and the remainder is delivered by the next call.

    A consumer never advances past the producer or past any consumer
    registered in token.dependencies.

    Only calls del if there is something to consume (slice is never empty).

    Returns: true, if del was called, otherwise false.
    */
    bool consume(ConsumerToken token, void delegate(T[] slice, ulong firstIndex) del) shared
    {
        // Highest index this consumer may read: limited by the producer and
        // by every consumer it depends on.
        ulong limit = producerCount();
        foreach (dependency; token.dependencies)
        {
            if (dependency == ubyte.max)
                continue;
            const reached = counters[dependency].atomicLoad!(MemoryOrder.acq);
            if (reached < limit)
                limit = reached;
        }

        const ulong myCounter = counters[token.ownSlot].atomicLoad!(MemoryOrder.acq);
        // limit can be below myCounter when a dependency was registered after
        // this consumer already ran ahead of it; there is nothing to do then.
        if (limit <= myCounter)
            return false;

        const ulong firstIndex = myCounter + 1;
        const size_t start = cast(size_t)(firstIndex % Size);

        // Never hand out more than the contiguous run up to the physical end
        // of the buffer, and advance the counter by exactly what was handed
        // out so that wrapped-around elements are not skipped.
        const ulong available = limit - myCounter;
        const ulong untilEnd = Size - start;
        const size_t count = cast(size_t)(available < untilEnd ? available : untilEnd);

        T[] frame = cast(T[])(ringBuffer[start .. start + count]);
        del(frame, firstIndex);
        counters[token.ownSlot].atomicStore!(MemoryOrder.rel)(myCounter + count);
        return true;
    }

    /**
    Generate a new consumer token.

    Throws: Exception if `Consumers` tokens have already been created.
    */
    ConsumerToken createConsumerToken() shared
    {
        import std.exception : enforce;

        // Reserve the next slot with a compare-and-swap loop so that the
        // bound check and the increment happen as one step.
        ubyte registered;
        do
        {
            registered = consumerCount.atomicLoad!(MemoryOrder.acq);
            enforce(registered < Consumers,
                "Disruptor: maximum number of consumers reached");
        }
        while (!cas(&consumerCount, registered, cast(ubyte)(registered + 1)));

        return ConsumerToken(cast(ubyte)(registered + 1));
    }
}
126 
/// Two independent consumers each receive every produced element once.
unittest
{
    import std.functional : toDelegate;
    alias Ring = Disruptor!int;

    // Holds the first element of the most recently consumed batch.
    int lastSeen = ubyte.max;
    auto recordFirst = (int[] batch, ulong firstIndex) {
        if (batch.length != 0)
            lastSeen = batch[0];
    };

    shared Ring ring;
    ConsumerToken alpha = ring.createConsumerToken();
    ConsumerToken beta = ring.createConsumerToken();

    // Nothing has been produced yet, so neither consumer gets called.
    assert (!ring.consume(alpha, recordFirst));
    assert (!ring.consume(beta, recordFirst));

    auto writeOne = (ref int slot, ulong index) {
        slot = 1;
    }.toDelegate();
    ring.produce(writeOne);

    // The first consumer sees the element exactly once.
    assert (ring.consume(alpha, recordFirst));
    assert (lastSeen == 1);
    assert (!ring.consume(alpha, recordFirst));

    // The second consumer sees the very same element, also exactly once.
    lastSeen = 2;
    assert (ring.consume(beta, recordFirst));
    assert (lastSeen == 1);
    assert (!ring.consume(beta, recordFirst));
}
157 
/// A consumer that waits for another one only advances after it.
unittest
{
    import std.functional : toDelegate;
    alias Ring = Disruptor!int;

    // Holds the first element of the most recently consumed batch.
    int lastSeen = ubyte.max;
    auto recordFirst = (int[] batch, ulong firstIndex) {
        if (batch.length != 0)
            lastSeen = batch[0];
    };

    shared Ring ring;
    ConsumerToken leader = ring.createConsumerToken();
    ConsumerToken follower = ring.createConsumerToken();
    follower.waitFor(leader);

    // Nothing has been produced yet, so neither consumer gets called.
    assert (!ring.consume(leader, recordFirst));
    assert (!ring.consume(follower, recordFirst));

    auto writeOne = (ref int slot, ulong index) {
        slot = 1;
    }.toDelegate();
    ring.produce(writeOne);

    // The follower is held back until the leader has consumed the element.
    lastSeen = ubyte.max;
    assert (!ring.consume(follower, recordFirst));
    assert (lastSeen == ubyte.max);

    assert (ring.consume(leader, recordFirst));
    assert (lastSeen == 1);
    assert (!ring.consume(leader, recordFirst));

    // Now that the leader has moved on, the follower may consume as well.
    assert (ring.consume(follower, recordFirst));
}