1 module crybot.concurrency.disruptor; 2 3 import std.math : nextPow2; 4 import std.algorithm : map; 5 import std.range; 6 7 /// 8 struct ConsumerToken 9 { 10 ubyte[7] dependencies = [ ubyte.max, ubyte.max, ubyte.max, 11 ubyte.max, ubyte.max, ubyte.max, ubyte.max ]; 12 ubyte ownSlot; 13 14 this(ubyte slot) { ownSlot = slot; } 15 16 void waitFor(const ConsumerToken other) 17 { 18 import std.algorithm.searching : find; 19 dependencies[].find(ubyte.max).front = other.ownSlot; 20 } 21 } 22 23 /** 24 A single-producer, multiple consumer disruptor implementation 25 */ 26 struct Disruptor(T, ulong Size=nextPow2(10_000), ulong Consumers=63) 27 { 28 import core.atomic : atomicLoad, atomicStore, atomicOp, MemoryOrder; 29 30 /// counters[0] is the producers counter, 31 ulong[Consumers+1] counters; 32 33 /// the number of registered consumers 34 shared ubyte consumerCount = 0; 35 36 /// where the data is actually stored 37 T[Size] ringBuffer; 38 39 /** 40 Returns the last slot the producer has written to 41 or 0 if nothing has been produced yet. 42 43 The first slot the producer writes to is 1. 44 */ 45 ulong producerCount() const shared { return counters[0]; } 46 ulong minConsumerCount() const shared 47 { 48 import std.algorithm : map, minElement; 49 if (consumerCount == 0) 50 return 0; 51 return counters[1 .. consumerCount+1] 52 .map!((ref x) => x.atomicLoad!(MemoryOrder.acq)) 53 .minElement; 54 } 55 56 /** 57 Iff there is more room in the Disruptor, calls del 58 with a reference to the free slot. The second argument index 59 is the index of the slot. 60 61 Returns: 62 true, if the delegate was called 63 false, otherwise (Disruptor is full) 64 */ 65 bool produce(void delegate(ref T slot, ulong index) del) shared 66 { 67 import core.atomic : atomicFence, atomicStore; 68 ulong nextSlot = counters[0] + 1; 69 70 ulong minConsumer = minConsumerCount(); 71 bool full = minConsumer + Size < nextSlot; 72 if (!full) 73 { 74 T[] frame = cast(T[])(ringBuffer); 75 del(frame[nextSlot % Size], nextSlot); 76 counters[0].atomicStore!(MemoryOrder.rel)(nextSlot); 77 } 78 return !full; 79 } 80 81 /** 82 Consume from the Disruptor. Calls del with the slice of produced but not 83 consumed elements. The argument firstIndex is the index of the 84 first element in slice. 85 86 Only calls del if there is something to consume (slice is never empty). 87 88 Returns: true, if del was called, otherwise false. 89 */ 90 bool consume(ConsumerToken token, void delegate(T[] slice, ulong firstIndex) del) shared 91 { 92 import std.range : only, chain; 93 import std.algorithm : map, filter, minElement; 94 95 ulong max = only(producerCount()) 96 .chain(token.dependencies[].filter!(x => x != ubyte.max) 97 .map!(x => counters[x])) 98 .minElement; 99 ulong myCounter = counters[token.ownSlot].atomicLoad!(MemoryOrder.acq); 100 assert (max >= myCounter); 101 ulong nextToRead = myCounter + 1; 102 if (nextToRead <= max) 103 { 104 const auto start = nextToRead % Size; 105 auto end = (max + 1) % Size; 106 T[] frame = cast(T[]) { 107 if (start > end) 108 return ringBuffer[start .. $]; 109 return ringBuffer[start .. end]; 110 }(); 111 112 del(frame, nextToRead); 113 counters[token.ownSlot].atomicStore!(MemoryOrder.rel)(max); 114 return true; 115 } 116 return false; 117 } 118 119 /// Generate a new consumer token. 120 ConsumerToken createConsumerToken() shared 121 { 122 ubyte token = atomicOp!("+=")(consumerCount, 1); 123 return ConsumerToken(token); 124 } 125 } 126 127 /// 128 unittest { 129 import std.functional : toDelegate; 130 alias D = Disruptor!int; 131 132 int testInt = ubyte.max; 133 auto doNothing = (int[] values, ulong idx) { 134 if (!values.empty) 135 testInt = values[0]; 136 }; 137 138 shared D d; 139 ConsumerToken consumer1 = d.createConsumerToken(); 140 ConsumerToken consumer2 = d.createConsumerToken(); 141 142 assert (!d.consume(consumer1, doNothing)); 143 assert (!d.consume(consumer2, doNothing)); 144 145 d.produce((ref int v, ulong _) { 146 v = 1; 147 }.toDelegate()); 148 149 assert (d.consume(consumer1, doNothing)); 150 assert (testInt == 1); 151 assert (!d.consume(consumer1, doNothing)); 152 testInt = 2; 153 assert (d.consume(consumer2, doNothing)); 154 assert (testInt == 1); 155 assert (!d.consume(consumer2, doNothing)); 156 } 157 158 /// 159 unittest 160 { 161 import std.functional : toDelegate; 162 alias D = Disruptor!int; 163 164 int testInt = ubyte.max; 165 auto doNothing = (int[] values, ulong idx) { 166 if (!values.empty) 167 testInt = values[0]; 168 }; 169 170 shared D d; 171 ConsumerToken consumer1 = d.createConsumerToken(); 172 ConsumerToken consumer2 = d.createConsumerToken(); 173 consumer2.waitFor(consumer1); 174 175 assert (!d.consume(consumer1, doNothing)); 176 assert (!d.consume(consumer2, doNothing)); 177 178 d.produce((ref int v, ulong _) { 179 v = 1; 180 }.toDelegate()); 181 182 testInt = ubyte.max; 183 assert (!d.consume(consumer2, doNothing)); 184 assert (testInt == ubyte.max); 185 assert (d.consume(consumer1, doNothing)); 186 assert (testInt == 1); 187 assert (!d.consume(consumer1, doNothing)); 188 assert (d.consume(consumer2, doNothing)); 189 }