/**
An implementation of the [fowler].

A disruptor is a glorified ring buffer that allows multiple consumers to
read its contents. This implementation is only threadsafe for a single
producer. It differs from a normal single producer / multi consumer queue
in the following points:

$(LIST
    * Slots are not distributed between consumers. Every consumer can consume each slot.
    * Slow consumers can catch up by consuming multiple slots at once.
    * Consumers can coordinate between each other by forming a dependency graph. If
      consumer A declares to depend on consumer B, it will only be able to read
      a slot after B has finished consuming it.
)

To interact with the disruptor a producer just calls [Disruptor.produce],
while consumers need to acquire a [ConsumerToken] first. The token tracks
the current position of the consumer in the ring buffer and on which other
consumers it depends.

Link_References:
    fowler = [https://martinfowler.com/articles/lmax.html | Disruptor Pattern]
*/

module disruptor.disruptor;

import std.math : nextPow2;
import std.algorithm : map;
import std.range;

/**
ConsumerTokens are used by consumers to interact with the [Disruptor].

A consumer must receive exactly one token via a call to
`createConsumerToken` and provide it on every call
to `Disruptor.consume`.

Consumer tokens are also used to track dependencies between
different consumers, e.g. if A may only consume slots that
have already been consumed by B.

---
auto tokenA = disruptor.createConsumerToken();
auto tokenB = disruptor.createConsumerToken();
tokenA.waitFor(tokenB);
---
*/
struct ConsumerToken
{
    /// Counter indices of the consumers this one waits for; `ubyte.max` marks a free entry.
    ubyte[7] dependencies = [ ubyte.max, ubyte.max, ubyte.max,
                              ubyte.max, ubyte.max, ubyte.max, ubyte.max ];
    /// Index of this consumer's own counter inside the disruptor.
    ubyte ownSlot;

    this(ubyte slot) { ownSlot = slot; }

    /**
    Declares that this consumer may only read slots that `other` has
    already consumed.

    Registering the same consumer twice is a no-op. At most
    `dependencies.length` distinct consumers can be registered; exceeding
    that limit halts the program instead of writing through an empty range.
    */
    void waitFor(const ConsumerToken other)
    {
        // A consumer waiting for itself could never advance past its own counter.
        assert(other.ownSlot != ownSlot, "ConsumerToken: a consumer cannot wait for itself");

        foreach (ref dep; dependencies)
        {
            if (dep == other.ownSlot)
                return; // already registered
            if (dep == ubyte.max)
            {
                dep = other.ownSlot;
                return;
            }
        }
        assert(0, "ConsumerToken: all dependency entries are already in use");
    }
}

/**
A single-producer, multiple consumer disruptor implementation

$(LIST
    * T = The type of the slots
    * Size = size of the ring buffer in slots. The value is used as given
             (it is not rounded); the default is `nextPow2(10_000)`.
    * Consumers = number of [ConsumerToken] the disruptor can issue
)
*/
struct Disruptor(T, ulong Size=nextPow2(10_000), ulong Consumers=63)
{
    import core.atomic : atomicLoad, atomicStore, atomicOp, MemoryOrder, atomicFence;

    @disable this(this);
    @disable this(ref return scope inout Disruptor another);
private:
    // counters[0] is the producer's counter,
    // counters[1 .. Consumers+1] belong to the consumers.
    ulong[Consumers+1] counters;

    // the number of registered consumers
    shared ubyte consumerCount = 0;

    // where the data is actually stored
    T[Size] ringBuffer = void;

    /*
    Returns the last slot the producer has written to
    or 0 if nothing has been produced yet.

    The first slot the producer writes to is 1.
    */
    ulong producerCount() const shared
    {
        return counters[0].atomicLoad!(MemoryOrder.acq);
    }

    /*
    Returns the counter of the slowest consumer,
    or 0 if no consumer has been registered yet.
    */
    ulong minConsumerCount() const shared
    {
        import std.algorithm : map, minElement;

        immutable registered = consumerCount.atomicLoad!(MemoryOrder.acq);
        if (registered == 0)
            return 0;
        return counters[1 .. registered + 1]
            .map!((ref x) => x.atomicLoad!(MemoryOrder.acq))
            .minElement;
    }
public:
    /**
    Brings every slot of the ring buffer into a defined state (the buffer
    itself is declared `= void`).

    Slots are constructed from `0` when `T` supports that, otherwise they
    are default-constructed.
    */
    void initialize()
    {
        import core.lifetime : emplace;

        foreach (ref slot; ringBuffer[])
        {
            static if (__traits(compiles, emplace(&slot, 0)))
                emplace(&slot, 0);
            else
                emplace(&slot);
        }
    }

    /**
    Iff there is more room in the Disruptor, calls [#param-del|del]
    with a reference to the free slot. The second argument [#param-index|index]
    is the index of the slot.

    If `T` has a member named `reuse`, it is called on the slot before
    `del` is invoked.

    Returns:
        true, if the delegate was called
        false, otherwise (Disruptor is full)
    */
    bool produce(scope void delegate(ref T slot, ulong index) del) shared
    {
        immutable ulong nextSlot = producerCount() + 1;

        // The slot about to be written previously held element
        // `nextSlot - Size`; it may only be overwritten once the slowest
        // consumer has read that element.
        immutable ulong minConsumer = minConsumerCount();
        immutable bool full = minConsumer + Size < nextSlot;
        if (!full)
        {
            T[] frame = cast(T[])(ringBuffer);
            immutable size_t position = cast(size_t)(nextSlot % Size);
            static if (__traits(hasMember, T, "reuse"))
                frame[position].reuse();
            del(frame[position], nextSlot);
            // Publish the slot only after the delegate has filled it.
            counters[0].atomicStore!(MemoryOrder.rel)(nextSlot);
        }
        return !full;
    }

    /**
    Consume from the Disruptor. Calls del with a slice of produced but not
    yet consumed elements. The argument firstIndex is the index of the
    first element in slice.

    The slice is always contiguous in the ring buffer. If the readable
    elements wrap around the end of the buffer, only the part up to the
    end of the buffer is handed out; the remaining elements are delivered
    by the next call.

    Only calls del if there is something to consume (slice is never empty).

    Returns: true, if del was called, otherwise false.
    */
    bool consume(ConsumerToken token, scope void delegate(T[] slice, ulong firstIndex) del) shared
    {
        import std.algorithm : min;

        // Counter 0 belongs to the producer; a default-constructed token must not be used.
        assert(token.ownSlot != 0, "Disruptor.consume: token was not created by createConsumerToken");

        // The consumer may read up to the producer's position, but never
        // beyond any consumer it depends on.
        ulong limit = producerCount();
        foreach (dep; token.dependencies)
        {
            if (dep == ubyte.max)
                continue;
            immutable ulong depCounter = counters[dep].atomicLoad!(MemoryOrder.acq);
            if (depCounter < limit)
                limit = depCounter;
        }

        immutable ulong myCounter = counters[token.ownSlot].atomicLoad!(MemoryOrder.acq);
        assert(limit >= myCounter, "Disruptor.consume: consumer is ahead of its limit");

        immutable ulong available = limit - myCounter;
        if (available == 0)
            return false;

        immutable ulong nextToRead = myCounter + 1;
        immutable size_t start = cast(size_t)(nextToRead % Size);
        // Hand out only the contiguous run up to the end of the buffer.
        immutable size_t count = cast(size_t) min(available, Size - start);

        T[] frame = cast(T[]) ringBuffer[start .. start + count];
        del(frame, nextToRead);

        // Advance by exactly the number of delivered elements so that
        // elements located after a wrap-around are not skipped.
        counters[token.ownSlot].atomicStore!(MemoryOrder.rel)(myCounter + count);
        return true;
    }

    /**
    Generate a new consumer token.

    Each call claims the next consumer counter. The program is halted if
    more than `Consumers` tokens are requested, or if the token would
    collide with `ubyte.max`, which [ConsumerToken] uses to mark unused
    dependency entries.
    */
    ConsumerToken createConsumerToken() shared
    {
        ubyte token = atomicOp!("+=")(consumerCount, 1);
        if (token > Consumers || token == ubyte.max)
            assert(0, "Disruptor.createConsumerToken: no consumer tokens left");
        return ConsumerToken(token);
    }
}

/// Independent consumers each see every produced slot.
unittest {
    import std.functional : toDelegate;
    alias D = Disruptor!int;

    int testInt = ubyte.max;
    // Remembers the first value of the consumed slice.
    auto recordFirst = (int[] values, ulong idx) {
        if (!values.empty)
            testInt = values[0];
    };

    shared D d;
    ConsumerToken consumer1 = d.createConsumerToken();
    ConsumerToken consumer2 = d.createConsumerToken();

    assert (!d.consume(consumer1, recordFirst));
    assert (!d.consume(consumer2, recordFirst));

    d.produce((ref int v, ulong _) {
        v = 1;
    }.toDelegate());

    assert (d.consume(consumer1, recordFirst));
    assert (testInt == 1);
    assert (!d.consume(consumer1, recordFirst));
    testInt = 2;
    assert (d.consume(consumer2, recordFirst));
    assert (testInt == 1);
    assert (!d.consume(consumer2, recordFirst));
}

/// A dependent consumer only sees slots its dependency has consumed.
unittest
{
    import std.functional : toDelegate;
    alias D = Disruptor!int;

    int testInt = ubyte.max;
    // Remembers the first value of the consumed slice.
    auto recordFirst = (int[] values, ulong idx) {
        if (!values.empty)
            testInt = values[0];
    };

    shared D d;
    ConsumerToken consumer1 = d.createConsumerToken();
    ConsumerToken consumer2 = d.createConsumerToken();
    consumer2.waitFor(consumer1);

    assert (!d.consume(consumer1, recordFirst));
    assert (!d.consume(consumer2, recordFirst));

    d.produce((ref int v, ulong _) {
        v = 1;
    }.toDelegate());

    testInt = ubyte.max;
    assert (!d.consume(consumer2, recordFirst));
    assert (testInt == ubyte.max);
    assert (d.consume(consumer1, recordFirst));
    assert (testInt == 1);
    assert (!d.consume(consumer1, recordFirst));
    assert (d.consume(consumer2, recordFirst));
}

/// A full buffer and a wrap-around are consumed without losing slots.
unittest
{
    alias D = Disruptor!(int, 4, 1);

    shared D d;
    ConsumerToken consumer = d.createConsumerToken();

    int produced = 0;
    auto fill = (ref int v, ulong index) {
        v = cast(int) index;
        ++produced;
    };

    int[] seen;
    ulong[] firstIndices;
    auto record = (int[] values, ulong firstIndex) {
        assert (!values.empty);
        seen ~= values;
        firstIndices ~= firstIndex;
    };

    // Fill the buffer completely; the fifth produce must be rejected.
    foreach (_; 0 .. 4)
        assert (d.produce(fill));
    assert (!d.produce(fill));
    assert (produced == 4);

    // Slots 1..3 are contiguous, slot 4 lives at buffer position 0.
    assert (d.consume(consumer, record));
    assert (seen == [1, 2, 3]);
    assert (d.consume(consumer, record));
    assert (seen == [1, 2, 3, 4]);
    assert (firstIndices == [1UL, 4UL]);
    assert (!d.consume(consumer, record));

    // Room is available again once everything has been consumed.
    assert (d.produce(fill));
    assert (d.consume(consumer, record));
    assert (seen == [1, 2, 3, 4, 5]);
    assert (!d.consume(consumer, record));
}