1 module nbuff.buffer;
2 
3 import std.string;
4 import std.array;
5 import std.algorithm;
6 import std.conv;
7 import std.range;
8 import std.stdio;
9 import std.traits;
10 import std.format;
11 import core.exception;
12 import std.exception;
13 import std.range.primitives;
14 import std.experimental.logger;
15 
16 import core.memory: GC;
17 
18 private import std.experimental.allocator;
19 private import std.experimental.allocator.mallocator : Mallocator;
20 
21 ///
22 // network buffer
23 ///
24 
/// Preallocated exception whose message is "try to pop from empty Buffer".
/// Declared once as an immutable instance so it can be thrown without allocating.
static immutable Exception RangeEmpty = new Exception("try to pop from empty Buffer"); // @suppress(dscanner.style.phobos_naming_convention)
/// Preallocated exception whose message is "Index out of range".
static immutable Exception IndexOutOfRange = new Exception("Index out of range");
/// Preallocated exception whose message is "Buffer internal struct corrupted".
static immutable Exception BufferError = new Exception("Buffer internal struct corrupted");
31 
32 // goals
// 1 minimal data copy
34 // 2 Range interface
35 // 3 minimal footprint
36 
37 
/// A slice of immutable bytes.
public alias BufferChunk =       immutable(ubyte)[];
/// A slice whose elements are immutable `BufferChunk`s.
public alias BufferChunksArray = immutable(BufferChunk)[];
40 
debug(nbuff) @safe @nogc nothrow
{
    import std.experimental.logger;
    /// Trace hook that exists only in `-debug=nbuff` builds.
    ///
    /// The formatted-output statements inside are currently commented out, so a
    /// call produces no output; only the control-flow skeleton remains.
    ///
    /// Params:
    ///   f    = format string of the (disabled) trace message
    ///   args = format arguments
    ///   file = call-site file, defaults to `__FILE__`
    ///   line = call-site line, defaults to `__LINE__`
    package void safe_tracef(A...)(string f, scope A args, string file = __FILE__, int line = __LINE__) @safe @nogc nothrow
    {
        bool onOSX = false;
        bool onLDC = false;
        version(OSX)
        {
            onOSX = true;
        }
        version(LDC)
        {
            onLDC = true;
        }
        // The trace path is skipped only when building with LDC *and* for OSX.
        if (!(onOSX && onLDC))
        {
            // this can fail on pair ldc2/osx, see https://github.com/ldc-developers/ldc/issues/3240
            import core.thread;
            () @trusted @nogc nothrow
            {
                try
                {
                    //debug(nbuff)writefln("[%x] %s:%d " ~ f, Thread.getThis().id(), file, line, args);
                }
                catch(Exception e)
                {
                    // Reporting the failure must not throw either, hence the
                    // second guarded closure.
                    () @trusted @nogc nothrow
                    {
                        try
                        {
                            //debug(nbuff)errorf("[%x] %s:%d Exception: %s", Thread.getThis().id(), file, line, e);
                        }
                        catch
                        {
                        }
                    }();
                }
            }();
        }
    }
}
82 
debug(nbuff)
{
    /// printf-style debug hook, compiled only in `-debug=nbuff` builds.
    ///
    /// The actual `printf` call is commented out, so this currently does nothing
    /// with its arguments.
    void safe_printf(A...)(A a) @trusted
    {
        import core.stdc.stdio: printf;
        //printf(&a[0][0], a[1..$]);
    }
}
91 
/// Module-level pool instance; `MutableMemoryChunk` and `ImmutableMemoryChunk`
/// allocate from it and release to it.
package static MemPool _mempool;

/// Module constructor: gives every size class of `_mempool` a slot table of
/// 1024 entries and records that capacity.
static this()
{
    enum initialSlots = 1024;
    foreach (poolNo; 0 .. MemPool.IndexLimit)
    {
        _mempool._pools[poolNo] = Mallocator.instance.makeArray!(ubyte[])(initialSlots);
        _mempool._poolSize[poolNo] = initialSlots;
    }
}
/// Set by the module destructor; `MemPool.free` checks it and releases chunks
/// straight to the allocator instead of caching them once it is true.
private static bool in_exit;

/// Module destructor: marks shutdown, then releases every cached chunk and
/// every slot table held by `_mempool`.
static ~this()
{
    in_exit = true;
    foreach (poolNo; 0 .. MemPool.IndexLimit)
    {
        immutable cached = _mempool._mark[poolNo];
        foreach (slot; 0 .. cached)
        {
            Mallocator.instance.dispose(_mempool._pools[poolNo][slot]);
        }
        _mempool._mark[poolNo] = 0;
        Mallocator.instance.dispose(_mempool._pools[poolNo]);
    }
}
115 
/// Shorthand for the `Mallocator` singleton used by the types below.
alias allocator = Mallocator.instance;
/// Preallocated exception thrown by `MemPool.alloc` and `MemPool.free`.
static immutable Exception MemPoolException = new Exception("MemPoolException");
/// Preallocated exception thrown by `NbuffChunk.opIndex` and `Nbuff.append`.
static immutable Exception NbuffError = new Exception("NbuffError");
/// Size-class cache of malloc'ed byte chunks.
///
/// A request of `size` bytes (raised to `MinSize` when smaller) belongs to size
/// class `bsr(size)`. Chunks created for class `i` are `2<<i` bytes long, so any
/// chunk of a class can serve any request of that class. Each class keeps a
/// table of cached chunks (`_pools[i]`), the table capacity (`_poolSize[i]`) and
/// the number of chunks currently cached (`_mark[i]`).
///
/// `free` expects the table of the affected class to have been given a non-zero
/// capacity beforehand; the module constructor does this for `_mempool`.
struct MemPool
{
    // class MemPoolException: Exception
    // {
    //     this(string msg) @nogc @safe
    //     {
    //         super(msg);
    //     }
    // }

    import core.bitop;
    private
    {
        enum MinSize = 64;
        enum MaxSize = 64*1024;
        // A request of exactly MaxSize bytes is accepted and maps to class
        // bsr(MaxSize), so the tables need bsr(MaxSize) + 1 entries. Without the
        // "+ 1" such a request indexed one past the end of the arrays below.
        enum IndexLimit = bsr(MaxSize) + 1;
        enum MaxPoolWidth = 64*1024;
        alias ChunkInPool = ubyte[];
        alias Pool = ChunkInPool[];
        Pool[IndexLimit]    _pools;
        size_t[IndexLimit]  _poolSize;
        size_t[IndexLimit]  _mark;
    }

    /// Releases every cached chunk and every slot table back to the allocator.
    ~this() @safe @nogc
    {
        for(int i=0;i<MemPool.IndexLimit;i++)
        {
            () @trusted {
                for(int j=0; j < _mark[i]; j++)
                {
                    allocator.deallocate(_pools[i][j]);
                }
                _mark[i] = 0;
                allocator.deallocate(_pools[i]);
            }();
        }
    }

    /// Hands out a chunk able to hold at least `size` bytes.
    ///
    /// A cached chunk of the matching size class is reused when one exists;
    /// otherwise a new chunk of `2<<bsr(size)` bytes is allocated.
    ///
    /// Params:
    ///   size = requested byte count; values below `MinSize` are raised to `MinSize`
    /// Returns: a chunk from the size class of `size`
    /// Throws: `MemPoolException` when `size > MaxSize` or when the allocator
    ///         cannot provide a new chunk
    ChunkInPool alloc(size_t size) @nogc @safe
    {
        if ( size < MinSize )
        {
            size = MinSize;
        }
        if (size>MaxSize)
        {
            throw MemPoolException;
        }
        immutable i = bsr(size);
        immutable index = _mark[i];
        if (index == 0)
        {
            auto b = allocator.makeArray!(ubyte)(2<<i);
            if (b is null)
            {
                // Do not hand a null chunk to the caller: it would later be
                // "freed" into the pool and served again as a valid chunk.
                throw MemPoolException;
            }
            debug(nbuff) safe_tracef("allocated %d new bytes(because pool[%d] empty) at %x", size, i, &b[0]);
            return b;
        }
        auto b = _pools[i][index-1];
        _mark[i]--;
        debug(nbuff) safe_tracef("allocated chunk from pool %d size %d", i, size);
        return b;
    }

    /// Returns a chunk obtained from `alloc` to the pool.
    ///
    /// The chunk is cached in its size class while the slot table has room; a
    /// full table is doubled until its capacity reaches `MaxPoolWidth`, after
    /// which chunks are released to the allocator. Once the module destructor
    /// has set `in_exit`, chunks are always released to the allocator.
    ///
    /// Params:
    ///   c    = chunk to return
    ///   size = the size that was passed to `alloc` for this chunk (selects the size class)
    /// Throws: `MemPoolException` when `size > MaxSize` or when the slot table
    ///         cannot be expanded
    void free(ChunkInPool c, size_t size) @nogc @trusted
    {
        if ( size < MinSize )
        {
            size = MinSize;
        }
        if (size>MaxSize)
        {
            throw MemPoolException;
        }
        if (in_exit)
        {
            // mem pool can be destroyed already
            allocator.deallocate(c);
            return;
        }
        immutable pool_index = bsr(size);
        immutable index = _mark[pool_index];
        if (index<_poolSize[pool_index])
        {
            _pools[pool_index][index] = c;
            _mark[pool_index]++;
            debug(nbuff) safe_tracef("freed to pool[%d], next position %d, size(%d)", pool_index, _mark[pool_index], c.length);
        }
        else if (_poolSize[pool_index] < MaxPoolWidth)
        {
            // double size
            debug(nbuff) safe_tracef("double pool %d from %d (requested index=%d)", pool_index, _poolSize[pool_index], index);
            auto ok = allocator.expandArray!(ubyte[])(_pools[pool_index], _poolSize[pool_index]);
            if ( !ok )
            {
                throw MemPoolException;
            }
            _poolSize[pool_index] *= 2;
            _pools[pool_index][index] = c;
            _mark[pool_index]++;
            debug(nbuff) safe_tracef("pool %d doubled to %d", pool_index, _poolSize[pool_index]);
        }
        else
        {
            allocator.deallocate(c);
        }
    }
}
224 
225 
@("MemPool")
@safe @nogc unittest
{
    // Works on a private pool so the module-level one is left untouched.
    MemPool __mempool;
    //globalLogLevel = LogLevel.info;
    enum initialSlots = 1024;
    foreach (poolNo; 0 .. MemPool.IndexLimit)
    {
        __mempool._pools[poolNo] = Mallocator.instance.makeArray!(ubyte[])(initialSlots);
        __mempool._poolSize[poolNo] = initialSlots;
    }

    // Two identical passes: the first one populates the pools, the second one
    // is served from them.
    foreach (pass; 0 .. 2)
    {
        for (size_t size = 128; size <= 64*1024; size = size + size/2)
        {
            auto piece = __mempool.alloc(size);
            __mempool.free(piece, size);
        }
    }

    // A chunk handed out by the pool is writable.
    auto m = __mempool.alloc(128);
    copy("abcd".representation, m);
    __mempool.free(m, 128);

    // Bulk churn: one chunk for every size in 0 .. 64K, released and re-acquired.
    ubyte[][64*1024] cip;
    foreach (round; 0 .. 128)
    {
        foreach (half; 0 .. 2)
        {
            foreach (i; 0 .. cip.length)
            {
                cip[i] = __mempool.alloc(i);
            }
            foreach (i; 0 .. cip.length)
            {
                __mempool.free(cip[i], i);
            }
        }
    }
}
272 
/// Reference-counted pointer to a `T` stored in memory obtained from `allocator`.
///
/// Copying increments the counter kept next to the object; destroying or
/// reassigning a pointer decrements it, and the object is disposed when the
/// counter reaches zero. Member access is forwarded to the object through
/// `alias this`.
struct SmartPtr(T)
{
    /// Payload block: the managed object followed by its reference counter.
    private struct Impl
    {
        T       _object;
        int     _count;
        alias   _object this;
    }
    public
    {
        Impl*   _impl;
    }

    /// Allocates the payload, sets the counter to 1 and constructs the object
    /// in place from `args`.
    this(Args...)(auto ref Args args, string file = __FILE__, int line = __LINE__) @trusted
    {
        import std.functional: forward;
        //_impl = allocator.make!(Impl)(T(args),1);
        _impl = cast(typeof(_impl)) allocator.allocate(Impl.sizeof);
        _impl._count = 1;
        emplace(&_impl._object, forward!args);
        debug(nbuff) safe_tracef("emplaced", _count);
    }

    /// Postblit: a copy shares the payload, so the counter goes up by one.
    this(this)
    {
        if (_impl !is null)
        {
            inc();
        }
    }

    /// Text form showing the reference count and the object, or "none" for an
    /// empty pointer.
    string toString()
    {
        if (_impl is null)
        {
            return "(rc: %d, object: %s)".format(0, "none");
        }
        return "(rc: %d, object: %s)".format(_impl._count, _impl._object.to!string());
    }

    /// Drops the current reference (if any) and points at a freshly allocated,
    /// default-constructed object with a counter of 1.
    void construct() @trusted
    {
        rel(); // no-op for an empty pointer
        _impl = cast(typeof(_impl)) allocator.allocate(Impl.sizeof);
        _impl._count = 1;
        emplace(&_impl._object);
        // _impl = allocator.make!(Impl)(T.init,1);
    }

    /// Drops this reference; the last owner disposes the payload.
    ~this()
    {
        if (_impl !is null && dec() == 0)
        {
            () @trusted {dispose(allocator, _impl);}();
        }
    }

    /// Rebinds this pointer to the payload of `other`, releasing the previous
    /// one. Assigning a pointer that already shares the same payload is a no-op.
    void opAssign(ref typeof(this) other, string file = __FILE__, int line = __LINE__)
    {
        if (_impl != other._impl)
        {
            rel(); // no-op for an empty pointer
            _impl = other._impl;
            if (_impl !is null)
            {
                inc();
            }
        }
    }

    /// Increments the reference counter.
    private void inc() @safe @nogc
    {
        _impl._count++;
    }

    /// Decrements the reference counter and returns the new value.
    private auto dec() @safe @nogc
    {
        return --_impl._count;
    }

    /// Releases one reference; disposes the payload when it was the last one.
    /// Does nothing for an empty pointer.
    private void rel() @trusted
    {
        if (_impl !is null && dec() == 0)
        {
            dispose(allocator, _impl);
        }
    }
    alias _impl this;
}
358 
/// Convenience factory: builds a `SmartPtr!T` whose object is constructed from `args`.
auto smart_ptr(T, Args...)(Args args)
{
    return SmartPtr!(T)(args);
}
363 
@("smart_ptr")
@safe
@nogc
unittest
{
    struct S
    {
        int i;
        this(int v)
        {
            i = v;
        }
        void set(int v)
        {
            i = v;
        }
    }
    // safe_tracef is declared only inside a debug(nbuff) block, so the call has
    // to be guarded the same way for this test to compile in other builds.
    debug(nbuff) safe_tracef("test");

    // Construction yields a single owner and forwards member access.
    auto ptr0 = smart_ptr!S(1);
    assert(ptr0._impl._count == 1);
    assert(ptr0.i == 1);
    ptr0.set(2);
    assert(ptr0.i == 2);

    // construct() creates a fresh payload with one owner.
    SmartPtr!S ptr1;
    ptr1.construct();
    assert(ptr1._impl._count == 1);

    // Copy construction shares the payload.
    SmartPtr!S ptr2 = ptr0;
    assert(ptr0._impl._count == 2);

    // Assignment moves the reference over to the other payload.
    ptr2 = ptr1;
    assert(ptr2._impl._count == 2);
}
395 
/// Move-only owning pointer to a `T` stored in memory obtained from `allocator`.
///
/// Copying is disabled; ownership is handed over with `borrow`. The object is
/// disposed by the destructor or by an explicit `release`.
struct UniquePtr(T)
{
    /// Payload block holding the managed object.
    private struct Impl
    {
        T       _object;
        alias   _object this;
    }
    @disable this(this); // only move
    private
    {
        Impl*   _impl;
    }

    /// Builds a `T` from `args`, allocates the payload and swaps the new value
    /// into it.
    this(Args...)(auto ref Args args) @safe
    {
        auto built = T(args);
        _impl = allocator.make!(Impl)();
        swap(built, _impl._object);
    }

    /// Disposes the owned object, if there is one.
    ~this() @trusted @nogc
    {
        if (_impl is null)
        {
            return;
        }
        dispose(allocator, _impl);
        _impl = null;
    }

    /// Disposes the owned object and leaves the pointer empty; does nothing
    /// when it is already empty.
    private void rel() @trusted
    {
        if (_impl !is null)
        {
            dispose(allocator, _impl);
            _impl = null;
        }
    }

    /// Gives up the owned object right away.
    void release()
    {
        rel();
    }

    /// Takes over whatever `other` owns: the object currently owned here is
    /// disposed first, then the two raw pointers are swapped, which leaves
    /// `other` empty.
    void borrow(ref typeof(this) other)
    {
        rel(); // no-op for an empty pointer
        swap(_impl,other._impl);
    }

    /// True when nothing is owned.
    bool isNull() @safe @nogc nothrow
    {
        return _impl is null;
    }

    /// Forwards `ptr.name` to the member `name` of the owned object.
    /// `args` are not passed on to the member.
    auto opDispatch(string op, Args...)(Args args)
    {
        mixin("return _impl._object." ~ op ~ ";");
    }
    alias _impl this;
}
453 
/// Convenience factory: builds a `UniquePtr!T` whose object is constructed from `args`.
auto unique_ptr(T, Args...)(Args args)
{
    return UniquePtr!(T)(args);
}
458 
@("unique_ptr")
@safe
@nogc
unittest
{
    static struct S
    {
        int i;
    }

    // Direct construction forwards the argument to S.
    auto owner = UniquePtr!S(1);
    assert(owner.i == 1);

    // borrow() hands the object over to an initially empty pointer.
    UniquePtr!S taker;
    taker.borrow(owner);
    assert(taker.i == 1);

    // The factory behaves like the constructor; release() drops the object early.
    auto made = unique_ptr!S(2);
    assert(made.i == 2);
    made.release();
}
477 
/// Non-copyable owner of a writable byte buffer.
///
/// Buffers of up to `MemPool.MaxSize` bytes come from `_mempool`; larger ones
/// are allocated directly with `allocator`. The buffer goes back to where it
/// came from when the chunk is destroyed, unless `consume` has taken it out.
struct MutableMemoryChunk
{
    debug(nbuff) static int __id;
    private
    {
        ubyte[] _data;
        size_t  _size;
    }
    debug(nbuff) public
    {
        int     _id;
    }
    @disable this(this);

    /// Acquires a buffer for `s` bytes and records `s` as the chunk size.
    this(size_t s) @safe @nogc
    {
        if (s<=MemPool.MaxSize)
        {
            debug(nbuff) _id = __id++;
            debug(nbuff) tracef("alloc from pool %d\n", _id);
            _data = _mempool.alloc(s);
        }
        else
        {
            _data = allocator.makeArray!ubyte(s);
        }
        _size = s;
    }

    /// Returns the buffer to the pool (or to the allocator for large buffers).
    ~this() @safe @nogc
    {
        debug(nbuff) safe_tracef("destroy: %s, %s", _data, _size);
        // Ownership is decided by the buffer itself, not by the requested size:
        // a chunk created with size 0 still holds a pool buffer, which used to
        // leak because the check was `_size == 0`. A consumed or
        // default-initialised chunk has a null buffer and nothing to release.
        if ( _data is null )
        {
            return;
        }
        if ( _size <= MemPool.MaxSize)
        {
            _mempool.free(_data, _size);
        }
        else
        {
            () @trusted {dispose(allocator, _data);}();
        }
    }

    /// Hands the buffer out as immutable data and leaves this chunk empty, so
    /// the destructor no longer releases it.
    private immutable(ubyte[]) consume() @system @nogc
    {
        auto v = assumeUnique(_data);
        _data = null;
        _size = 0;
        return v;
    }

    /// The size this chunk was created with (0 after `consume`).
    auto size() pure inout @safe @nogc nothrow
    {
        return _size;
    }

    /// The underlying buffer.
    auto data() pure inout @system @nogc nothrow
    {
        return _data;
    }
    alias _data this;
}
540 
@("MutableMemoryChunk")
unittest
{
    import std.stdio;
    import std.array;

    // The same scenario is run twice; the second round can be served from the
    // chunk returned to the pool at the end of the first one.
    foreach (round; 0 .. 2)
    {
        auto chunk = MutableMemoryChunk(16);
        auto view = chunk.data();
        auto requested = chunk.size();

        // Writes through the slice land in the chunk's buffer.
        view[0] = 1;
        view[1..5] = [2, 3, 4, 5];
        assert(chunk.data[0] == 1);

        ubyte[128] payload = 2;
        view = view ~ payload; // you can append but this do not change anything for chunk
        assert(chunk.data[0] == 1 && chunk.size() == requested);
        //copy(payload.array, c.data); XXX check
        assert(chunk.data[0] == 1 && chunk.size() == requested);

        // consume() hands the buffer out; it is given back to the pool by hand.
        auto frozen = chunk.consume();
        assert(equal(frozen[0..5], [1,2,3,4,5]));
        _mempool.free(cast(ubyte[])frozen, requested);
    }

    auto mutmemptr = unique_ptr!MutableMemoryChunk(16);
    assert(mutmemptr.size==16);
}
581 
/// Non-copyable holder of immutable bytes.
///
/// Two kinds of content are distinguished by `_size`:
/// $(UL
///   $(LI `_size > 0`: a buffer taken over from a `MutableMemoryChunk`; the
///        destructor gives it back to `_mempool` or to `allocator`.)
///   $(LI `_size == 0`: bytes supplied by the caller (string / immutable slice);
///        the chunk does not release them, it only registers its `_data` field
///        with the GC as a range while it is alive.)
/// )
struct ImmutableMemoryChunk
{
    debug(nbuff) public
    {
        int     _id = -1;
    }
    private
    {
        immutable(ubyte[]) _data;
        immutable size_t   _size;
    }

    @disable this(this);

    /// Takes over the buffer of `c`, leaving `c` empty.
    this(ref MutableMemoryChunk c) @trusted @nogc
    {
        // trusted because
        // 1. Chunk have disabled copy constructor so we have single copy of memory under chunk
        // 2. user can't change data location
        _size = c.size;
        _data = assumeUnique(c.consume());
        debug(nbuff) _id = c._id;
    }

    /// Wraps the bytes of `s` without copying them.
    this(string s) @safe @nogc
    {
        // Register the range only when the destructor will also remove it (it
        // tests `_data !is null`); a null slice references no memory, and an
        // unmatched addRange would leave a stale range behind.
        if (s !is null)
        {
            () @trusted {
                GC.addRange(&this._data, _data.sizeof);
            }();
        }
        _data = s.representation();
        _size = 0;
    }

    /// Wraps `s` without copying it.
    this(immutable(ubyte)[] s) @safe @nogc
    {
        // See the string constructor for why a null slice is not registered.
        if (s !is null)
        {
            () @trusted {
                GC.addRange(&this._data, _data.sizeof);
            }();
        }
        _data = s;
        _size = 0;
    }

    /// Unregisters the GC range for caller-supplied data, or releases a buffer
    /// that was taken over from a `MutableMemoryChunk`.
    ~this() @trusted @nogc
    {
        // trusted because ...see constructor
        if ( _data !is null && _size == 0)
        {
            GC.removeRange(&this._data);
        }
        if ( _data is null || _size == 0 )
        {
            return;
        }
        if (_size <= MemPool.MaxSize)
        {
            debug(nbuff) safe_printf("return mem to pool %d\n", _id);
            _mempool.free(cast(ubyte[])_data, _size);
        }
        else
        {
            debug(nbuff) safe_printf("disposing large buffer\n");
            // Give the allocator the whole block rather than a pointer to its
            // first byte, so the released extent matches the allocation.
            allocator.deallocate(cast(ubyte[])_data);
        }
    }

    /// Bytes rendered as comma-separated two-digit hex values in brackets.
    string toString()
    {
        return "[%(%0.2x,%)]".format(_data);
    }

    /// Size recorded at construction: the mutable chunk's size, or 0 for
    /// caller-supplied data.
    auto size() pure inout @safe @nogc nothrow
    {
        return _size;
    }

    /// The wrapped bytes.
    auto data() pure inout @safe @nogc nothrow
    {
        return _data;
    }
    alias _data this;
}
657 
@("ImmutableMemoryChunk")
unittest
{
    import std.traits;

    // Fill a mutable chunk, then freeze it.
    auto mut = MutableMemoryChunk(16);
    mut.data[0..8] = [0, 1, 2, 3, 4, 5, 6, 7];
    auto frozen = ImmutableMemoryChunk(mut);
    assert(equal(frozen.data[0..8], [0, 1, 2, 3, 4, 5, 6, 7]));

    // Neither the field nor a slice taken from it can be written or appended to.
    assert(!__traits(compiles, {frozen._data[0] = 2;}));
    assert(!__traits(compiles, {frozen._data ~= "123".representation;}));

    auto view = frozen.data;
    assert(!__traits(compiles, {view[0] = 2;}));
    assert(!__traits(compiles, {view ~= "123".representation;}));

    // A fresh mutable chunk can be frozen directly into a SmartPtr.
    mut = MutableMemoryChunk(16);
    auto imc = SmartPtr!ImmutableMemoryChunk(mut);
}
676 
/// A view `[_beg, _end)` onto the bytes of a reference-counted
/// `ImmutableMemoryChunk`.
///
/// Copies made with `save`, slicing or assignment share the same memory through
/// the `SmartPtr`; only the view bounds differ. The range primitives
/// (`front`, `popFront`, `back`, `popBack`, …) move the bounds and never touch
/// the bytes.
struct NbuffChunk
{

    private
    {
        size_t                          _beg, _end;
        SmartPtr!(ImmutableMemoryChunk) _memory;
    }
    // ~this() @trusted @nogc
    // {
    //     if (_memory)
    //     {
    //         debug(nbuff) printf("on ~NbuffChunk: %d\n", _memory._count);
    //         debug(nbuff) printf("on ~NbuffChunk: %s\n", toStringz(dump()));
    //     }
    // }

    /// Freezes the mutable chunk owned by `c` and views its first `l` bytes.
    /// `c` is released afterwards.
    this(ref UniquePtr!MutableMemoryChunk c, size_t l) @safe @nogc
    {
        _memory = SmartPtr!ImmutableMemoryChunk(c._impl._object);
        _end = l;
        c.release;
    }

    /// Views the whole of `s`.
    this(string s) @safe @nogc
    {
        _memory = SmartPtr!ImmutableMemoryChunk(s);
        _end = s.length;
    }

    /// Views the whole of `s`.
    this(immutable(ubyte)[] s) @safe @nogc
    {
        _memory = SmartPtr!ImmutableMemoryChunk(s);
        _end = s.length;
    }

    /// Hex/ASCII dump of the bytes `[0, _end)` of the underlying memory,
    /// 16 bytes per row, with markers at the `_beg` and `_end` positions.
    string dump() inout @safe
    {
        import std.range: chunks;
        import std.ascii;

        int p;
        string res = "▌%-72.72s▐\n".format("_beg=%d _end=%d _size=%d".format(_beg, _end,_memory._impl._size));
        foreach(c; chunks(_memory._impl._object[0.._end], 16))
        {
            res ~= "▌%5.5d ".format(p);
            foreach(s; c)
            {
                if (p == _beg)
                {
                    res ~= "▛%02.2x".format(s);
                }
                else
                {
                    res ~= " %02.2x".format(s);
                }
                p++;
            }
            if (p == _end)
            {
                res ~= "▟";
            }
            if (c.length < 16)
            {
                // pad the hex column of a short last row
                res ~= repeat("◦◦", (16 - c.length)).join(" ");
            }
            if ( p == _end && p % 16 == 0 )
            {
                res ~= " ";
            }
            else
            {
                res ~= "  ";
            }
            foreach(s; c)
            {
                if ( isPrintable(s) )
                {
                    res ~= "%c".format(cast(char)s);
                }
                else
                {
                    if (s == 13)
                    {
                        res ~= "⇦";
                    }
                    else if ( s == 10 )
                    {
                        res ~= "⇓";
                    }
                    else
                    {
                        res ~= ".";
                    }
                }
            }
            if (c.length < 16)
            {
                // pad the text column of a short last row
                res ~= repeat("◦", (16 - c.length)).join();
            }
            res ~= "▐\n";
        }
        return res;
    }

    /// A copy of the viewed bytes as a string; `null` when no memory is attached.
    string toString() inout @trusted
    {
        if ( _memory._impl is null)
        {
            return null;
        }
        return cast(string)data().idup;
    }

    /// Lower-cased copy of `toString()`.
    string toLower() @safe
    {
        // trusted as data do not leave scope
        return this.toString().toLower;
    }

    /// Upper-cased copy of `toString()`.
    string toUpper() @safe
    {
        // trusted as data do not leave scope
        return this.toString.toUpper();
    }

    /// The `_size` recorded in the underlying `ImmutableMemoryChunk`.
    public auto size() pure inout nothrow @safe @nogc
    {
        return _memory._size;
    }

    /// Number of bytes in the view.
    public auto length() @safe @nogc inout
    {
        return _end - _beg;
    }

    /// The viewed bytes as a slice of the underlying memory (no copy).
    public auto data() inout @system @nogc
    {
        return _memory._impl._object[_beg.._end];
    }

    /// Makes this chunk view the same memory and bounds as `other`.
    void opAssign(T)(auto ref T other, string file = __FILE__, int line = __LINE__) @safe @nogc
    {
        if (this is other)
        {
            return;
        }
        _memory = other._memory;
        _beg = other._beg;
        _end = other._end;
    }

    /// Byte at position `index` of the view.
    /// Throws: `NbuffError` when `index` is not inside the view.
    auto opIndex(size_t index)
    {
        if (index >= _end - _beg)
        {
            throw NbuffError;
        }
        return _memory._impl._object[_beg + index];
    }

    /// `chunk[]`: same as `save()`.
    auto opIndex()
    {
        return save();
    }

    /// `chunk[start .. end]`: a chunk sharing the memory and viewing the given
    /// sub-range, with positions relative to the current view.
    NbuffChunk opSlice(size_t start, size_t end) @safe @nogc
    {
        // start > end would make the unsigned arithmetic below wrap around.
        assert(start<=end && start<=length && end<=length);
        auto res = save();
        res._beg += start;
        res._end = res._beg + end - start;
        return res;
    }

    /// Compares the viewed bytes with `b`.
    auto opEquals(const(ubyte)[] b)
    {
        return b == _memory._object[_beg.._end];
    }

    /// `$` inside an index or slice expression: the view length.
    auto opDollar()
    {
        return _end - _beg;
    }

    /// True when the view contains no bytes.
    bool empty() pure @nogc @safe
    {
        return _beg == _end;
    }

    /// First byte of the view.
    auto front() @safe @nogc
    {
        return _memory._impl._object[_beg];
    }

    /// Last byte of the view.
    auto back() @safe @nogc
    {
        return _memory._impl._object[_end - 1];
    }

    /// Drops the first byte of the view.
    void popFront() @safe @nogc
    {
        assert(_beg < _end);
        _beg++;
    }

    /// Drops the first `n` bytes of the view.
    void popFrontN(size_t n) @safe @nogc
    {
        debug(nbuff) safe_tracef("popn %d of %d", n, length);
        assert(n <= length);
        _beg += n;
    }

    /// Drops the last byte of the view.
    void popBack() @safe @nogc
    {
        assert(_beg < _end);
        _end--;
    }

    /// Drops the last `n` bytes of the view.
    void popBackN(size_t n) @safe @nogc
    {
        assert(n <= length);
        // Shrink by the requested amount; subtracting `length` here emptied the
        // view no matter what `n` was.
        _end -= n;
    }

    /// A new chunk with the same bounds, sharing the same memory.
    NbuffChunk save() @safe @nogc
    {
        NbuffChunk c;
        c._beg = _beg;
        c._end = _end;
        c._memory = _memory;
        return c;
    }
}
888 
889 // class NbuffError: Exception
890 // {
891 //     this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
892 //     {
893 //         super(msg, file, line);
894 //     }
895 // }
/// Move-only owner of a `MutableMemoryChunk`; this is the type `Nbuff.get` returns.
alias MutableNbuffChunk = UniquePtr!MutableMemoryChunk;
897 /// Smart buffer.
898 /// Usage scenario:
899 /// You are reading bulk newline delimited lines (or any other way structured data) from TCP socket and process it as soon as possible.
900 /// Every time you received something like:
901 ///
902 /// 'line1\nline2\nli' <- note incomplete last line.
903 ///
/// from network to your socket buffer you can process 'line1' and 'line2' and then you have to keep the whole buffer (or copy
/// and save its incomplete part 'li') just because you have some incomplete data.
906 ///
907 /// This leads to unnecessary allocations and data movement (if you choose to free old buffer and save incomplete part)
908 /// or memory wasting (if you choose to preallocate very large buffer and keep processed and incomplete data).
909 ///
/// Nbuff solves this problem by using memory pool and smart pointers - it takes memory chunks from pool for reading
/// from network(file, etc...), and automatically returns buffer to pool as soon as you processed all data in it and moved
912 /// 'processed' pointer forward.
913 /// 
914 /// So Nbuff looks like some "window" on the list of buffers, filled with network data, and as soon as buffer moves out of this
/// window and dereferenced it will be automatically returned to memory pool. Please note - there are no GC allocations, everything
/// is done using malloc.
917 ///
918 /// Here is sample of buffer lifecycle (see code or docs for exact function signatures):
919 /// Nbuff nbuff; - initialize nbuff structure.
920 /// buffer = Nbuff.get(bsize) - gives you non-copyable mutable buffer of size >= bsize
921 /// socket.read(buffer.data) - fill buffer with network data.
922 /// nbuff.append(buffer) - convert mutable non-copyable buffer to immutable shareable buffer and append it to nbuff
/// valuable_data = nbuff.data(0, 100); - get immutable view to first 100 bytes of nbuff (they can be non-contiguous)
/// nbuff.pop(100) - release first 100 bytes of nbuff, marking them as "processed".
925 ///     If there are any previously appended buffers which become unreferenced at this point, then they will be
926 ///     returned to the pool.
927 /// When nbuff goes out of scope all its buffers will be returned to pool.
928 ///
929 struct Nbuff
930 {
931     private
932     {
933         enum ChunksPerPage = 8;
934         struct Page
935         {
936             NbuffChunk[ChunksPerPage]   _chunks;
937             Page*                       _next;
938         }
939         size_t  _length;
940         size_t  _endChunkIndex;
941         size_t  _begChunkIndex;
942         Page    _pages;
943     }
944 
945     invariant
946     {
947         assert(_endChunkIndex == _begChunkIndex || _length > 0, "%d, %d, length = %s".format(_begChunkIndex, _endChunkIndex, _length));
948     }
949 
950     ///
951     /// copy references to RC-data
952     ///
953     this(this) @nogc @safe
954     {
955         // copy only allocated pages
956         Page* new_pages, last_new_page;
957         auto p = _pages._next;
958         while(p)
959         {
960             auto new_page = () @trusted {return allocator.make!Page();}();
961             new_page._chunks = p._chunks;
962             if ( new_pages is null)
963             {
964                 new_pages = new_page;
965                 last_new_page = new_pages;
966             }
967             else
968             {
969                 last_new_page._next = new_page;
970                 last_new_page = new_page;
971             }
972             p = p._next;
973         }
974         _pages._next = new_pages;
975     }
976 
977     this(string s) @nogc @safe
978     {
979         append(s);
980     }
981     this(immutable(ubyte)[] s) @nogc @safe
982     {
983         auto c = NbuffChunk(s);
984         append(c);
985     }
986     ///
987     /// copy references to RC-data
988     ///
989     void opAssign(T)(auto ref T other) @safe @nogc
990     {
991         if ( other is this )
992         {
993             return;
994         }
995         // release everything `this` holds
996         auto p = _pages._next;
997         while(p)
998         {
999             auto next = p._next;
1000             () @trusted {dispose(allocator, p);}();
1001             p = next;
1002         }
1003         // copy info from other to this
1004         _length = other._length;
1005         _begChunkIndex = other._begChunkIndex;
1006         _endChunkIndex = other._endChunkIndex;
1007         _pages._chunks = other._pages._chunks;
1008         _pages._next = null;
1009         if (  empty || ( _begChunkIndex < ChunksPerPage && _endChunkIndex < ChunksPerPage))
1010         {
1011             return;
1012         }
1013         // copy only allocated pages
1014         Page* new_pages, last_new_page;
1015         p = other._pages._next;
1016         while(p)
1017         {
1018             auto new_page = () @trusted {return allocator.make!Page();}();
1019             //assert(!new_page);
1020             new_page._chunks = p._chunks;
1021             if ( new_pages is null)
1022             {
1023                 new_pages = new_page;
1024                 last_new_page = new_pages;
1025             }
1026             else
1027             {
1028                 last_new_page._next = new_page;
1029                 last_new_page = new_page;
1030             }
1031             p = p._next;
1032         }
1033         _pages._next = new_pages;
1034     }
1035 
1036     ///
1037     /// dispose all allocated pages and dec refs for any data
1038     ///
1039     ~this() @safe @nogc
1040     {
1041         auto p = _pages._next;
1042         while(p)
1043         {
1044             auto next = p._next;
1045             () @trusted {dispose(allocator, p);}();
1046             p = next;
1047         }
1048     }
1049 
1050     void clear() @nogc @safe
1051     {
1052         _length = 0;
1053         _begChunkIndex = _endChunkIndex = 0;
1054         auto p = _pages._next;
1055         while(p)
1056         {
1057             auto next = p._next;
1058             () @trusted {dispose(allocator, p);}();
1059             p = next;
1060         }
1061         _pages = Page();
1062     }
1063 
1064     string toString() @safe
1065     {
1066         return this.data.toString();
1067     }
1068     string dump() @safe
1069     {
1070         string[] res;
1071         res ~= "▛%s▜".format(repeat("▀",72).join());
1072         res ~= "▌%-72.72s▐".format("_length   = %d".format(_length));
1073         res ~= "▌%-72.72s▐".format("_begIndex = %d".format(_begChunkIndex));
1074         res ~= "▌%-72.72s▐".format("_endIndex = %d".format(_endChunkIndex));
1075         auto p = chunkIndexToPage(_begChunkIndex);
1076         auto chunkIndex = _begChunkIndex % ChunksPerPage;
1077         auto i = _begChunkIndex;
1078         while(i<_endChunkIndex)
1079         {
1080             res ~= "▌%-72.72s▐".format("chunk %d ".format(i));
1081             res ~= p._chunks[chunkIndex].dump()[0..$-1];
1082             i++;
1083             chunkIndex++;
1084             if (chunkIndex>=ChunksPerPage)
1085             {
1086                 p = p._next;
1087                 res ~= "▌%-72.72s▐".format("Page %x".format(p));
1088                 chunkIndex = 0;
1089             }
1090         }
1091         res ~= "▙%s▟".format(repeat("▄",72).join());
1092         return res.join("\n");
1093     }
1094 
    /// Construct a `MutableNbuffChunk` for the requested `size` and return it.
    ///
    /// Params:
    ///   size = value forwarded to the `MutableNbuffChunk` constructor
    ///
    /// NOTE(review): the inline comment says the memory comes from a pool; that
    /// is decided inside `MutableNbuffChunk`, which is not visible here.
    static auto get(size_t size) @safe @nogc
    {
        // take memory from pool
        return MutableNbuffChunk(size);
    }
1100 
1101     bool empty() pure inout nothrow @safe @nogc
1102     {
1103         return _endChunkIndex == _begChunkIndex;
1104     }
1105 
1106     auto length() pure nothrow @nogc @safe inout
1107     {
1108         return _length;
1109     }
1110 
1111     void append(string s) @safe @nogc
1112     {
1113         debug(nbuff) safe_tracef("append NbuffChunk");
1114         if (s.length==0)
1115         {
1116             throw NbuffError;
1117         }
1118         _length += s.length;
1119         if ( _endChunkIndex < ChunksPerPage)
1120         {
1121             _pages._chunks[_endChunkIndex++] = NbuffChunk(s);
1122             return;
1123         }
1124 
1125         Page* last_page = &_pages;
1126         auto pi = _endChunkIndex - ChunksPerPage;
1127         debug(nbuff) safe_tracef("pi: %d", pi);
1128         debug(nbuff) safe_tracef("last_page.next = %x", last_page._next);
1129         while(pi >= ChunksPerPage)
1130         {
1131             last_page = last_page._next;
1132             pi -= ChunksPerPage;
1133         }
1134         assert(0 <= pi && pi < ChunksPerPage );
1135         debug(nbuff) safe_tracef("last_page.next = %x, pi=%d", last_page._next, pi);
1136         if (last_page._next is null)
1137         {
1138             // have to create new page
1139             debug(nbuff) safe_tracef("create new page");
1140             last_page._next = allocator.make!Page();
1141         }
1142         last_page._next._chunks[pi] = NbuffChunk(s);
1143         _endChunkIndex++;
1144     }
1145 
1146     void append(ref UniquePtr!(MutableMemoryChunk) c, size_t l) @safe @nogc
1147     {
1148         debug(nbuff) safe_tracef("append NbuffChunk");
1149         if (l==0)
1150         {
1151             throw NbuffError;
1152         }
1153         _length += l;
1154         if ( _endChunkIndex < ChunksPerPage)
1155         {
1156             // just convert it to immutable chunk and store it
1157             _pages._chunks[_endChunkIndex++] = NbuffChunk(c,l);
1158             return;
1159         }
1160         // go to last page and store chunk there
1161         Page* last_page = &_pages;
1162         auto pi = _endChunkIndex - ChunksPerPage;
1163         debug(nbuff) safe_tracef("pi: %d", pi);
1164         debug(nbuff) safe_tracef("last_page.next = %x", last_page._next);
1165         while(pi >= ChunksPerPage)
1166         {
1167             last_page = last_page._next;
1168             pi -= ChunksPerPage;
1169         }
1170         assert(0 <= pi && pi < ChunksPerPage );
1171         debug(nbuff) safe_tracef("last_page.next = %x, pi=%d", last_page._next, pi);
1172         if (last_page._next is null)
1173         {
1174             // have to create new page
1175             debug(nbuff) safe_tracef("create new page");
1176             last_page._next = allocator.make!Page();
1177         }
1178         last_page._next._chunks[pi] = NbuffChunk(c,l);
1179         _endChunkIndex++;
1180     }
1181 
1182     void append(ref NbuffChunk source) @safe @nogc
1183     {
1184         append(source, 0, source.length);
1185     }
1186     /// append some part of other nbuff
1187     void append(ref NbuffChunk source, size_t pos, size_t len) @safe @nogc
1188     {
1189         if (len==0)
1190         {
1191             throw NbuffError;
1192         }
1193         _length += len;
1194         if ( _endChunkIndex < ChunksPerPage)
1195         {
1196             _pages._chunks[_endChunkIndex]._memory = source._memory;
1197             _pages._chunks[_endChunkIndex]._beg = source._beg + pos;
1198             _pages._chunks[_endChunkIndex]._end = source._beg + pos + len;
1199             _endChunkIndex++;
1200             return;
1201         }
1202         Page* last_page = &_pages;
1203         auto pi = _endChunkIndex - ChunksPerPage;
1204         debug(nbuff) safe_tracef("pi: %d", pi);
1205         debug(nbuff) safe_tracef("last_page.next = %x", last_page._next);
1206         while(pi >= ChunksPerPage)
1207         {
1208             last_page = last_page._next;
1209             pi -= ChunksPerPage;
1210         }
1211         assert(0 <= pi && pi < ChunksPerPage );
1212         debug(nbuff) safe_tracef("last_page.next = %x, pi=%d", last_page._next, pi);
1213         if (last_page._next is null)
1214         {
1215             // have to create new page
1216             debug(nbuff) safe_tracef("create new page");
1217             last_page._next = allocator.make!Page();
1218         }
1219         last_page._next._chunks[pi] = source;
1220         last_page._next._chunks[pi]._beg += pos;
1221         last_page._next._chunks[pi]._end = last_page._next._chunks[pi]._beg+len;
1222         _endChunkIndex++;
1223     }
1224     /// return first stored chunk
1225     auto frontChunk() @safe @nogc
1226     {
1227         if ( empty )
1228         {
1229             assert(0, "You are looking front chunk in empty Nbuff");
1230         }
1231         Page *p = chunkIndexToPage(_begChunkIndex);
1232         int   i = _begChunkIndex % ChunksPerPage;
1233         return p._chunks[i];
1234     }
1235     /// throw away first chunk
1236     void popChunk() @safe @nogc
1237     {
1238         if ( empty )
1239         {
1240             assert(0, "You are trying to pop chunk from empty Nbuff");
1241         }
1242         if ( _begChunkIndex < ChunksPerPage)
1243         {
1244             _length -= _pages._chunks[_begChunkIndex].length;
1245             _pages._chunks[_begChunkIndex++] = NbuffChunk();
1246             return;
1247         }
1248 
1249         Page* last_page = &_pages;
1250         debug(nbuff) safe_tracef("popping chunk %d", _begChunkIndex);
1251         auto pi = _begChunkIndex - ChunksPerPage;
1252         while(pi >= ChunksPerPage)
1253         {
1254             last_page = last_page._next;
1255             pi -= ChunksPerPage;
1256         }
1257         debug(nbuff) safe_tracef("popping chunk  - on page index = %d", pi);
1258         assert(0 <= pi && pi < ChunksPerPage );
1259         _length -= _pages._chunks[pi].length;
1260         last_page._next._chunks[pi] = NbuffChunk();
1261         _begChunkIndex++;
1262         if (empty)
1263         {
1264             debug(nbuff) safe_tracef("nbuff become empty");
1265             // _endChunkIndex = _begChunkIndex = 0;
1266             clear();
1267             return;
1268         }
1269         if (pi == ChunksPerPage - 1)
1270         {
1271             // we popped last chunk on page - it is completely free
1272             auto pageToDispose = last_page._next;
1273             debug(nbuff) safe_tracef("last chunk were popped from this page and nbuff is not empty, lp: %x, ptd: %x", last_page, pageToDispose);
1274             last_page._next = pageToDispose._next;
1275             () @trusted {dispose(allocator, pageToDispose);}();
1276             // adjust indexes
1277             _begChunkIndex -= ChunksPerPage;
1278             _endChunkIndex -= ChunksPerPage;
1279         }
1280     }
1281     /// pop n bytes from nbuff
1282     void pop(long n=1) @safe @nogc
1283     {
1284         assert(n <= _length);
1285         if (n == _length)
1286         {
1287             clear();
1288             return;
1289         }
1290         auto  toPop = n;
1291         while(toPop > 0)
1292         {
1293             if ( empty )
1294             {
1295                 assert(0, "You are trying to pop chunk from empty Nbuff");
1296             }
1297 
1298             auto page = chunkIndexToPage(_begChunkIndex);
1299             auto index = _begChunkIndex % ChunksPerPage;
1300             page._chunks[index]._beg++;
1301             _length--;
1302             toPop--;
1303             assert(_length >= 0);
1304             if (page._chunks[index]._beg == page._chunks[index]._end)
1305             {
1306                 // empty
1307                 debug(nbuff) tracef("dispose chunk %d", index);
1308                 page._chunks[index] = NbuffChunk();
1309                 if ( index == ChunksPerPage - 1 && _begChunkIndex>=ChunksPerPage) // last chunk on page is free
1310                 {
1311                     // release page
1312                     debug(nbuff) tracef("dispose page");
1313                     auto page_prev = chunkIndexToPage(_begChunkIndex - ChunksPerPage);
1314                     page_prev._next = page._next;
1315                     () @trusted {dispose(allocator, page);}();
1316                     _begChunkIndex++;
1317                     _begChunkIndex -= ChunksPerPage;
1318                     _endChunkIndex -= ChunksPerPage;
1319                 }
1320                 else
1321                 {
1322                     _begChunkIndex++;
1323                     debug(nbuff) tracef("new _begChunkIndex %d", _begChunkIndex);
1324                 }
1325             }
1326         }
1327     }
1328 
1329     private auto chunkIndexToPage(ulong index) pure inout @safe @nogc
1330     {
1331         auto skipPages = index / ChunksPerPage;
1332         auto p = &_pages;
1333         while(skipPages>0)
1334         {
1335             p = p._next;
1336             skipPages--;
1337         }
1338         return p;
1339     }
1340     ///
    /// Return an immutable contiguous view of bytes [beg, end) of this nbuff.
    /// Data copy:
    ///  We can avoid a data copy if [beg, end) lies within a single stored immutable chunk. Then we just return a reference to that chunk.
    ///  We can't avoid a data copy if [beg, end) spans several chunks. Then we have to join them.
1345     /// 
1346     NbuffChunk data(size_t beg, size_t end) @safe
1347     {
1348         if (beg>end)
1349         {
1350             throw NbuffError;
1351         }
1352         if (end>_length)
1353         {
1354             throw NbuffError;
1355         }
1356         if (beg == end || _length == 0)
1357         {
1358             return NbuffChunk();
1359         }
1360         auto bytesToSkip = beg;
1361         auto bytesToCopy = end-beg;
1362         auto page = chunkIndexToPage(_begChunkIndex);
1363         auto chunkIndex = _begChunkIndex % ChunksPerPage;
1364         size_t position;
1365         debug(nbuff) safe_tracef("bytesToSkip: %d", bytesToSkip);
1366         while(bytesToSkip>0)
1367         {
1368             auto chunk = &page._chunks[chunkIndex];
1369             debug(nbuff) safe_tracef("check chunk: [%s..%s](%d)",
1370                 chunk._beg, chunk._end, chunk.length);
1371 
1372             if (bytesToSkip>=chunk.length)
1373             {
1374                 // just skip this chunk
1375                 debug(nbuff) safe_tracef("skip it");
1376                 bytesToSkip -= chunk.length;
1377                 chunkIndex++;
1378                 if (chunkIndex == ChunksPerPage)
1379                 {
1380                     page = page._next;
1381                     chunkIndex = 0;
1382                     continue;
1383                 }
1384             }
1385             else
1386             {
1387                 debug(nbuff) safe_tracef("start from it");
1388                 position = bytesToSkip;
1389                 bytesToSkip = 0;
1390                 break;
1391             }
1392         }
1393         debug(nbuff) safe_tracef("start index = %d, position = %d", chunkIndex, position);
1394         assert(page !is null);
1395         assert(chunkIndex < ChunksPerPage);
1396         if ( page._chunks[chunkIndex]._beg + position + bytesToCopy <= page._chunks[chunkIndex]._end )
1397         {
1398             debug(nbuff) safe_tracef("return ref to single chunk");
1399             // return reference to this chunk only
1400             NbuffChunk res = page._chunks[chunkIndex];
1401             res._beg += position;
1402             res._end = res._beg + bytesToCopy;
1403             return res;
1404         }
1405         
1406         // otherwise join chunks
1407         auto mc = Nbuff.get(bytesToCopy);
1408         size_t bytesCopied = 0;
1409         while(bytesToCopy>0)
1410         {
1411             auto from = page._chunks[chunkIndex]._beg + position;
1412             auto to = page._chunks[chunkIndex]._end;
1413             auto tc = min(bytesToCopy, to - from);
1414             debug(nbuff) safe_tracef("from: %d, to: %d, tocopy=%d", from, to, tc);
1415             mc._impl._object[bytesCopied..bytesCopied+tc] = page._chunks[chunkIndex]._memory._object[from..from+tc];
1416             bytesCopied += tc;
1417             chunkIndex++;
1418             bytesToCopy -= tc;
1419             position = 0;
1420             if (chunkIndex>=ChunksPerPage)
1421             {
1422                 chunkIndex -= ChunksPerPage;
1423                 page = page._next;
1424             }
1425         }
1426         return NbuffChunk(mc, bytesCopied);
1427     }
1428     ///
1429     /// return immutable continuous view to this whole nbuff.
1430     /// Data copy:
1431     ///  We can avoid data copy if this nbuff store single immutable chunk. Then we just return ref to this chunk
1432     ///  We can't avoid data copy if nbuff span several chunks. Then we have to join them.
1433     /// 
1434     NbuffChunk data() @safe @nogc
1435     {
1436         if (_length==0)
1437         {
1438             return NbuffChunk();
1439         }
1440         // ubyte[] result = new ubyte[](_length);
1441         auto skipPages = _begChunkIndex / ChunksPerPage;
1442         int  bi = cast(int)_begChunkIndex, ei = cast(int)_endChunkIndex;
1443         Page* p = &_pages;
1444         auto toCopyChunks = ei - bi;
1445         int toCopyBytes = cast(int)_length;
1446         int bytesCopied = 0;
1447         while(skipPages>0)
1448         {
1449             bi -= ChunksPerPage;
1450             ei -= ChunksPerPage;
1451             p = p._next;
1452             skipPages--;
1453         }
1454         if(ei == bi + 1)
1455         {
1456             // there is only chunk in buffer, we can return it
1457             return p._chunks[bi];
1458         }
1459         assert(bi>=0 && ei>0);
1460         auto mc = Nbuff.get(_length);
1461         while(toCopyChunks>0 && toCopyBytes>0)
1462         {
1463             auto from = p._chunks[bi]._beg;
1464             auto to = p._chunks[bi]._end;
1465             auto tc = to - from;
1466             mc._impl._object[bytesCopied..bytesCopied+tc] = p._chunks[bi]._memory._object[from..to];
1467             bytesCopied += tc;
1468             bi++;
1469             toCopyBytes -= tc;
1470             toCopyChunks--;
1471             if (bi>=ChunksPerPage)
1472             {
1473                 bi -= ChunksPerPage;
1474                 ei -= ChunksPerPage;
1475                 p = p._next;
1476             }
1477         }
1478         return NbuffChunk(mc, _length);
1479     }
1480     ///
    /// Return a non-contiguous view of a slice of this nbuff.
1482     /// Data copy: none
1483     ///
1484     Nbuff opSlice(size_t start, size_t end) @nogc @safe
1485     {
1486         //debug(nbuff) tracef("slice %d..%d", start, end);
1487         if (start>end)
1488         {
1489             throw NbuffError;
1490         }
1491         if (end>_length)
1492         {
1493             throw NbuffError;
1494         }
1495         if (start==end)
1496         {
1497             return Nbuff();
1498         }
1499         auto bytesToSkip = start;
1500         auto bytesToCopy = end-start;
1501         auto page = chunkIndexToPage(_begChunkIndex);
1502         auto chunkIndex = _begChunkIndex % ChunksPerPage;
1503         size_t position;// = page._chunks[chunkIndex]._beg;
1504         // debug(nbuff) tracef("start index = %d", chunkIndex);
1505         // debug(nbuff) tracef("bytesToSkip = %d", bytesToSkip);
1506         // debug(nbuff) tracef("bytesToCopy = %d", bytesToCopy);
1507         while(bytesToSkip>0)
1508         {
1509             auto chunk = &page._chunks[chunkIndex];
1510             //debug(nbuff) tracef("check chunk: [%s..%s](%d)",
1511             //    chunk._beg, chunk._end, chunk.length);
1512 
1513             if (bytesToSkip>=chunk.length)
1514             {
1515                 // just skip this chunk
1516                 bytesToSkip -= chunk.length;
1517                 chunkIndex++;
1518                 if (chunkIndex == ChunksPerPage)
1519                 {
1520                     page = page._next;
1521                     chunkIndex = 0;
1522                     continue;
1523                 }
1524             }
1525             else
1526             {
1527                 position = bytesToSkip;
1528                 bytesToSkip = 0;
1529                 break;
1530             }
1531         }
1532         assert(page !is null);
1533         assert(chunkIndex < ChunksPerPage);
1534         Nbuff result = Nbuff();
1535         /// copy references
1536         while(bytesToCopy>0)
1537         {
1538             auto l = min(bytesToCopy, page._chunks[chunkIndex]._end - position - page._chunks[chunkIndex]._beg);
1539             result.append(page._chunks[chunkIndex], position, l);
1540             bytesToCopy -= l;
1541             position = 0;
1542             chunkIndex++;
1543             if (chunkIndex == ChunksPerPage)
1544             {
1545                 page = page._next;
1546                 chunkIndex = 0;
1547             }
1548         }
1549         return result;
1550     }
1551 
1552     auto opDollar()
1553     {
1554         return _length;
1555     }
1556     bool opEquals(string other) pure const @safe @nogc
1557     {
1558         return this == other.representation;
1559     }
1560     bool opEquals(const ubyte[] other) pure const @safe @nogc
1561     {
1562         if (other.length != length)
1563         {
1564             return false;
1565         }
1566         return countUntil(other) == 0;
1567     }
1568     bool opEquals(this R)(auto ref R other) pure @safe @nogc
1569     {
1570         if (this is other)
1571         {
1572             return true;
1573         }
1574         if (_length != other._length)
1575         {
1576             return false;
1577         }
1578         // real comparison
1579         bool equals = true;
1580         size_t to_compare = _length;
1581         Page* page_this = chunkIndexToPage(_begChunkIndex);
1582         Page* page_other = other.chunkIndexToPage(other._begChunkIndex);
1583         size_t chunk_index_this = _begChunkIndex;
1584         size_t chunk_index_other = other._begChunkIndex;
1585         size_t position_this = 0;
1586         size_t position_other = 0;
1587         while(equals && to_compare>0)
1588         {
1589             auto i_this  = chunk_index_this % ChunksPerPage;
1590             auto i_other = chunk_index_other % ChunksPerPage;
1591             auto chunk_t = &page_this._chunks[i_this];
1592             auto chunk_o = &page_other._chunks[i_other];
1593             assert(position_this < chunk_t._end);
1594             assert(position_other < chunk_o._end);
1595             auto l_t = chunk_t._end - chunk_t._beg - position_this;
1596             auto l_o = chunk_o._end - chunk_o._beg - position_other;
1597             auto l_c = min(l_t, l_o, to_compare);
1598             debug(nbuff) safe_tracef("compare %s and %s",
1599                 chunk_t._memory._data[chunk_t._beg + position_this..chunk_t._beg + position_this+l_c],
1600                 chunk_o._memory._data[chunk_o._beg + position_other..chunk_o._beg + position_other+l_c]);
1601             equals = equal(chunk_t._memory._data[chunk_t._beg + position_this..chunk_t._beg + position_this+l_c],
1602                            chunk_o._memory._data[chunk_o._beg + position_other..chunk_o._beg + position_other+l_c]);
1603             position_this += l_c;
1604             position_other += l_c;
1605             if (l_c == l_t)
1606             {
1607                 position_this = 0;
1608                 chunk_index_this++;
1609                 i_this++;
1610                 if (i_this == ChunksPerPage)
1611                 {
1612                     page_this = page_this._next;
1613                 }
1614             }
1615             if (l_c == l_o)
1616             {
1617                 position_other = 0;
1618                 chunk_index_other++;
1619                 i_other++;
1620                 if (i_other == ChunksPerPage)
1621                 {
1622                     page_other = page_other._next;
1623                 }
1624             }
1625             debug(nbuff) safe_tracef("compared %d bytes: %s", l_c, equals);
1626             to_compare -= l_c;
1627         }
1628         return equals;
1629     }
1630 
1631     auto opIndex(size_t i) @safe @nogc inout
1632     {
1633         if ( i >= _length)
1634         {
1635             throw NbuffError;
1636         }
1637         auto page = chunkIndexToPage(_begChunkIndex);
1638         long chunkIndex = _begChunkIndex;
1639         while(i >= 0)
1640         {
1641             auto ci = chunkIndex % ChunksPerPage;
1642             auto chunk = &page._chunks[ci];
1643             if (chunk.length > i)
1644             {
1645                 return chunk._memory._impl._object[chunk._beg + i];
1646             }
1647             i -= chunk.length;
1648             chunkIndex++;
1649             if (ci+1 == ChunksPerPage)
1650             {
1651                 page = page._next;
1652             }
1653         }
1654         assert(0, "Index larger that length?");
1655     }
1656     Nbuff[3] findSplitOn(string s, size_t start_from = 0) @safe @nogc
1657     {
1658         return findSplitOn(s.representation, start_from);
1659     }
1660     Nbuff[3] findSplitOn(immutable(ubyte)[] b, size_t start_from = 0) @safe @nogc
1661     {
1662         Nbuff[3] result;
1663         int s = countUntil(b, start_from);
1664         if (s>=0)
1665         {
1666             result[0] = this[0..s];
1667             result[1] = this[s..s+b.length];
1668             result[2] = this[s+b.length..$];
1669         }
1670         return result;
1671     }
1672     bool beginsWith(const ubyte[] b) @safe @nogc
1673     {
1674         if ( length < b.length )
1675         {
1676             return false;
1677         }
1678         return data()[0..b.length] == b;
1679     }
1680     bool beginsWith(string b) @safe @nogc
1681     {
1682         if ( length < b.length )
1683         {
1684             return false;
1685         }
1686         return data()[0..b.length] == b.representation;
1687     }
1688 
1689     int countUntil(const(ubyte)[] b, size_t start_from = 0) pure inout @safe @nogc
1690     {
1691         if (b.length == 0)
1692         {
1693             throw NbuffError;
1694         }
1695         if (_length == 0)
1696         {
1697             return -1;
1698         }
1699         if (start_from >= _length)
1700         {
1701             throw NbuffError;
1702         }
1703         if (start_from == -1 || _length == 0)
1704         {
1705             return -1;
1706         }
1707         int index = 0; // index is position at which we test pattern 
1708         // skip start_from
1709         auto page = chunkIndexToPage(_begChunkIndex);
1710         long chunkIndex = _begChunkIndex;
1711         auto ci = chunkIndex % ChunksPerPage; // ci - index of the chunk on current page
1712         auto chunk = &page._chunks[ci];
1713         while(start_from > 0) // skip requested number of bytes
1714         {
1715             if (chunk.length > start_from)
1716             {
1717                 break;
1718             }
1719             index += chunk.length;
1720             start_from -= chunk.length;
1721             chunkIndex++;
1722             ci++;
1723             if (ci == ChunksPerPage)
1724             {
1725                 page = page._next;
1726                 ci = 0;
1727             }
1728             chunk = &page._chunks[ci];
1729         }
1730         // start_from can be > 0 if index points inside chunk
1731         index += start_from;
1732         auto position_this = start_from; // position_this - offset in Nbuff chunk we looking at
1733         auto position_needle = 0;        // position_needlse - offset inside pattern
1734 
1735         debug(nbuff) safe_tracef("Start search from chunk index: %s, position: %s", chunkIndex, position_this);
1736         immutable needle_length = b.length;
1737         while(index + needle_length <= _length)
1738         {
1739             size_t to_compare = needle_length;
1740             debug(nbuff) safe_tracef("test from this[%s]=%02x, compare len=%d", index, this[index], to_compare);
1741             // all 'local' variables used as we might overlap chunk
1742             auto local_chunkIndex = chunkIndex;
1743             auto local_position_this = position_this;
1744             auto local_position_needle = position_needle;
1745             auto local_ci = ci;
1746             auto local_chunk = chunk;
1747             auto local_page = page;
1748             while(to_compare>0)
1749             {
1750                 auto c_l = min(to_compare, local_chunk.length-local_position_this); // we can comapre only until the end of the chunk
1751                 immutable equals = equal(
1752                     local_chunk._memory._data[local_chunk._beg+local_position_this..local_chunk._beg+local_position_this+c_l],
1753                     b[local_position_needle..local_position_needle+c_l]
1754                 );
1755                 if (!equals)
1756                 {
1757                     break;
1758                 }
1759                 local_position_this += c_l;
1760                 local_position_needle += c_l;
1761                 to_compare -= c_l;
1762                 if ( to_compare == 0)
1763                 {
1764                     break;
1765                 }
1766                 if (local_position_this==chunk.length)
1767                 {
1768                     debug(nbuff) safe_tracef("continue on next chunk");
1769                     local_position_this = 0;
1770                     local_chunkIndex++;
1771                     if (local_chunkIndex == _endChunkIndex)
1772                     {
1773                         break;
1774                     }
1775                     local_ci++;
1776                     if (local_ci==ChunksPerPage)
1777                     {
1778                         debug(nbuff) safe_tracef("continue on next page");
1779                         local_ci = 0;
1780                         local_page = local_page._next;
1781                     }
1782                     local_chunk = &local_page._chunks[local_ci];
1783                 }
1784                 
1785             }
1786             if (to_compare == 0)
1787             {
1788                 debug(nbuff) safe_tracef("return %d", index);
1789                 return index;
1790             }
1791             index++;
1792             debug(nbuff) safe_tracef("start from next position %d", index);
1793             position_this++;
1794             position_needle = 0;
1795             if ( position_this == chunk.length)
1796             {
1797                 // switch to next chunk
1798                 debug(nbuff) safe_tracef("next chunk");
1799                 position_this = 0;
1800                 chunkIndex++;
1801                 if (chunkIndex == _endChunkIndex)
1802                 {
1803                     return -1;
1804                 }
1805                 ci++;
1806                 if (ci == ChunksPerPage)
1807                 {
1808                     debug(nbuff) safe_tracef("and next page");
1809                     page = page._next;
1810                     ci = 0;
1811                 }
1812                 chunk = &page._chunks[ci];
1813             }
1814         }
1815         return -1;
1816     }
1817     version(Posix)
1818     {
1819         import core.sys.posix.sys.uio: iovec;
1820         /// build iovec from this nbuff
1821         /// Paramenter v - ref to array of iovec structs
1822         /// n - size of array (must be >0)
1823         /// Return number of actually filled entries;
1824         int toIoVec(iovec* v, int n) @system
1825         {
1826             assert(n>0);
1827             if (empty)
1828             {
1829                 return 0;
1830             }
1831             int vi = 0;
1832             auto skipPages = _begChunkIndex / ChunksPerPage;
1833             int  bi = cast(int)_begChunkIndex, ei = cast(int)_endChunkIndex;
1834             Page* p = &_pages;
1835             while(skipPages > 0)
1836             {
1837                 bi -= ChunksPerPage;
1838                 ei -= ChunksPerPage;
1839                 p = p._next;
1840                 skipPages--;
1841             }
1842 
1843             // fill no more than n and no more than we have chunks
1844             int i = bi, j = 0;
1845             for(; j < n && i<ei; j++)
1846             {
1847                 auto beg = p._chunks[i]._beg;
1848                 void* base = cast(void*)p._chunks[i]._memory._impl._object.ptr + beg;
1849                 v[j].iov_base = base;
1850                 v[j].iov_len  = p._chunks[i].length;
1851                 i++;
1852                 if (i == ChunksPerPage)
1853                 {
1854                     i -= ChunksPerPage;
1855                     ei -= ChunksPerPage;
1856                     p = p._next;
1857                 }
1858             }
1859             return j;
1860         }
1861     }
1862 }
1863 
1864 
@("NbuffChunk")
@safe
unittest
{
    import std.range.primitives;
    // NbuffChunk must satisfy the whole range hierarchy up to random access
    static assert(isInputRange!NbuffChunk);
    static assert(isForwardRange!NbuffChunk);
    static assert(isBidirectionalRange!NbuffChunk);
    static assert(hasLength!NbuffChunk);
    static assert(is(typeof(lvalueOf!NbuffChunk[1]) == ElementType!NbuffChunk));
    static assert(isRandomAccessRange!NbuffChunk);
    // content, comparison, slicing and indexing of a chunk built from a string
    auto sample = NbuffChunk("abcd");
    assert(equal(sample, "abcd"));
    assert(sample == "abcd".representation);
    assert(equal(sample[1..3], "bc"));
    assert(sample[1..3][0] == 'b');
    assert(sample[1..3][$-1] == 'c');
}
1883 
@("Nbuff0")
@nogc
unittest
{
    import std.string;
    import std.stdio;
    // Smoke test with no assertions: copy an empty Nbuff, append two chunks
    // to the original, then assign the filled buffer over the earlier copy.
    Nbuff b;
    auto d = b;
    auto chunk = Nbuff.get(512);
    copy("Abc".representation, chunk.data);
    b.append(chunk, 3);
    // the same variable is reused for a second, smaller chunk
    chunk = Nbuff.get(16);
    copy("Def".representation, chunk.data);
    b.append(chunk, 3);
    d = b;
}
1900 
@("Nbuff1")
@nogc
unittest
{
    import std.string;
    // Two identical nested scopes: each builds a one-chunk Nbuff that is
    // destroyed when the scope ends.
    {
        Nbuff b;
        auto chunk = Nbuff.get(11);
        copy("Abc".representation, chunk.data);
        b.append(chunk, 3);
    }
    {
        Nbuff b;
        auto chunk = Nbuff.get(11);
        copy("Abc".representation, chunk.data);
        b.append(chunk, 3);
    }
    // Copy an empty Nbuff, then append one chunk to the original and pop it again.
    Nbuff b;
    auto d = b;
    auto chunk = Nbuff.get(512);
    copy("Def".representation, chunk.data);
    b.append(chunk, 3);
    b.popChunk();
}
1925 
@("Nbuff2")
unittest
{
    // Append 2*ChunksPerPage + 1 chunks, so the buffer spans more than two pages.
    Nbuff b;
    for(int i=1; i <= 2*Nbuff.ChunksPerPage + 1; i++)
    {
        auto chunk = Nbuff.get(64);
        // the i-th chunk is appended with a length of i bytes
        b.append(chunk, i);
    }
    // Pop every chunk: first two pages' worth, then the single remaining one.
    for(int i=0;i<2*Nbuff.ChunksPerPage; i++)
    {
        b.popChunk();
    }
    b.popChunk();
    // Assign the emptied buffer to a fresh one.
    Nbuff c;
    c = b;
}
1943 
// Nbuff3: three chunks appended one after another are exposed by data()
// as one contiguous sequence.
@("Nbuff3")
unittest
{
    Nbuff buffer;

    auto mem = Nbuff.get(16);
    copy("Hello".representation, mem.data);
    buffer.append(mem, 5);

    mem = Nbuff.get(16);
    copy(",".representation, mem.data);
    buffer.append(mem, 1);

    mem = Nbuff.get(16);
    copy(" world!".representation, mem.data);
    buffer.append(mem, 7);

    auto joined = buffer.data();
    assert(equal(joined, "Hello, world!".representation));
    // first and last element of the joined view
    assert(joined[0] == 'H');
    assert(joined[$-1] == '!');
}
1962 
// Nbuff4: 32 small chunks ("0," .. "31,") appended separately read back
// as one concatenated sequence.
@("Nbuff4")
unittest
{
    Nbuff buffer;
    foreach (number; 0 .. 32)
    {
        string piece = "%s,".format(number);
        auto mem = Nbuff.get(16);
        copy(piece.representation, mem.data);
        buffer.append(mem, piece.length);
    }
    assert(equal(buffer.data, "0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,"));
}
1976 
// Nbuff5: pop() with and without a count removes bytes from the front,
// including counts that span several chunks.
@("Nbuff5")
unittest
{
    Nbuff buffer;
    foreach (number; 0 .. 32)
    {
        string piece = "%s,".format(number);
        auto mem = Nbuff.get(16);
        copy(piece.representation, mem.data);
        buffer.append(mem, piece.length);
    }
    // ten 2-byte pieces plus twenty-two 3-byte pieces
    assert(buffer.length == 86);

    // one byte at a time
    buffer.pop();
    assert(equal(buffer.data, ",1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,"));
    buffer.pop();
    assert(equal(buffer.data, "1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,"));

    // exactly one piece
    buffer.pop(2);
    assert(equal(buffer.data, "2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,"));

    // many pieces at once
    buffer.pop(34);
    assert(equal(buffer.data, "16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,"));

    // everything that is left
    buffer.pop(48);
    assert(buffer.length == 0, "b.length=%d".format(buffer.length));
}
2000 
// Nbuff6: slices of a multi-chunk buffer, a slice assigned to a buffer
// declared outside the slice's scope, and reuse of a buffer after clear().
@("Nbuff6")
unittest
{
    Nbuff source, keeper;
    foreach (number; 0 .. 32)
    {
        string piece = "%s,".format(number);
        auto mem = Nbuff.get(16);
        copy(piece.representation, mem.data);
        source.append(mem, piece.length);
    }
    source.pop();
    assert(equal(source.data, ",1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,"));
    {
        // slice lives only in this scope; `keeper` holds the assigned copy afterwards
        auto part = source[2..8]; // ",2,3,4"
        assert(equal(",2,3,4", part.data));
        keeper = part;
    }

    // refill the cleared buffer with fixed-width pieces "00," .. "98,"
    source.clear();
    foreach (number; 0 .. 99)
    {
        string piece = "%02d,".format(number);
        auto mem = Nbuff.get(16);
        copy(piece.representation, mem.data);
        source.append(mem, piece.length);
    }
    // 3 bytes per piece: [45..96] covers pieces 15 through 31
    auto window = source[45..96];
    assert(equal(window.data, "15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,"));
}
// Nbuff7: equality between buffers and slices (including buffers holding the
// same bytes split into different chunks) and single-element indexing.
@("Nbuff7")
unittest
{
    // This test changes the process-wide log level twice. Remember the level
    // seen on entry and put it back on every exit path (including a failing
    // assert), so the `trace` level set below does not leak into other tests.
    immutable savedLogLevel = globalLogLevel;
    scope(exit) globalLogLevel = savedLogLevel;

    globalLogLevel = LogLevel.info;
    Nbuff b;
    for(int i=0; i<32; i++)
    {
        string s = "%s,".format(i);
        auto chunk = Nbuff.get(16);
        copy(s.representation, chunk.data);
        b.append(chunk, s.length);
    }
    // a full slice compares equal to its source
    auto c = b[0..$];
    assert(b==c);
    // equally long slices at different offsets differ
    auto d = b[0..10];
    auto e = b[1..11];
    assert(d != e);
    Nbuff f, g;
    f.append("abc");
    g.append("bc");
    assert(f[1..$] == g);
    f.pop();
    assert(f == g);
    f.popChunk();
    assert(f.length == 0);
    // mix string and chunk appends
    f.append("Length: ");
    auto chunk = Nbuff.get(16);
    copy("100\n".representation, chunk.data);
    f.append(chunk, 4);
    g.clear();
    g.append("L");
    g.append("ength: 100\n");
    // trace level is enabled only for the comparison below
    globalLogLevel = LogLevel.trace;
    assert(f == g);
    // indexing into the first and the second appended piece
    assert(f[0] == 'L');
    assert(g[1] == 'e');
    assert(g[11] == '\n');
}
// Nbuff8: countUntil() finds byte sequences inside one chunk and across
// chunk boundaries, with and without a starting offset.
@("Nbuff8")
unittest
{
    Nbuff digits;
    digits.append("012");
    digits.append("345");
    digits.append("678");

    // needle inside the first chunk
    assert(digits.countUntil("01".representation, 0)==0);
    assert(digits.countUntil("12".representation, 0)==1);
    // needle straddling two chunks, searched from different offsets
    assert(digits.countUntil("23".representation, 0)==2);
    assert(digits.countUntil("23".representation, 2)==2);
    // needle equal to a whole chunk
    assert(digits.countUntil("345".representation, 2)==3);
    assert(digits.countUntil("345".representation, 3)==3);
    // needle touching three chunks
    assert(digits.countUntil("23456".representation,0) == 2);

    // "0", "1", ... "99", one append per number
    digits.clear();
    foreach (number; 0 .. 100)
    {
        digits.append("%d".format(number));
    }
    assert(digits.countUntil("10".representation) == 10);
    assert(digits.countUntil("90919".representation) == 170);
    assert(digits.countUntil("99".representation, 170) == 188);
    // every starting offset up to the match position gives the same answer
    foreach (skip; 0 .. 171)
    {
        assert(digits.countUntil("90919".representation, skip) == 170);
    }
}
// Nbuff9: split a buffer on '\n' by repeatedly combining countUntil()
// with slicing, appending more data part-way through.
@("Nbuff9")
unittest
{
    globalLogLevel = LogLevel.info;
    Nbuff stream, line;
    stream.append("\na\nbb\n");

    // drop everything up to and including the leading newline
    auto pos = stream.countUntil("\n".representation);
    stream = stream[pos+1..$];
    assert(stream.data == "a\nbb\n".representation);

    // take the first line without its terminator
    pos = stream.countUntil("\n".representation);
    line = stream[0..pos];
    assert(line.data == "a".representation);

    stream = stream[pos+1..$];
    assert(stream.data == "bb\n".representation);

    // consume the second line, then feed more input and keep going
    pos = stream.countUntil("\n".representation);
    stream = stream[pos+1..$];
    stream.append("ccc\nrest");
    pos = stream.countUntil("\n".representation);
    stream = stream[pos+1..$];
    // final search; the result is not checked
    pos = stream.countUntil("\n".representation);
}
2119 
// Nbuff10: construct a UniquePtr!MutableMemoryChunk and an NbuffChunk
// directly, and write one byte through the mutable chunk.
@("Nbuff10")
unittest
{
    auto mem = UniquePtr!MutableMemoryChunk(64);
    mem.data[0] = 1;
    // constructed from a string literal; not used further
    auto fromLiteral = NbuffChunk("abc");
}
2127 
// Nbuff11: build an Nbuff from a string and read sub-ranges with
// data(from, to), both inside one chunk and across several chunks.
@("Nbuff11")
unittest
{
    globalLogLevel = LogLevel.info;
    // init from string
    auto buffer = Nbuff("abc");
    assert(buffer.length == 3);
    assert(buffer.data.data == NbuffChunk("abc"));

    // after one pop, positions are relative to the new front
    buffer.pop();
    NbuffChunk part = buffer.data(0,1);
    assert(part.data == "b");
    part = buffer.data(1,2);
    assert(part.data == "c");

    // three appended pieces: "abc" "def" "ghi"
    buffer = Nbuff("abc");
    buffer.append("def");
    buffer.append("ghi");
    // range inside the second piece
    part = buffer.data(4,6);
    assert(part.data == "ef");
    // range across the first and second piece
    part = buffer.data(2,4);
    assert(part.data == "cd");
    // range touching all three pieces
    part = buffer.data(2,7);
    assert(part.data == "cdefg");
}
2151 
version(Posix)
{
    // iovec: after dropping the first 15 of 32 one-line chunks, toIoVec()
    // reports the remaining 17 chunks, in order, through the iovec array.
    @("iovec")
    unittest
    {
        import core.sys.posix.sys.uio: iovec;
        Nbuff buffer;
        iovec[64] vectors;
        // fill nbuff
        foreach (number; 0 .. 32)
        {
            buffer.append("%s\n".format(number));
        }
        // pop 15 chunks (so we will cross page boundary)
        foreach (_; 0 .. 15)
        {
            buffer.popChunk();
        }
        auto filled = buffer.toIoVec(&vectors[0], 64);
        assert(filled == 17);
        // entry k must describe the chunk that originally held number k + 15
        for (int k = 0; k < filled; k++)
        {
            int expected = k + 15;
            assert("%s\n".format(expected) == cast(string)vectors[k].iov_base[0..vectors[k].iov_len]);
        }
    }
}