1 /**
2 An implementation of the [fowler].
3 
4 A disruptor is glorified ring buffer that allows multiple consumers to
5 read its contents. This implementation is only threadsafe for a single
6 producer. It differs from a normal single producer / multi consumer queue
7 in the following points:
8 
9 $(LIST
10   * Slots are not distributed between consumers. Every consumer can consume each slot.
11   * Slow consumer can catch up by consuming multiple slots at once.
12   * Consumers can coordinate between each other by forming a dependency graph. If
13 consumer A declares to depend on consumer B, it will only be able to read
14 a slot after B has finished consuming it.
15 )
16 
17 To interact with the disruptor a producer just calls [Disruptor.produce],
18 while consumers need to aquire a [ConsumerToken] first. The token tracks
19 the current position of the consumer in the ringbuffer and on which other
20 consumers it depends.
21 
22 Link_References:
23     fowler = [https://martinfowler.com/articles/lmax.html | Disruptor Pattern]
24 */
25 
26 module disruptor.disruptor;
27 
28 import std.math : nextPow2;
29 import std.algorithm : map;
30 import std.range;
31 
32 /**
33 ConsumerToken are used by consumers to interact with the [Disruptor]
34 
35 
36 Consumer must receive exactly one token via a call to
37 `createConsumerToken` and provide it on every call
38 to Disruptor.consume.
39 
40 Consumer tokens are also used to track dependencies between
41 different consumers, e.g. if A may only consume slots that
42 have alreay been consumed by B.
43 
44 ---
45 auto tokenA = disruptor.createConsumerToken();
46 auto tokenB = disruptor.createConsumerToken();
47 tokenA.waitFor(B);
48 ---
49 */
50 struct ConsumerToken
51 {
52     ubyte[7] dependencies = [ ubyte.max, ubyte.max, ubyte.max,
53         ubyte.max, ubyte.max, ubyte.max, ubyte.max ];
54     ubyte ownSlot;
55 
56     this(ubyte slot) { ownSlot = slot; }
57 
58     void waitFor(const ConsumerToken other)
59     {
60         import std.algorithm.searching : find;
61         dependencies[].find(ubyte.max).front = other.ownSlot;
62     }
63 }
64 
65 /**
66 A single-producer, multiple consumer disruptor implementation
67 
68 $(LIST
69         * T = The type of the slots
70         * Size = size of the ring buffer in slots. Rounded up to the next
71            power of 2.
72         * Consumers = number of [ConsumerToken] the disruptor can issue
73 )
74 */
75 struct Disruptor(T, ulong Size=nextPow2(10_000), ulong Consumers=63)
76 {
77     import core.atomic : atomicLoad, atomicStore, atomicOp, MemoryOrder, atomicFence;
78 
79     @disable this(this);
80     @disable this(ref return scope inout Disruptor another);
81 private:
82     // counters[0] is the producer's counter,
83     ulong[Consumers+1] counters;
84 
85     // the number of registered consumers
86     shared ubyte consumerCount = 0;
87 
88     // where the data is actually stored
89     T[Size] ringBuffer = void;
90 
91     /*
92     Returns the last slot the producer has written to
93     or 0 if nothing has been produced yet.
94 
95     The first slot the producer writes to is 1.
96     */
97     ulong producerCount() const shared
98     {
99         return counters[0].atomicLoad!(MemoryOrder.acq);
100     }
101 
102     /*
103     Returns the counter of the slowest consumer
104     */
105     ulong minConsumerCount() const shared
106     {
107         import std.algorithm : map, minElement;
108         if (consumerCount == 0)
109             return 0;
110         return counters[1 .. consumerCount+1]
111             .map!((ref x) => x.atomicLoad!(MemoryOrder.acq))
112             .minElement;
113     }
114 public:
115     void initialize()
116     {
117         foreach(i; 0 .. Size)
118         {
119             import core.lifetime : emplace;
120             emplace(&ringBuffer[i], 0);
121         }
122     }
123 
124     /**
125     Iff there is more room in the Disruptor, calls [#param-del|del]
126     with a reference to the free slot. The second argument [#param-index|index]
127     is the index of the slot.
128 
129     Returns:
130         true, if the delegate was called
131         false, otherwise (Disruptor is full)
132     */
133     bool produce(scope void delegate(ref T slot, ulong index) del) shared
134     {
135         ulong nextSlot = counters[0] + 1;
136 
137         ulong minConsumer = minConsumerCount();
138         bool full = minConsumer + Size < nextSlot;
139         if (!full)
140         {
141             T[] frame = cast(T[])(ringBuffer);
142             static if (__traits(hasMember, T, "reuse"))
143                 frame[nextSlot % Size].reuse();
144             del(frame[nextSlot % Size], nextSlot);
145             counters[0].atomicStore!(MemoryOrder.rel)(nextSlot);
146         }
147         return !full;
148     }
149 
150     /**
151     Consume from the Disruptor. Calls del with the slice of produced but not
152     consumed elements. The argument firstIndex is the index of the
153     first element in slice.
154 
155     Only calls del if there is something to consume (slice is never empty).
156 
157     Returns: true, if del was called, otherwise false.
158     */
159     bool consume(ConsumerToken token, scope void delegate(T[] slice, ulong firstIndex) del) shared
160     {
161         import std.range : only, chain;
162         import std.algorithm : map, filter, minElement;
163 
164         ulong max = only(producerCount())
165             .chain(token.dependencies[].filter!(x => x != ubyte.max)
166                 .map!(x => counters[x]))
167             .minElement;
168         import std.stdio;
169         ulong myCounter = counters[token.ownSlot].atomicLoad!(MemoryOrder.acq);
170         assert (max >= myCounter);
171         ulong nextToRead = myCounter + 1;
172         if (nextToRead <= max)
173         {
174             const auto start = nextToRead % Size;
175             auto end = (max + 1) % Size;
176             T[] frame = cast(T[]) {
177                 if (start > end)
178                     return ringBuffer[start .. $];
179                 return ringBuffer[start .. end];
180             }();
181 
182             del(frame, nextToRead);
183             counters[token.ownSlot].atomicStore!(MemoryOrder.rel)(max);
184             return true;
185         }
186         return false;
187     }
188 
189     /// Generate a new consumer token.
190     ConsumerToken createConsumerToken() shared
191     {
192         ubyte token = atomicOp!("+=")(consumerCount, 1);
193         return ConsumerToken(token);
194     }
195 }
196 
197 ///
198 unittest {
199     import std.functional : toDelegate;
200     alias D = Disruptor!int;
201 
202     int testInt = ubyte.max;
203     auto doNothing = (int[] values, ulong idx) {
204         if (!values.empty)
205             testInt = values[0];
206     };
207 
208     shared D d;
209     ConsumerToken consumer1 = d.createConsumerToken();
210     ConsumerToken consumer2 = d.createConsumerToken();
211 
212     assert (!d.consume(consumer1, doNothing));
213     assert (!d.consume(consumer2, doNothing));
214 
215     d.produce((ref int v, ulong _) {
216         v = 1;
217     }.toDelegate());
218 
219     assert (d.consume(consumer1, doNothing));
220     assert (testInt == 1);
221     assert (!d.consume(consumer1, doNothing));
222     testInt = 2;
223     assert (d.consume(consumer2, doNothing));
224     assert (testInt == 1);
225     assert (!d.consume(consumer2, doNothing));
226 }
227 
228 ///
229 unittest
230 {
231     import std.functional : toDelegate;
232     alias D = Disruptor!int;
233 
234     int testInt = ubyte.max;
235     auto doNothing = (int[] values, ulong idx) {
236         if (!values.empty)
237             testInt = values[0];
238     };
239 
240     shared D d;
241     ConsumerToken consumer1 = d.createConsumerToken();
242     ConsumerToken consumer2 = d.createConsumerToken();
243     consumer2.waitFor(consumer1);
244 
245     assert (!d.consume(consumer1, doNothing));
246     assert (!d.consume(consumer2, doNothing));
247 
248     d.produce((ref int v, ulong _) {
249         v = 1;
250     }.toDelegate());
251 
252     testInt = ubyte.max;
253     assert (!d.consume(consumer2, doNothing));
254     assert (testInt == ubyte.max);
255     assert (d.consume(consumer1, doNothing));
256     assert (testInt == 1);
257     assert (!d.consume(consumer1, doNothing));
258     assert (d.consume(consumer2, doNothing));
259 }