module nbuff.buffer;

import std.string;
import std.array;
import std.algorithm;
import std.conv;
import std.range;
import std.stdio;
import std.traits;
import std.format;
import core.exception;
import std.exception;
import std.range.primitives;
import std.experimental.logger;

import core.memory: GC;

private import std.experimental.allocator;
private import std.experimental.allocator.mallocator : Mallocator;

///
// network buffer
///

/// Range Empty exception
static immutable Exception RangeEmpty = new Exception("try to pop from empty Buffer"); // @suppress(dscanner.style.phobos_naming_convention)
/// Requested index is out of range
static immutable Exception IndexOutOfRange = new Exception("Index out of range");
/// Buffer internal struct problem
static immutable Exception BufferError = new Exception("Buffer internal struct corrupted");

// goals
// 1 minimal data copy
// 2 Range interface
// 3 minimal footprint


public alias BufferChunk = immutable(ubyte)[];
public alias BufferChunksArray = immutable(BufferChunk)[];

debug(nbuff) @safe @nogc nothrow
{
    import std.experimental.logger;
    /// Debug-only trace helper.
    /// The actual logging calls are commented out below, so at the moment this
    /// function does nothing observable; it only exists when compiled with
    /// `-debug=nbuff`.
    package void safe_tracef(A...)(string f, scope A args, string file = __FILE__, int line = __LINE__) @safe @nogc nothrow
    {
        bool osx,ldc;
        version(OSX)
        {
            osx = true;
        }
        version(LDC)
        {
            ldc = true;
        }
        if (!osx || !ldc)
        {
            // this can fail on pair ldc2/osx, see https://github.com/ldc-developers/ldc/issues/3240
            import core.thread;
            () @trusted @nogc nothrow
            {
                try
                {
                    //debug(nbuff)writefln("[%x] %s:%d " ~ f, Thread.getThis().id(), file, line, args);
                }
                catch(Exception e)
                {
                    () @trusted @nogc nothrow
                    {
                        try
                        {
                            //debug(nbuff)errorf("[%x] %s:%d Exception: %s", Thread.getThis().id(), file, line, e);
                        }
                        catch
                        {
                        }
                    }();
                }
            }();
        }
    }
}

debug(nbuff)
{
    /// Debug-only printf wrapper; the printf call itself is commented out.
    void safe_printf(A...)(A a) @trusted
    {
        import core.stdc.stdio: printf;
        //printf(&a[0][0], a[1..$]);
    }
}

/// Module-level pool used by MutableMemoryChunk / ImmutableMemoryChunk.
package static MemPool _mempool;
/// Give every size bucket of `_mempool` an initial table of 1024 slots.
static this()
{
    for(int i=0;i<MemPool.IndexLimit;i++)
    {
        _mempool._poolSize[i] = 1024;
        _mempool._pools[i] = Mallocator.instance.makeArray!(ubyte[])(1024);
    }
}
/// Set by the module destructor; MemPool.free checks it and deallocates
/// directly instead of touching the (already released) pool tables.
private static bool in_exit;
/// Release every cached chunk and every bucket table of `_mempool`.
static ~this()
{
    in_exit = true;
    for(int i=0;i<MemPool.IndexLimit;i++)
    {
        for(int j=0; j < _mempool._mark[i]; j++)
        {
            Mallocator.instance.dispose(_mempool._pools[i][j]);
        }
        _mempool._mark[i] = 0;
        Mallocator.instance.dispose(_mempool._pools[i]);
    }
}

alias allocator = Mallocator.instance;
static immutable Exception MemPoolException = new Exception("MemPoolException");
static immutable Exception NbuffError = new Exception("NbuffError");

/// Cache of malloc'ed byte chunks, bucketed by `bsr(size)`.
///
/// `alloc` hands out a cached chunk from the matching bucket if there is one,
/// otherwise it allocates `2 << bsr(size)` bytes. `free` puts a chunk back into
/// its bucket (growing the bucket table up to MaxPoolWidth slots) or
/// deallocates it when the bucket is full.
struct MemPool
{
    // class MemPoolException: Exception
    // {
    //     this(string msg) @nogc @safe
    //     {
    //         super(msg);
    //     }
    // }

    import core.bitop;
    private
    {
        enum MinSize = 64;
        enum MaxSize = 64*1024;
        // Sizes up to and including MaxSize are accepted by alloc/free, and
        // bsr(MaxSize) is itself a bucket index, so the tables need
        // bsr(MaxSize) + 1 entries. (With bsr(MaxSize) entries a request of
        // exactly MaxSize indexed one past the end of the arrays.)
        enum IndexLimit = bsr(MaxSize) + 1;
        enum MaxPoolWidth = 64*1024;
        alias ChunkInPool = ubyte[];
        alias Pool = ChunkInPool[];
        Pool[IndexLimit] _pools;        // per-bucket table of cached chunks
        size_t[IndexLimit] _poolSize;   // per-bucket table capacity (slots)
        size_t[IndexLimit] _mark;       // per-bucket number of cached chunks
    }
    /// Deallocate every cached chunk and every bucket table.
    ~this() @safe @nogc
    {
        for(int i=0;i<MemPool.IndexLimit;i++)
        {
            () @trusted {
                for(int j=0; j < _mark[i]; j++)
                {
                    allocator.deallocate(_pools[i][j]);
                }
                _mark[i] = 0;
                allocator.deallocate(_pools[i]);
            }();
        }
    }
    /// Get a chunk able to hold `size` bytes.
    /// Sizes below MinSize are rounded up to MinSize.
    /// Throws: MemPoolException if `size` is greater than MaxSize.
    ChunkInPool alloc(size_t size) @nogc @safe
    {
        if ( size < MinSize )
        {
            size = MinSize;
        }
        if (size>MaxSize)
        {
            throw MemPoolException;
        }
        immutable i = bsr(size);
        immutable index = _mark[i];
        if (index == 0)
        {
            // bucket is empty - allocate a fresh chunk
            auto b = allocator.makeArray!(ubyte)(2<<i);
            debug(nbuff) safe_tracef("allocated %d new bytes(because pool[%d] empty) at %x", size, i, &b[0]);
            return b;
        }
        // reuse the most recently cached chunk of this bucket
        auto b = _pools[i][index-1];
        _mark[i]--;
        assert(_mark[i]>=0);
        debug(nbuff) safe_tracef("allocated chunk from pool %d size %d", i, size);
        return b;
    }
    /// Return chunk `c`, which was requested with `size`, to the pool.
    /// `size` must be the value that was passed to `alloc`; it selects the bucket.
    /// Throws: MemPoolException if `size` is greater than MaxSize or if the
    /// bucket table could not be expanded.
    void free(ChunkInPool c, size_t size) @nogc @trusted
    {
        if ( size < MinSize )
        {
            size = MinSize;
        }
        if (size>MaxSize)
        {
            throw MemPoolException;
        }
        if (in_exit)
        {
            // mem pool can be destroyed already
            allocator.deallocate(c);
            return;
        }
        immutable pool_index = bsr(size);
        immutable index = _mark[pool_index];
        if (index<_poolSize[pool_index])
        {
            _pools[pool_index][index] = c;
            _mark[pool_index]++;
            debug(nbuff) safe_tracef("freed to pool[%d], next position %d, size(%d)", pool_index, _mark[pool_index], c.length);
        }
        else if (_poolSize[pool_index] < MaxPoolWidth)
        {
            // double size
            debug(nbuff) safe_tracef("double pool %d from %d (requested index=%d)", pool_index, _poolSize[pool_index], index);
            auto ok = allocator.expandArray!(ubyte[])(_pools[pool_index], _poolSize[pool_index]);
            if ( !ok )
            {
                throw MemPoolException;
            }
            _poolSize[pool_index] *= 2;
            _pools[pool_index][index] = c;
            _mark[pool_index]++;
            debug(nbuff) safe_tracef("pool %d doubled to %d", pool_index, _poolSize[pool_index]);
        }
        else
        {
            // bucket table is at its maximum width - just release the chunk
            allocator.deallocate(c);
        }
    }
}


@("MemPool")
@safe @nogc unittest
{
    MemPool __mempool;
    //globalLogLevel = LogLevel.info;
    for(int i=0;i<MemPool.IndexLimit;i++)
    {
        __mempool._poolSize[i] = 1024;
        __mempool._pools[i] = Mallocator.instance.makeArray!(ubyte[])(1024);
    }

    for(size_t size=128;size<=64*1024; size = size + size/2)
    {
        auto m = __mempool.alloc(size);
        __mempool.free(m, size);
    }

    for(size_t size=128;size<=64*1024; size = size + size/2)
    {
        auto m = __mempool.alloc(size);
        __mempool.free(m, size);
    }
    auto m = __mempool.alloc(128);
    copy("abcd".representation, m);
    __mempool.free(m, 128);
    ubyte[][64*1024] cip;
    for(int c=0;c<128;c++)
    {
        for(int i=0;i<64*1024;i++)
        {
            cip[i] = __mempool.alloc(i);
        }
        for(int i=0;i<64*1024;i++)
        {
            __mempool.free(cip[i],i);
        }
        for(int i=0;i<64*1024;i++)
        {
            cip[i] = __mempool.alloc(i);
        }
        for(int i=0;i<64*1024;i++)
        {
            __mempool.free(cip[i],i);
        }
    }
}

/// Reference-counted pointer to a `T` stored in allocator (malloc) memory.
///
/// Copies share one `Impl` and bump its counter; when the counter drops to
/// zero the `Impl` (and the `T` inside it) is disposed.
struct SmartPtr(T)
{
    private struct Impl
    {
        T _object;
        int _count;
        alias _object this;
    }
    public
    {
        Impl* _impl;
    }
    /// Allocate an Impl with count 1 and construct `T` in place from `args`.
    this(Args...)(auto ref Args args, string file = __FILE__, int line = __LINE__) @trusted
    {
        import std.functional: forward;
        //_impl = allocator.make!(Impl)(T(args),1);
        _impl = cast(typeof(_impl)) allocator.allocate(Impl.sizeof);
        _impl._count = 1;
        emplace(&_impl._object, forward!args);
        debug(nbuff) safe_tracef("emplaced", _count);
    }
    /// Copying shares the Impl and increments its counter.
    this(this)
    {
        if (_impl) inc;
    }
    string toString()
    {
        return "(rc: %d, object: %s)".format(_impl?_impl._count:0, _impl?_impl._object.to!string():"none");
    }
    /// Drop the current reference (if any) and point to a freshly
    /// default-constructed `T` with count 1.
    void construct() @trusted
    {
        if (_impl) rel;
        _impl = cast(typeof(_impl)) allocator.allocate(Impl.sizeof);
        _impl._count = 1;
        emplace(&_impl._object);
        // _impl = allocator.make!(Impl)(T.init,1);
    }
    /// Decrement the counter and dispose the Impl when it reaches zero.
    ~this()
    {
        if (_impl is null)
        {
            return;
        }
        if (dec == 0)
        {
            () @trusted {dispose(allocator, _impl);}();
        }
    }
    /// Release what we hold and share `other`'s Impl instead.
    void opAssign(ref typeof(this) other, string file = __FILE__, int line = __LINE__)
    {
        if (_impl == other._impl)
        {
            return;
        }
        if (_impl)
        {
            rel;
        }
        _impl = other._impl;
        if (_impl)
        {
            inc;
        }
    }
    private void inc() @safe @nogc
    {
        _impl._count++;
    }
    private auto dec() @safe @nogc
    {
        return --_impl._count;
    }
    // drop one reference; dispose the Impl if it was the last one
    private void rel() @trusted
    {
        if ( _impl is null )
        {
            return;
        }
        if (dec == 0)
        {
            dispose(allocator, _impl);
        }
    }
    alias _impl this;
}

/// Convenience constructor for SmartPtr!T.
auto smart_ptr(T, Args...)(Args args)
{
    return SmartPtr!(T)(args);
}

@("smart_ptr")
@safe
@nogc
unittest
{
    struct S
    {
        int i;
        this(int v)
        {
            i = v;
        }
        void set(int v)
        {
            i = v;
        }
    }
    // safe_tracef is only declared under debug(nbuff), so the call has to be
    // guarded the same way as every other call in this module.
    debug(nbuff) safe_tracef("test");
    auto ptr0 = smart_ptr!S(1);
    assert(ptr0._impl._count == 1);
    assert(ptr0.i == 1);
    ptr0.set(2);
    assert(ptr0.i == 2);
    SmartPtr!S ptr1;
    ptr1.construct();
    assert(ptr1._impl._count == 1);
    SmartPtr!S ptr2 = ptr0;
    assert(ptr0._impl._count == 2);
    ptr2 = ptr1;
    assert(ptr2._impl._count == 2);
}

/// Non-copyable owning pointer to a `T` stored in allocator (malloc) memory.
/// The object is disposed by the destructor or by `release`.
struct UniquePtr(T)
{
    private struct Impl
    {
        T _object;
        alias _object this;
    }
    @disable this(this); // only move
    private
    {
        Impl* _impl;
    }
    /// Build `T(args)` and move it into a newly allocated Impl.
    this(Args...)(auto ref Args args) @safe
    {
        auto v = T(args);
        _impl = allocator.make!(Impl)();
        swap(v, _impl._object);
    }
    ~this() @trusted @nogc
    {
        if ( _impl)
        {
            dispose(allocator, _impl);
            _impl = null;
        }
    }
    private void rel() @trusted
    {
        if ( _impl is null )
        {
            return;
        }
        dispose(allocator, _impl);
        _impl = null;
    }
    /// Dispose the owned object (if any) and become null.
    void release()
    {
        rel;
    }
    /// Dispose what we own, then take ownership of `other`'s object;
    /// `other` becomes null.
    void borrow(ref typeof(this) other)
    {
        if ( _impl )
        {
            rel;
        }
        swap(_impl,other._impl);
    }
    bool isNull() @safe @nogc nothrow
    {
        return _impl is null;
    }
    /// Forward member access to the owned object.
    auto opDispatch(string op, Args...)(Args args)
    {
        mixin("return _impl._object.%s;".format(op));
    }
    alias _impl this;
}

/// Convenience constructor for UniquePtr!T.
auto unique_ptr(T, Args...)(Args args)
{
    return UniquePtr!(T)(args);
}

@("unique_ptr")
@safe
@nogc
unittest
{
    static struct S
    {
        int i;
    }
    UniquePtr!S ptr0 = UniquePtr!S(1);
    assert(ptr0.i == 1);
    UniquePtr!S ptr1;
    ptr1.borrow(ptr0);
    assert(ptr1.i == 1);
    auto ptr2 = unique_ptr!S(2);
    assert(ptr2.i == 2);
    ptr2.release();
}

/// Non-copyable owner of a writable byte buffer.
///
/// Buffers of up to MemPool.MaxSize bytes come from `_mempool`; larger ones are
/// allocated directly. `_size` remembers the requested size (the underlying
/// array may be larger) and is what gets passed back to the pool on release.
struct MutableMemoryChunk
{
    debug(nbuff) static int __id;
    private
    {
        ubyte[] _data;
        size_t _size;
    }
    debug(nbuff) public
    {
        int _id;
    }
    @disable this(this);

    /// Acquire a buffer able to hold `s` bytes.
    this(size_t s) @safe @nogc
    {
        if (s<=MemPool.MaxSize)
        {
            debug(nbuff) _id = __id++;
            debug(nbuff) tracef("alloc from pool %d\n", _id);
            _data = _mempool.alloc(s);
        }
        else
        {
            _data = allocator.makeArray!ubyte(s);
        }
        _size = s;
    }
    /// Give the buffer back (to the pool or to the allocator) unless it was
    /// already handed over by `consume` (`_size == 0`).
    ~this() @safe @nogc
    {
        debug(nbuff) safe_tracef("destroy: %s, %s", _data, _size);
        if ( _size == 0 )
        {
            return;
        }
        if ( _size <= MemPool.MaxSize)
        {
            _mempool.free(_data, _size);
        }
        else
        {
            () @trusted {dispose(allocator, _data);}();
        }
    }
    // Hand the buffer over as immutable data; this chunk no longer owns it
    // afterwards, so the new owner is responsible for releasing the memory.
    private immutable(ubyte[]) consume() @system @nogc
    {
        auto v = assumeUnique(_data);
        _data = null;
        _size = 0;
        return v;
    }

    /// Requested size in bytes (0 after `consume`).
    auto size() pure inout @safe @nogc nothrow
    {
        return _size;
    }
    /// The underlying writable array.
    auto data() pure inout @system @nogc nothrow
    {
        return _data;
    }
    alias _data this;
}

@("MutableMemoryChunk")
unittest
{
    import std.stdio;
    import std.array;
    {
        auto c = MutableMemoryChunk(16);
        auto data = c.data();
        auto size = c.size();
        data[0] = 1;
        data[1..5] = [2, 3, 4, 5];
        assert(c.data[0] == 1);
        ubyte[128] payload = 2;
        data = data ~ payload; // you can append but this do not change anything for c
        assert(c.data[0] == 1 && c.size() == size);
        //copy(payload.array, c.data); XXX check
        assert(c.data[0] == 1 && c.size() == size);
        auto v = c.consume();
        assert(equal(v[0..5], [1,2,3,4,5]));
        _mempool.free(cast(ubyte[])v, size);
    }
    {
        auto c = MutableMemoryChunk(16);
        auto data = c.data();
        auto size = c.size();
        data[0] = 1;
        data[1..5] = [2, 3, 4, 5];
        assert(c.data[0] == 1);
        ubyte[128] payload = 2;
        data = data ~ payload; // you can append but this do not change anything for c
        assert(c.data[0] == 1 && c.size() == size);
        //copy(payload.array, c.data); XXX check
        assert(c.data[0] == 1 && c.size() == size);
        auto v = c.consume();
        assert(equal(v[0..5], [1,2,3,4,5]));
        _mempool.free(cast(ubyte[])v, size);
    }
    auto mutmemptr = unique_ptr!MutableMemoryChunk(16);
    assert(mutmemptr.size==16);
}

/// Non-copyable owner of read-only bytes.
///
/// Built either from a MutableMemoryChunk (takes over its memory, `_size` is
/// the original requested size) or from an existing string / immutable array
/// (memory is not owned; `_size` is 0 and the `_data` field is registered with
/// the GC as a range while the chunk is alive).
struct ImmutableMemoryChunk
{
    debug(nbuff) public
    {
        int _id = -1;
    }
    private
    {
        immutable(ubyte[]) _data;
        immutable size_t _size;
    }

    @disable this(this);

    /// Take over the memory of `c`; `c` is left empty.
    this(ref MutableMemoryChunk c) @trusted @nogc
    {
        // trusted because
        // 1. Chunk have disabled copy constructor so we have single copy of memory under chunk
        // 2. user can't change data location
        _size = c.size;
        _data = assumeUnique(c.consume());
        debug(nbuff) _id = c._id;
    }
    /// Reference the bytes of `s` without copying or owning them.
    this(string s) @safe @nogc
    {
        () @trusted {
            GC.addRange(&this._data, _data.sizeof);
        }();
        _data = s.representation();
        _size = 0;
    }
    /// Reference `s` without copying or owning it.
    this(immutable(ubyte)[] s) @safe @nogc
    {
        () @trusted {
            GC.addRange(&this._data, _data.sizeof);
        }();
        _data = s;
        _size = 0;
    }
    ~this() @trusted @nogc
    {
        // trusted because ...see constructor
        if ( _data !is null && _size == 0)
        {
            // borrowed data: undo the GC.addRange done in the constructor
            GC.removeRange(&this._data);
        }
        if ( _data is null || _size == 0 )
        {
            return;
        }
        if (_size <= MemPool.MaxSize)
        {
            debug(nbuff) safe_printf("return mem to pool %d\n", _id);
            _mempool.free(cast(ubyte[])_data, _size);
        }
        else
        {
            debug(nbuff) safe_printf("disposing large buffer\n");
            dispose(allocator, _data.ptr);
        }
    }
    string toString()
    {
        return "[%(%0.2x,%)]".format(_data);
    }
    /// Requested size of the owned buffer; 0 for borrowed data.
    auto size() pure inout @safe @nogc nothrow
    {
        return _size;
    }
    auto data() pure inout @safe @nogc nothrow
    {
        return _data;
    }
    alias _data this;
}

@("ImmutableMemoryChunk")
unittest
{
    import std.traits;
    MutableMemoryChunk c = MutableMemoryChunk(16);
    c.data[0..8] = [0, 1, 2, 3, 4, 5, 6, 7];
    ImmutableMemoryChunk ic = ImmutableMemoryChunk(c);
    assert(equal(ic.data[0..8], [0, 1, 2, 3, 4, 5, 6, 7]));

    assert(!__traits(compiles, {ic._data[0] = 2;}));
    assert(!__traits(compiles, {ic._data ~= "123".representation;}));

    auto d = ic.data;
    assert(!__traits(compiles, {d[0] = 2;}));
    assert(!__traits(compiles, {d ~= "123".representation;}));
    c = MutableMemoryChunk(16);
    auto imc = SmartPtr!ImmutableMemoryChunk(c);
}

/// A `[_beg, _end)` window over a shared (reference-counted)
/// ImmutableMemoryChunk. Copies and slices share the same memory.
struct NbuffChunk
{

    private
    {
        size_t _beg, _end;
        SmartPtr!(ImmutableMemoryChunk) _memory;
    }
    // ~this() @trusted @nogc
    // {
    //     if (_memory)
    //     {
    //         debug(nbuff) printf("on ~NbuffChunk: %d\n", _memory._count);
    //         debug(nbuff) printf("on ~NbuffChunk: %s\n", toStringz(dump()));
    //     }
    // }
    /// Convert the mutable chunk `c` into immutable shared memory and view its
    /// first `l` bytes. `c` is released (becomes null) afterwards.
    this(ref UniquePtr!MutableMemoryChunk c, size_t l) @safe @nogc
    {
        _memory = SmartPtr!ImmutableMemoryChunk(c._impl._object);
        _end = l;
        c.release;
    }
    /// View the bytes of `s` (no copy).
    this(string s) @safe @nogc
    {
        _memory = SmartPtr!ImmutableMemoryChunk(s);
        _end = s.length;
    }
    /// View `s` (no copy).
    this(immutable(ubyte)[] s) @safe @nogc
    {
        _memory = SmartPtr!ImmutableMemoryChunk(s);
        _end = s.length;
    }
    /// Hex/ASCII dump of bytes `[0, _end)` of the underlying memory,
    /// 16 bytes per row, with the `_beg` and `_end` positions marked.
    string dump() inout @safe
    {
        import std.range: chunks;
        import std.ascii;

        int p;
        string res = "▌%-72.72s▐\n".format("_beg=%d _end=%d _size=%d".format(_beg, _end,_memory._impl._size));
        foreach(c; chunks(_memory._impl._object[0.._end], 16))
        {
            res ~= "▌%5.5d ".format(p);
            foreach(s; c)
            {
                if (p == _beg)
                {
                    res ~= "▛%02.2x".format(s);
                }
                else
                {
                    res ~= " %02.2x".format(s);
                }
                p++;
            }
            if (p == _end)
            {
                res ~= "▟";
            }
            if (c.length < 16)
            {
                res ~= repeat("◦◦", (16 - c.length)).join(" ");
            }
            if ( p == _end && p % 16 == 0 )
            {
                res ~= " ";
            }
            else
            {
                res ~= " ";
            }
            foreach(s; c)
            {
                if ( isPrintable(s) )
                {
                    res ~= "%c".format(cast(char)s);
                }
                else
                {
                    if (s == 13)
                    {
                        res ~= "⇦";
                    }
                    else if ( s == 10 )
                    {
                        res ~= "⇓";
                    }
                    else
                    {
                        res ~= ".";
                    }
                }
            }
            if (c.length < 16)
            {
                res ~= repeat("◦", (16 - c.length)).join();
            }
            res ~= "▐\n";
        }
        return res;
    }
    /// Copy of the viewed bytes as a string; null when no memory is attached.
    string toString() inout @trusted
    {
        if ( _memory._impl is null)
        {
            return null;
        }
        return cast(string)data().idup;
    }
    string toLower() @safe
    {
        // trusted as data do not leave scope
        return this.toString().toLower;
    }
    string toUpper() @safe
    {
        // trusted as data do not leave scope
        return this.toString.toUpper();
    }
    /// `_size` of the underlying ImmutableMemoryChunk.
    public auto size() pure inout nothrow @safe @nogc
    {
        return _memory._size;
    }
    /// Number of bytes in the view.
    public auto length() @safe @nogc inout
    {
        return _end - _beg;
    }
    /// The viewed bytes (no copy).
    public auto data() inout @system @nogc
    {
        return _memory._impl._object[_beg.._end];
    }
    void opAssign(T)(auto ref T other, string file = __FILE__, int line = __LINE__) @safe @nogc
    {
        if (this is other)
        {
            return;
        }
        _memory = other._memory;
        _beg = other._beg;
        _end = other._end;
    }

    /// Byte at `index`, relative to the start of the view.
    /// Throws: NbuffError if `index` is outside the view.
    auto opIndex(size_t index)
    {
        if (index >= _end - _beg)
        {
            throw NbuffError;
        }
        return _memory._impl._object[_beg + index];
    }
    auto opIndex()
    {
        return save();
    }
    /// Sub-view `[start, end)` relative to the current view (shares memory).
    NbuffChunk opSlice(size_t start, size_t end) @safe @nogc
    {
        // start > end would produce a view whose _end lies before its _beg
        assert(start<=end && start<=length && end<=length);
        auto res = save();
        res._beg += start;
        res._end = res._beg + end - start;
        return res;
    }
    auto opEquals(const(ubyte)[] b)
    {
        return b == _memory._object[_beg.._end];
    }
    auto opDollar()
    {
        return _end - _beg;
    }
    bool empty() pure @nogc @safe
    {
        return _beg == _end;
    }
    auto front() @safe @nogc
    {
        return _memory._impl._object[_beg];
    }
    auto back() @safe @nogc
    {
        return _memory._impl._object[_end - 1];
    }
    void popFront() @safe @nogc
    {
        assert(_beg < _end);
        _beg++;
    }
    void popFrontN(size_t n) @safe @nogc
    {
        debug(nbuff) safe_tracef("popn %d of %d", n, length);
        assert(n <= length);
        _beg += n;
    }
    void popBack() @safe @nogc
    {
        assert(_beg < _end);
        _end--;
    }
    /// Drop the last `n` bytes of the view.
    void popBackN(size_t n) @safe @nogc
    {
        assert(n <= length);
        // shrink by n (subtracting `length` here emptied the whole view)
        _end -= n;
    }
    /// Independent view over the same shared memory.
    NbuffChunk save() @safe @nogc
    {
        NbuffChunk c;
        c._beg = _beg;
        c._end = _end;
        c._memory = _memory;
        return c;
    }
}

// class NbuffError: Exception
// {
//     this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
//     {
//         super(msg, file, line);
//     }
// }
alias MutableNbuffChunk = UniquePtr!MutableMemoryChunk;
/// Smart buffer.
/// Usage scenario:
/// You are reading bulk newline delimited lines (or any other way structured data) from TCP socket and process it as soon as possible.
/// Every time you received something like:
///
/// 'line1\nline2\nli' <- note incomplete last line.
///
/// from network to your socket buffer you can process 'line1' and 'line2' and then you have to keep the whole buffer (or copy
/// and save its incomplete part 'li') just because you have some incomplete data.
///
/// This leads to unnecessary allocations and data movement (if you choose to free old buffer and save incomplete part)
/// or memory wasting (if you choose to preallocate very large buffer and keep processed and incomplete data).
///
/// Nbuff solves this problem by using a memory pool and smart pointers - it takes memory chunks from the pool for reading
/// from network (file, etc...), and automatically returns a buffer to the pool as soon as you have processed all data in it and moved
/// the 'processed' pointer forward.
///
/// So Nbuff looks like some "window" on the list of buffers, filled with network data, and as soon as a buffer moves out of this
/// window and is dereferenced it will be automatically returned to the memory pool. Please note - there are no GC allocations, everything
/// is done using malloc.
///
/// Here is a sample of the buffer lifecycle (see code or docs for exact function signatures):
/// Nbuff nbuff; - initialize nbuff structure.
/// buffer = Nbuff.get(bsize) - gives you non-copyable mutable buffer of size >= bsize
/// socket.read(buffer.data) - fill buffer with network data.
/// nbuff.append(buffer) - convert mutable non-copyable buffer to immutable shareable buffer and append it to nbuff
/// valuable_data = nbuff.data(0, 100); - get immutable view to first 100 bytes of nbuff (they can be non-contiguous)
/// nbuff.pop(100) - release first 100 bytes of nbuff, marking them as "processed".
/// If there are any previously appended buffers which become unreferenced at this point, then they will be
/// returned to the pool.
/// When nbuff goes out of scope all its buffers will be returned to the pool.
///
struct Nbuff
{
    private
    {
        enum ChunksPerPage = 8;
        // Chunks are stored in a singly linked list of fixed-size pages.
        // The first page (_pages) is embedded in the struct; further pages are
        // allocated with `allocator` on demand.
        struct Page
        {
            NbuffChunk[ChunksPerPage] _chunks;
            Page* _next;
        }
        size_t _length;         // total number of bytes in all stored chunks
        size_t _endChunkIndex;  // index one past the last stored chunk
        size_t _begChunkIndex;  // index of the first stored chunk
        Page _pages;
    }

    invariant
    {
        assert(_endChunkIndex == _begChunkIndex || _length > 0, "%d, %d, length = %s".format(_begChunkIndex, _endChunkIndex, _length));
    }

    ///
    /// copy references to RC-data
    ///
    this(this) @nogc @safe
    {
        // copy only allocated pages
        // (the embedded first page was already copied bitwise together with
        // the struct; the linked pages must be duplicated so that the copy does
        // not share page nodes with the original)
        Page* new_pages, last_new_page;
        auto p = _pages._next;
        while(p)
        {
            auto new_page = () @trusted {return allocator.make!Page();}();
            new_page._chunks = p._chunks;
            if ( new_pages is null)
            {
                new_pages = new_page;
                last_new_page = new_pages;
            }
            else
            {
                last_new_page._next = new_page;
                last_new_page = new_page;
            }
            p = p._next;
        }
        _pages._next = new_pages;
    }

    /// Create a buffer holding the single chunk `s` (no copy of the data).
    /// Throws: NbuffError if `s` is empty.
    this(string s) @nogc @safe
    {
        append(s);
    }
    /// Create a buffer holding the single chunk `s` (no copy of the data).
    /// Throws: NbuffError if `s` is empty.
    this(immutable(ubyte)[] s) @nogc @safe
    {
        auto c = NbuffChunk(s);
        append(c);
    }
    ///
    /// copy references to RC-data
    ///
    void opAssign(T)(auto ref T other) @safe @nogc
    {
        if ( other is this )
        {
            return;
        }
        // release everything `this` holds
        auto p = _pages._next;
        while(p)
        {
            auto next = p._next;
            () @trusted {dispose(allocator, p);}();
            p = next;
        }
        // copy info from other to this
        _length = other._length;
        _begChunkIndex = other._begChunkIndex;
        _endChunkIndex = other._endChunkIndex;
        _pages._chunks = other._pages._chunks;
        _pages._next = null;
        if ( empty || ( _begChunkIndex < ChunksPerPage && _endChunkIndex < ChunksPerPage))
        {
            // everything lives in the embedded first page
            return;
        }
        // copy only allocated pages
        Page* new_pages, last_new_page;
        p = other._pages._next;
        while(p)
        {
            auto new_page = () @trusted {return allocator.make!Page();}();
            //assert(!new_page);
            new_page._chunks = p._chunks;
            if ( new_pages is null)
            {
                new_pages = new_page;
                last_new_page = new_pages;
            }
            else
            {
                last_new_page._next = new_page;
                last_new_page = new_page;
            }
            p = p._next;
        }
        _pages._next = new_pages;
    }

    ///
    /// dispose all allocated pages and dec refs for any data
    ///
    ~this() @safe @nogc
    {
        auto p = _pages._next;
        while(p)
        {
            auto next = p._next;
            () @trusted {dispose(allocator, p);}();
            p = next;
        }
    }

    /// Drop all stored chunks and allocated pages; the buffer becomes empty.
    void clear() @nogc @safe
    {
        _length = 0;
        _begChunkIndex = _endChunkIndex = 0;
        auto p = _pages._next;
        while(p)
        {
            auto next = p._next;
            () @trusted {dispose(allocator, p);}();
            p = next;
        }
        _pages = Page();
    }

    string toString() @safe
    {
        return this.data.toString();
    }
    /// Human-readable dump of the indexes and of every stored chunk.
    string dump() @safe
    {
        string[] res;
        res ~= "▛%s▜".format(repeat("▀",72).join());
        res ~= "▌%-72.72s▐".format("_length = %d".format(_length));
        res ~= "▌%-72.72s▐".format("_begIndex = %d".format(_begChunkIndex));
        res ~= "▌%-72.72s▐".format("_endIndex = %d".format(_endChunkIndex));
        auto p = chunkIndexToPage(_begChunkIndex);
        auto chunkIndex = _begChunkIndex % ChunksPerPage;
        auto i = _begChunkIndex;
        while(i<_endChunkIndex)
        {
            res ~= "▌%-72.72s▐".format("chunk %d ".format(i));
            res ~= p._chunks[chunkIndex].dump()[0..$-1];
            i++;
            chunkIndex++;
            if (chunkIndex>=ChunksPerPage)
            {
                p = p._next;
                res ~= "▌%-72.72s▐".format("Page %x".format(p));
                chunkIndex = 0;
            }
        }
        res ~= "▙%s▟".format(repeat("▄",72).join());
        return res.join("\n");
    }

    /// Get a mutable, non-copyable chunk able to hold `size` bytes.
    static auto get(size_t size) @safe @nogc
    {
        // take memory from pool
        return MutableNbuffChunk(size);
    }

    bool empty() pure inout nothrow @safe @nogc
    {
        return _endChunkIndex == _begChunkIndex;
    }

    /// Total number of bytes stored.
    auto length() pure nothrow @nogc @safe inout
    {
        return _length;
    }

    /// Append the bytes of `s` as a new chunk (no copy of the data).
    /// Throws: NbuffError if `s` is empty.
    void append(string s) @safe @nogc
    {
        debug(nbuff) safe_tracef("append NbuffChunk");
        if (s.length==0)
        {
            throw NbuffError;
        }
        _length += s.length;
        if ( _endChunkIndex < ChunksPerPage)
        {
            _pages._chunks[_endChunkIndex++] = NbuffChunk(s);
            return;
        }

        // walk to the page preceding the one that holds slot _endChunkIndex
        Page* last_page = &_pages;
        auto pi = _endChunkIndex - ChunksPerPage;
        debug(nbuff) safe_tracef("pi: %d", pi);
        debug(nbuff) safe_tracef("last_page.next = %x", last_page._next);
        while(pi >= ChunksPerPage)
        {
            last_page = last_page._next;
            pi -= ChunksPerPage;
        }
        assert(0 <= pi && pi < ChunksPerPage );
        debug(nbuff) safe_tracef("last_page.next = %x, pi=%d", last_page._next, pi);
        if (last_page._next is null)
        {
            // have to create new page
            debug(nbuff) safe_tracef("create new page");
            last_page._next = allocator.make!Page();
        }
        last_page._next._chunks[pi] = NbuffChunk(s);
        _endChunkIndex++;
    }

    /// Convert the mutable chunk `c` to an immutable chunk and append its
    /// first `l` bytes. `c` is released by the NbuffChunk constructor.
    /// Throws: NbuffError if `l` is 0.
    void append(ref UniquePtr!(MutableMemoryChunk) c, size_t l) @safe @nogc
    {
        debug(nbuff) safe_tracef("append NbuffChunk");
        if (l==0)
        {
            throw NbuffError;
        }
        _length += l;
        if ( _endChunkIndex < ChunksPerPage)
        {
            // just convert it to immutable chunk and store it
            _pages._chunks[_endChunkIndex++] = NbuffChunk(c,l);
            return;
        }
        // go to last page and store chunk there
        Page* last_page = &_pages;
        auto pi = _endChunkIndex - ChunksPerPage;
        debug(nbuff) safe_tracef("pi: %d", pi);
        debug(nbuff) safe_tracef("last_page.next = %x", last_page._next);
        while(pi >= ChunksPerPage)
        {
            last_page = last_page._next;
            pi -= ChunksPerPage;
        }
        assert(0 <= pi && pi < ChunksPerPage );
        debug(nbuff) safe_tracef("last_page.next = %x, pi=%d", last_page._next, pi);
        if (last_page._next is null)
        {
            // have to create new page
            debug(nbuff) safe_tracef("create new page");
            last_page._next = allocator.make!Page();
        }
        last_page._next._chunks[pi] = NbuffChunk(c,l);
        _endChunkIndex++;
    }

    /// Append the whole view of `source` (shares memory with it).
    /// Throws: NbuffError if `source` is empty.
    void append(ref NbuffChunk source) @safe @nogc
    {
        append(source, 0, source.length);
    }
    /// append some part of other nbuff
    /// `pos` is relative to the start of `source`'s view; `len` bytes are taken.
    /// Throws: NbuffError if `len` is 0.
    void append(ref NbuffChunk source, size_t pos, size_t len) @safe @nogc
    {
        if (len==0)
        {
            throw NbuffError;
        }
        _length += len;
        if ( _endChunkIndex < ChunksPerPage)
        {
            _pages._chunks[_endChunkIndex]._memory = source._memory;
            _pages._chunks[_endChunkIndex]._beg = source._beg + pos;
            _pages._chunks[_endChunkIndex]._end = source._beg + pos + len;
            _endChunkIndex++;
            return;
        }
        Page* last_page = &_pages;
        auto pi = _endChunkIndex - ChunksPerPage;
        debug(nbuff) safe_tracef("pi: %d", pi);
        debug(nbuff) safe_tracef("last_page.next = %x", last_page._next);
        while(pi >= ChunksPerPage)
        {
            last_page = last_page._next;
            pi -= ChunksPerPage;
        }
        assert(0 <= pi && pi < ChunksPerPage );
        debug(nbuff) safe_tracef("last_page.next = %x, pi=%d", last_page._next, pi);
        if (last_page._next is null)
        {
            // have to create new page
            debug(nbuff) safe_tracef("create new page");
            last_page._next = allocator.make!Page();
        }
        last_page._next._chunks[pi] = source;
        last_page._next._chunks[pi]._beg += pos;
        last_page._next._chunks[pi]._end = last_page._next._chunks[pi]._beg+len;
        _endChunkIndex++;
    }
    /// return first stored chunk
    auto frontChunk() @safe @nogc
    {
        if ( empty )
        {
            assert(0, "You are looking front chunk in empty Nbuff");
        }
        Page *p = chunkIndexToPage(_begChunkIndex);
        int i = _begChunkIndex % ChunksPerPage;
        return p._chunks[i];
    }
    /// throw away first chunk
    void popChunk() @safe @nogc
    {
        if ( empty )
        {
            assert(0, "You are trying to pop chunk from empty Nbuff");
        }
        if ( _begChunkIndex < ChunksPerPage)
        {
            _length -= _pages._chunks[_begChunkIndex].length;
            _pages._chunks[_begChunkIndex++] = NbuffChunk();
            return;
        }

        // walk to the page preceding the one that holds slot _begChunkIndex
        Page* last_page = &_pages;
        debug(nbuff) safe_tracef("popping chunk %d", _begChunkIndex);
        auto pi = _begChunkIndex - ChunksPerPage;
        while(pi >= ChunksPerPage)
        {
            last_page = last_page._next;
            pi -= ChunksPerPage;
        }
        debug(nbuff) safe_tracef("popping chunk - on page index = %d", pi);
        assert(0 <= pi && pi < ChunksPerPage );
        // The chunk being popped lives on last_page._next, so its length must
        // be read from there (reading slot pi of the embedded first page
        // subtracted the wrong amount from _length).
        auto chunkPage = last_page._next;
        _length -= chunkPage._chunks[pi].length;
        chunkPage._chunks[pi] = NbuffChunk();
        _begChunkIndex++;
        if (empty)
        {
            debug(nbuff) safe_tracef("nbuff become empty");
            // _endChunkIndex = _begChunkIndex = 0;
            clear();
            return;
        }
        if (pi == ChunksPerPage - 1)
        {
            // we popped last chunk on page - it is completely free
            auto pageToDispose = last_page._next;
            debug(nbuff) safe_tracef("last chunk were popped from this page and nbuff is not empty, lp: %x, ptd: %x", last_page, pageToDispose);
            last_page._next = pageToDispose._next;
            () @trusted {dispose(allocator, pageToDispose);}();
            // adjust indexes
            _begChunkIndex -= ChunksPerPage;
            _endChunkIndex -= ChunksPerPage;
        }
    }
    /// pop n bytes from nbuff
    void pop(long n=1) @safe @nogc
    {
        assert(n <= _length);
        if (n == _length)
        {
            clear();
            return;
        }
        auto toPop = n;
        while(toPop > 0)
        {
            if ( empty )
            {
                assert(0, "You are trying to pop chunk from empty Nbuff");
            }

            // advance the first chunk by one byte
            auto page = chunkIndexToPage(_begChunkIndex);
            auto index = _begChunkIndex % ChunksPerPage;
            page._chunks[index]._beg++;
            _length--;
            toPop--;
            assert(_length >= 0);
            if (page._chunks[index]._beg == page._chunks[index]._end)
            {
                // empty
                debug(nbuff) tracef("dispose chunk %d", index);
                page._chunks[index] = NbuffChunk();
                if ( index == ChunksPerPage - 1 && _begChunkIndex>=ChunksPerPage) // last chunk on page is free
                {
                    // release page
                    debug(nbuff) tracef("dispose page");
                    auto page_prev = chunkIndexToPage(_begChunkIndex - ChunksPerPage);
                    page_prev._next = page._next;
                    () @trusted {dispose(allocator, page);}();
                    _begChunkIndex++;
                    // one page was unlinked - shift both indexes back by a page
                    _begChunkIndex -= ChunksPerPage;
                    _endChunkIndex -= ChunksPerPage;
                }
                else
                {
                    _begChunkIndex++;
                    debug(nbuff) tracef("new _begChunkIndex %d", _begChunkIndex);
                }
            }
        }
    }

    // Page that holds the chunk slot with the given index
    // (follows `_next` index / ChunksPerPage times starting at the first page).
    private auto chunkIndexToPage(ulong index) pure inout @safe @nogc
    {
        auto skipPages = index / ChunksPerPage;
        auto p = &_pages;
        while(skipPages>0)
        {
            p = p._next;
            skipPages--;
        }
        return p;
    }
    ///
    /// return immutable continuous view to [beg, end] of this nbuff.
    /// Data copy:
    /// We can avoid data copy if [beg, end] lies within single stored immutable chunk. Then we just return ref to this chunk
    /// We can't avoid data copy if [beg, end] span several chunks. Then we have to join them.
1345 /// 1346 NbuffChunk data(size_t beg, size_t end) @safe 1347 { 1348 if (beg>end) 1349 { 1350 throw NbuffError; 1351 } 1352 if (end>_length) 1353 { 1354 throw NbuffError; 1355 } 1356 if (beg == end || _length == 0) 1357 { 1358 return NbuffChunk(); 1359 } 1360 auto bytesToSkip = beg; 1361 auto bytesToCopy = end-beg; 1362 auto page = chunkIndexToPage(_begChunkIndex); 1363 auto chunkIndex = _begChunkIndex % ChunksPerPage; 1364 size_t position; 1365 debug(nbuff) safe_tracef("bytesToSkip: %d", bytesToSkip); 1366 while(bytesToSkip>0) 1367 { 1368 auto chunk = &page._chunks[chunkIndex]; 1369 debug(nbuff) safe_tracef("check chunk: [%s..%s](%d)", 1370 chunk._beg, chunk._end, chunk.length); 1371 1372 if (bytesToSkip>=chunk.length) 1373 { 1374 // just skip this chunk 1375 debug(nbuff) safe_tracef("skip it"); 1376 bytesToSkip -= chunk.length; 1377 chunkIndex++; 1378 if (chunkIndex == ChunksPerPage) 1379 { 1380 page = page._next; 1381 chunkIndex = 0; 1382 continue; 1383 } 1384 } 1385 else 1386 { 1387 debug(nbuff) safe_tracef("start from it"); 1388 position = bytesToSkip; 1389 bytesToSkip = 0; 1390 break; 1391 } 1392 } 1393 debug(nbuff) safe_tracef("start index = %d, position = %d", chunkIndex, position); 1394 assert(page !is null); 1395 assert(chunkIndex < ChunksPerPage); 1396 if ( page._chunks[chunkIndex]._beg + position + bytesToCopy <= page._chunks[chunkIndex]._end ) 1397 { 1398 debug(nbuff) safe_tracef("return ref to single chunk"); 1399 // return reference to this chunk only 1400 NbuffChunk res = page._chunks[chunkIndex]; 1401 res._beg += position; 1402 res._end = res._beg + bytesToCopy; 1403 return res; 1404 } 1405 1406 // otherwise join chunks 1407 auto mc = Nbuff.get(bytesToCopy); 1408 size_t bytesCopied = 0; 1409 while(bytesToCopy>0) 1410 { 1411 auto from = page._chunks[chunkIndex]._beg + position; 1412 auto to = page._chunks[chunkIndex]._end; 1413 auto tc = min(bytesToCopy, to - from); 1414 debug(nbuff) safe_tracef("from: %d, to: %d, tocopy=%d", from, to, tc); 1415 
mc._impl._object[bytesCopied..bytesCopied+tc] = page._chunks[chunkIndex]._memory._object[from..from+tc]; 1416 bytesCopied += tc; 1417 chunkIndex++; 1418 bytesToCopy -= tc; 1419 position = 0; 1420 if (chunkIndex>=ChunksPerPage) 1421 { 1422 chunkIndex -= ChunksPerPage; 1423 page = page._next; 1424 } 1425 } 1426 return NbuffChunk(mc, bytesCopied); 1427 } 1428 /// 1429 /// return immutable continuous view to this whole nbuff. 1430 /// Data copy: 1431 /// We can avoid data copy if this nbuff store single immutable chunk. Then we just return ref to this chunk 1432 /// We can't avoid data copy if nbuff span several chunks. Then we have to join them. 1433 /// 1434 NbuffChunk data() @safe @nogc 1435 { 1436 if (_length==0) 1437 { 1438 return NbuffChunk(); 1439 } 1440 // ubyte[] result = new ubyte[](_length); 1441 auto skipPages = _begChunkIndex / ChunksPerPage; 1442 int bi = cast(int)_begChunkIndex, ei = cast(int)_endChunkIndex; 1443 Page* p = &_pages; 1444 auto toCopyChunks = ei - bi; 1445 int toCopyBytes = cast(int)_length; 1446 int bytesCopied = 0; 1447 while(skipPages>0) 1448 { 1449 bi -= ChunksPerPage; 1450 ei -= ChunksPerPage; 1451 p = p._next; 1452 skipPages--; 1453 } 1454 if(ei == bi + 1) 1455 { 1456 // there is only chunk in buffer, we can return it 1457 return p._chunks[bi]; 1458 } 1459 assert(bi>=0 && ei>0); 1460 auto mc = Nbuff.get(_length); 1461 while(toCopyChunks>0 && toCopyBytes>0) 1462 { 1463 auto from = p._chunks[bi]._beg; 1464 auto to = p._chunks[bi]._end; 1465 auto tc = to - from; 1466 mc._impl._object[bytesCopied..bytesCopied+tc] = p._chunks[bi]._memory._object[from..to]; 1467 bytesCopied += tc; 1468 bi++; 1469 toCopyBytes -= tc; 1470 toCopyChunks--; 1471 if (bi>=ChunksPerPage) 1472 { 1473 bi -= ChunksPerPage; 1474 ei -= ChunksPerPage; 1475 p = p._next; 1476 } 1477 } 1478 return NbuffChunk(mc, _length); 1479 } 1480 /// 1481 /// return non-contiguous vie to slice of this nbuff. 
1482 /// Data copy: none 1483 /// 1484 Nbuff opSlice(size_t start, size_t end) @nogc @safe 1485 { 1486 //debug(nbuff) tracef("slice %d..%d", start, end); 1487 if (start>end) 1488 { 1489 throw NbuffError; 1490 } 1491 if (end>_length) 1492 { 1493 throw NbuffError; 1494 } 1495 if (start==end) 1496 { 1497 return Nbuff(); 1498 } 1499 auto bytesToSkip = start; 1500 auto bytesToCopy = end-start; 1501 auto page = chunkIndexToPage(_begChunkIndex); 1502 auto chunkIndex = _begChunkIndex % ChunksPerPage; 1503 size_t position;// = page._chunks[chunkIndex]._beg; 1504 // debug(nbuff) tracef("start index = %d", chunkIndex); 1505 // debug(nbuff) tracef("bytesToSkip = %d", bytesToSkip); 1506 // debug(nbuff) tracef("bytesToCopy = %d", bytesToCopy); 1507 while(bytesToSkip>0) 1508 { 1509 auto chunk = &page._chunks[chunkIndex]; 1510 //debug(nbuff) tracef("check chunk: [%s..%s](%d)", 1511 // chunk._beg, chunk._end, chunk.length); 1512 1513 if (bytesToSkip>=chunk.length) 1514 { 1515 // just skip this chunk 1516 bytesToSkip -= chunk.length; 1517 chunkIndex++; 1518 if (chunkIndex == ChunksPerPage) 1519 { 1520 page = page._next; 1521 chunkIndex = 0; 1522 continue; 1523 } 1524 } 1525 else 1526 { 1527 position = bytesToSkip; 1528 bytesToSkip = 0; 1529 break; 1530 } 1531 } 1532 assert(page !is null); 1533 assert(chunkIndex < ChunksPerPage); 1534 Nbuff result = Nbuff(); 1535 /// copy references 1536 while(bytesToCopy>0) 1537 { 1538 auto l = min(bytesToCopy, page._chunks[chunkIndex]._end - position - page._chunks[chunkIndex]._beg); 1539 result.append(page._chunks[chunkIndex], position, l); 1540 bytesToCopy -= l; 1541 position = 0; 1542 chunkIndex++; 1543 if (chunkIndex == ChunksPerPage) 1544 { 1545 page = page._next; 1546 chunkIndex = 0; 1547 } 1548 } 1549 return result; 1550 } 1551 1552 auto opDollar() 1553 { 1554 return _length; 1555 } 1556 bool opEquals(string other) pure const @safe @nogc 1557 { 1558 return this == other.representation; 1559 } 1560 bool opEquals(const ubyte[] other) pure 
const @safe @nogc 1561 { 1562 if (other.length != length) 1563 { 1564 return false; 1565 } 1566 return countUntil(other) == 0; 1567 } 1568 bool opEquals(this R)(auto ref R other) pure @safe @nogc 1569 { 1570 if (this is other) 1571 { 1572 return true; 1573 } 1574 if (_length != other._length) 1575 { 1576 return false; 1577 } 1578 // real comparison 1579 bool equals = true; 1580 size_t to_compare = _length; 1581 Page* page_this = chunkIndexToPage(_begChunkIndex); 1582 Page* page_other = other.chunkIndexToPage(other._begChunkIndex); 1583 size_t chunk_index_this = _begChunkIndex; 1584 size_t chunk_index_other = other._begChunkIndex; 1585 size_t position_this = 0; 1586 size_t position_other = 0; 1587 while(equals && to_compare>0) 1588 { 1589 auto i_this = chunk_index_this % ChunksPerPage; 1590 auto i_other = chunk_index_other % ChunksPerPage; 1591 auto chunk_t = &page_this._chunks[i_this]; 1592 auto chunk_o = &page_other._chunks[i_other]; 1593 assert(position_this < chunk_t._end); 1594 assert(position_other < chunk_o._end); 1595 auto l_t = chunk_t._end - chunk_t._beg - position_this; 1596 auto l_o = chunk_o._end - chunk_o._beg - position_other; 1597 auto l_c = min(l_t, l_o, to_compare); 1598 debug(nbuff) safe_tracef("compare %s and %s", 1599 chunk_t._memory._data[chunk_t._beg + position_this..chunk_t._beg + position_this+l_c], 1600 chunk_o._memory._data[chunk_o._beg + position_other..chunk_o._beg + position_other+l_c]); 1601 equals = equal(chunk_t._memory._data[chunk_t._beg + position_this..chunk_t._beg + position_this+l_c], 1602 chunk_o._memory._data[chunk_o._beg + position_other..chunk_o._beg + position_other+l_c]); 1603 position_this += l_c; 1604 position_other += l_c; 1605 if (l_c == l_t) 1606 { 1607 position_this = 0; 1608 chunk_index_this++; 1609 i_this++; 1610 if (i_this == ChunksPerPage) 1611 { 1612 page_this = page_this._next; 1613 } 1614 } 1615 if (l_c == l_o) 1616 { 1617 position_other = 0; 1618 chunk_index_other++; 1619 i_other++; 1620 if (i_other == 
ChunksPerPage) 1621 { 1622 page_other = page_other._next; 1623 } 1624 } 1625 debug(nbuff) safe_tracef("compared %d bytes: %s", l_c, equals); 1626 to_compare -= l_c; 1627 } 1628 return equals; 1629 } 1630 1631 auto opIndex(size_t i) @safe @nogc inout 1632 { 1633 if ( i >= _length) 1634 { 1635 throw NbuffError; 1636 } 1637 auto page = chunkIndexToPage(_begChunkIndex); 1638 long chunkIndex = _begChunkIndex; 1639 while(i >= 0) 1640 { 1641 auto ci = chunkIndex % ChunksPerPage; 1642 auto chunk = &page._chunks[ci]; 1643 if (chunk.length > i) 1644 { 1645 return chunk._memory._impl._object[chunk._beg + i]; 1646 } 1647 i -= chunk.length; 1648 chunkIndex++; 1649 if (ci+1 == ChunksPerPage) 1650 { 1651 page = page._next; 1652 } 1653 } 1654 assert(0, "Index larger that length?"); 1655 } 1656 Nbuff[3] findSplitOn(string s, size_t start_from = 0) @safe @nogc 1657 { 1658 return findSplitOn(s.representation, start_from); 1659 } 1660 Nbuff[3] findSplitOn(immutable(ubyte)[] b, size_t start_from = 0) @safe @nogc 1661 { 1662 Nbuff[3] result; 1663 int s = countUntil(b, start_from); 1664 if (s>=0) 1665 { 1666 result[0] = this[0..s]; 1667 result[1] = this[s..s+b.length]; 1668 result[2] = this[s+b.length..$]; 1669 } 1670 return result; 1671 } 1672 bool beginsWith(const ubyte[] b) @safe @nogc 1673 { 1674 if ( length < b.length ) 1675 { 1676 return false; 1677 } 1678 return data()[0..b.length] == b; 1679 } 1680 bool beginsWith(string b) @safe @nogc 1681 { 1682 if ( length < b.length ) 1683 { 1684 return false; 1685 } 1686 return data()[0..b.length] == b.representation; 1687 } 1688 1689 int countUntil(const(ubyte)[] b, size_t start_from = 0) pure inout @safe @nogc 1690 { 1691 if (b.length == 0) 1692 { 1693 throw NbuffError; 1694 } 1695 if (_length == 0) 1696 { 1697 return -1; 1698 } 1699 if (start_from >= _length) 1700 { 1701 throw NbuffError; 1702 } 1703 if (start_from == -1 || _length == 0) 1704 { 1705 return -1; 1706 } 1707 int index = 0; // index is position at which we test pattern 1708 
// skip start_from 1709 auto page = chunkIndexToPage(_begChunkIndex); 1710 long chunkIndex = _begChunkIndex; 1711 auto ci = chunkIndex % ChunksPerPage; // ci - index of the chunk on current page 1712 auto chunk = &page._chunks[ci]; 1713 while(start_from > 0) // skip requested number of bytes 1714 { 1715 if (chunk.length > start_from) 1716 { 1717 break; 1718 } 1719 index += chunk.length; 1720 start_from -= chunk.length; 1721 chunkIndex++; 1722 ci++; 1723 if (ci == ChunksPerPage) 1724 { 1725 page = page._next; 1726 ci = 0; 1727 } 1728 chunk = &page._chunks[ci]; 1729 } 1730 // start_from can be > 0 if index points inside chunk 1731 index += start_from; 1732 auto position_this = start_from; // position_this - offset in Nbuff chunk we looking at 1733 auto position_needle = 0; // position_needlse - offset inside pattern 1734 1735 debug(nbuff) safe_tracef("Start search from chunk index: %s, position: %s", chunkIndex, position_this); 1736 immutable needle_length = b.length; 1737 while(index + needle_length <= _length) 1738 { 1739 size_t to_compare = needle_length; 1740 debug(nbuff) safe_tracef("test from this[%s]=%02x, compare len=%d", index, this[index], to_compare); 1741 // all 'local' variables used as we might overlap chunk 1742 auto local_chunkIndex = chunkIndex; 1743 auto local_position_this = position_this; 1744 auto local_position_needle = position_needle; 1745 auto local_ci = ci; 1746 auto local_chunk = chunk; 1747 auto local_page = page; 1748 while(to_compare>0) 1749 { 1750 auto c_l = min(to_compare, local_chunk.length-local_position_this); // we can comapre only until the end of the chunk 1751 immutable equals = equal( 1752 local_chunk._memory._data[local_chunk._beg+local_position_this..local_chunk._beg+local_position_this+c_l], 1753 b[local_position_needle..local_position_needle+c_l] 1754 ); 1755 if (!equals) 1756 { 1757 break; 1758 } 1759 local_position_this += c_l; 1760 local_position_needle += c_l; 1761 to_compare -= c_l; 1762 if ( to_compare == 0) 1763 { 
1764 break; 1765 } 1766 if (local_position_this==chunk.length) 1767 { 1768 debug(nbuff) safe_tracef("continue on next chunk"); 1769 local_position_this = 0; 1770 local_chunkIndex++; 1771 if (local_chunkIndex == _endChunkIndex) 1772 { 1773 break; 1774 } 1775 local_ci++; 1776 if (local_ci==ChunksPerPage) 1777 { 1778 debug(nbuff) safe_tracef("continue on next page"); 1779 local_ci = 0; 1780 local_page = local_page._next; 1781 } 1782 local_chunk = &local_page._chunks[local_ci]; 1783 } 1784 1785 } 1786 if (to_compare == 0) 1787 { 1788 debug(nbuff) safe_tracef("return %d", index); 1789 return index; 1790 } 1791 index++; 1792 debug(nbuff) safe_tracef("start from next position %d", index); 1793 position_this++; 1794 position_needle = 0; 1795 if ( position_this == chunk.length) 1796 { 1797 // switch to next chunk 1798 debug(nbuff) safe_tracef("next chunk"); 1799 position_this = 0; 1800 chunkIndex++; 1801 if (chunkIndex == _endChunkIndex) 1802 { 1803 return -1; 1804 } 1805 ci++; 1806 if (ci == ChunksPerPage) 1807 { 1808 debug(nbuff) safe_tracef("and next page"); 1809 page = page._next; 1810 ci = 0; 1811 } 1812 chunk = &page._chunks[ci]; 1813 } 1814 } 1815 return -1; 1816 } 1817 version(Posix) 1818 { 1819 import core.sys.posix.sys.uio: iovec; 1820 /// build iovec from this nbuff 1821 /// Paramenter v - ref to array of iovec structs 1822 /// n - size of array (must be >0) 1823 /// Return number of actually filled entries; 1824 int toIoVec(iovec* v, int n) @system 1825 { 1826 assert(n>0); 1827 if (empty) 1828 { 1829 return 0; 1830 } 1831 int vi = 0; 1832 auto skipPages = _begChunkIndex / ChunksPerPage; 1833 int bi = cast(int)_begChunkIndex, ei = cast(int)_endChunkIndex; 1834 Page* p = &_pages; 1835 while(skipPages > 0) 1836 { 1837 bi -= ChunksPerPage; 1838 ei -= ChunksPerPage; 1839 p = p._next; 1840 skipPages--; 1841 } 1842 1843 // fill no more than n and no more than we have chunks 1844 int i = bi, j = 0; 1845 for(; j < n && i<ei; j++) 1846 { 1847 auto beg = p._chunks[i]._beg; 
1848 void* base = cast(void*)p._chunks[i]._memory._impl._object.ptr + beg; 1849 v[j].iov_base = base; 1850 v[j].iov_len = p._chunks[i].length; 1851 i++; 1852 if (i == ChunksPerPage) 1853 { 1854 i -= ChunksPerPage; 1855 ei -= ChunksPerPage; 1856 p = p._next; 1857 } 1858 } 1859 return j; 1860 } 1861 } 1862 } 1863 1864 1865 @("NbuffChunk") 1866 @safe 1867 unittest 1868 { 1869 import std.range.primitives; 1870 static assert(isInputRange!NbuffChunk); 1871 static assert(isForwardRange!NbuffChunk); 1872 static assert(isBidirectionalRange!NbuffChunk); 1873 static assert(hasLength!NbuffChunk); 1874 static assert(is(typeof(lvalueOf!NbuffChunk[1]) == ElementType!NbuffChunk)); 1875 static assert(isRandomAccessRange!NbuffChunk); 1876 auto c = NbuffChunk("abcd"); 1877 assert(equal(c, "abcd")); 1878 assert(c == "abcd".representation); 1879 assert(equal(c[1..3], "bc")); 1880 assert(c[1..3][0] == 'b'); 1881 assert(c[1..3][$-1] == 'c'); 1882 } 1883 1884 @("Nbuff0") 1885 @nogc 1886 unittest 1887 { 1888 import std.string; 1889 import std.stdio; 1890 Nbuff b; 1891 auto d = b; 1892 auto chunk = Nbuff.get(512); 1893 copy("Abc".representation, chunk.data); 1894 b.append(chunk, 3); 1895 chunk = Nbuff.get(16); 1896 copy("Def".representation, chunk.data); 1897 b.append(chunk, 3); 1898 d = b; 1899 } 1900 1901 @("Nbuff1") 1902 @nogc 1903 unittest 1904 { 1905 import std.string; 1906 { 1907 Nbuff b; 1908 auto chunk = Nbuff.get(11); 1909 copy("Abc".representation, chunk.data); 1910 b.append(chunk, 3); 1911 } 1912 { 1913 Nbuff b; 1914 auto chunk = Nbuff.get(11); 1915 copy("Abc".representation, chunk.data); 1916 b.append(chunk, 3); 1917 } 1918 Nbuff b; 1919 auto d = b; 1920 auto chunk = Nbuff.get(512); 1921 copy("Def".representation, chunk.data); 1922 b.append(chunk, 3); 1923 b.popChunk(); 1924 } 1925 1926 @("Nbuff2") 1927 unittest 1928 { 1929 Nbuff b; 1930 for(int i=1; i <= 2*Nbuff.ChunksPerPage + 1; i++) 1931 { 1932 auto chunk = Nbuff.get(64); 1933 b.append(chunk, i); 1934 } 1935 for(int 
i=0;i<2*Nbuff.ChunksPerPage; i++) 1936 { 1937 b.popChunk(); 1938 } 1939 b.popChunk(); 1940 Nbuff c; 1941 c = b; 1942 } 1943 1944 @("Nbuff3") 1945 unittest 1946 { 1947 Nbuff b; 1948 auto chunk = Nbuff.get(16); 1949 copy("Hello".representation, chunk.data); 1950 b.append(chunk, 5); 1951 chunk = Nbuff.get(16); 1952 copy(",".representation, chunk.data); 1953 b.append(chunk, 1); 1954 chunk = Nbuff.get(16); 1955 copy(" world!".representation, chunk.data); 1956 b.append(chunk, 7); 1957 auto d = b.data(); 1958 assert(equal(d, "Hello, world!".representation)); 1959 assert(d[0] == 'H'); 1960 assert(d[$-1] == '!'); 1961 } 1962 1963 @("Nbuff4") 1964 unittest 1965 { 1966 Nbuff b; 1967 for(int i=0; i<32; i++) 1968 { 1969 string s = "%s,".format(i); 1970 auto chunk = Nbuff.get(16); 1971 copy(s.representation, chunk.data); 1972 b.append(chunk, s.length); 1973 } 1974 assert(equal(b.data, "0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,")); 1975 } 1976 1977 @("Nbuff5") 1978 unittest 1979 { 1980 Nbuff b; 1981 for(int i=0; i<32; i++) 1982 { 1983 string s = "%s,".format(i); 1984 auto chunk = Nbuff.get(16); 1985 copy(s.representation, chunk.data); 1986 b.append(chunk, s.length); 1987 } 1988 assert(b.length == 86); 1989 b.pop(); 1990 assert(equal(b.data, ",1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,")); 1991 b.pop(); 1992 assert(equal(b.data, "1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,")); 1993 b.pop(2); 1994 assert(equal(b.data, "2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,")); 1995 b.pop(34); 1996 assert(equal(b.data, "16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,")); 1997 b.pop(48); 1998 assert(b.length == 0, "b.length=%d".format(b.length)); 1999 } 2000 2001 @("Nbuff6") 2002 unittest 2003 { 2004 Nbuff b, d; 2005 for(int i=0; i<32; i++) 2006 { 2007 string s = "%s,".format(i); 2008 auto chunk = Nbuff.get(16); 2009 
copy(s.representation, chunk.data); 2010 b.append(chunk, s.length); 2011 } 2012 b.pop(); 2013 assert(equal(b.data, ",1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,")); 2014 { 2015 auto c = b[2..8]; // ",2,3,4" 2016 assert(equal(",2,3,4", c.data)); 2017 d = c; 2018 } 2019 b.clear(); 2020 for(int i=0; i<99; i++) 2021 { 2022 string s = "%02d,".format(i); 2023 auto chunk = Nbuff.get(16); 2024 copy(s.representation, chunk.data); 2025 b.append(chunk, s.length); 2026 } 2027 auto c = b[45..96]; 2028 assert(equal(c.data, "15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,")); 2029 } 2030 @("Nbuff7") 2031 unittest 2032 { 2033 globalLogLevel = LogLevel.info; 2034 Nbuff b; 2035 for(int i=0; i<32; i++) 2036 { 2037 string s = "%s,".format(i); 2038 auto chunk = Nbuff.get(16); 2039 copy(s.representation, chunk.data); 2040 b.append(chunk, s.length); 2041 } 2042 auto c = b[0..$]; 2043 assert(b==c); 2044 auto d = b[0..10]; 2045 auto e = b[1..11]; 2046 assert(d != e); 2047 Nbuff f, g; 2048 f.append("abc"); 2049 g.append("bc"); 2050 assert(f[1..$] == g); 2051 f.pop(); 2052 assert(f == g); 2053 f.popChunk(); 2054 assert(f.length == 0); 2055 // mix string and chunk appends 2056 f.append("Length: "); 2057 auto chunk = Nbuff.get(16); 2058 copy("100\n".representation, chunk.data); 2059 f.append(chunk, 4); 2060 g.clear(); 2061 g.append("L"); 2062 g.append("ength: 100\n"); 2063 globalLogLevel = LogLevel.trace; 2064 assert(f == g); 2065 // 2066 assert(f[0] == 'L'); 2067 assert(g[1] == 'e'); 2068 assert(g[11] == '\n'); 2069 } 2070 @("Nbuff8") 2071 unittest 2072 { 2073 Nbuff b; 2074 b.append("012"); 2075 b.append("345"); 2076 b.append("678"); 2077 assert(b.countUntil("01".representation, 0)==0); 2078 assert(b.countUntil("12".representation, 0)==1); 2079 assert(b.countUntil("23".representation, 0)==2); 2080 assert(b.countUntil("23".representation, 2)==2); 2081 assert(b.countUntil("345".representation, 2)==3); 2082 assert(b.countUntil("345".representation, 
3)==3); 2083 assert(b.countUntil("23456".representation,0) == 2); 2084 b.clear(); 2085 for(int i=0;i<100;i++) 2086 { 2087 b.append("%d".format(i)); 2088 } 2089 assert(b.countUntil("10".representation) == 10); 2090 assert(b.countUntil("90919".representation) == 170); 2091 assert(b.countUntil("99".representation, 170) == 188); 2092 for(int skip=0; skip<=170; skip++) 2093 { 2094 assert(b.countUntil("90919".representation, skip) == 170); 2095 } 2096 } 2097 @("Nbuff9") 2098 unittest 2099 { 2100 globalLogLevel = LogLevel.info; 2101 Nbuff b, line; 2102 b.append("\na\nbb\n"); 2103 auto c = b.countUntil("\n".representation); 2104 b = b[c+1..$]; 2105 assert(b.data == "a\nbb\n".representation); 2106 c = b.countUntil("\n".representation); 2107 line = b[0..c]; 2108 assert(line.data == "a".representation); 2109 2110 b = b[c+1..$]; 2111 assert(b.data == "bb\n".representation); 2112 c = b.countUntil("\n".representation); 2113 b = b[c+1..$]; 2114 b.append("ccc\nrest"); 2115 c = b.countUntil("\n".representation); 2116 b = b[c+1..$]; 2117 c = b.countUntil("\n".representation); 2118 } 2119 2120 @("Nbuff10") 2121 unittest 2122 { 2123 auto c = UniquePtr!MutableMemoryChunk(64); 2124 c.data[0] = 1; 2125 auto n = NbuffChunk("abc"); 2126 } 2127 2128 @("Nbuff11") 2129 unittest 2130 { 2131 globalLogLevel = LogLevel.info; 2132 // init from string 2133 auto b = Nbuff("abc"); 2134 assert(b.length == 3); 2135 assert(b.data.data == NbuffChunk("abc")); 2136 b.pop(); 2137 NbuffChunk c = b.data(0,1); 2138 assert(c.data == "b"); 2139 c = b.data(1,2); 2140 assert(c.data == "c"); 2141 b = Nbuff("abc"); 2142 b.append("def"); 2143 b.append("ghi"); 2144 c = b.data(4,6); 2145 assert(c.data == "ef"); 2146 c = b.data(2,4); 2147 assert(c.data == "cd"); 2148 c = b.data(2,7); 2149 assert(c.data == "cdefg"); 2150 } 2151 2152 version(Posix) 2153 { 2154 @("iovec") 2155 unittest 2156 { 2157 import core.sys.posix.sys.uio: iovec; 2158 Nbuff b; 2159 iovec[64] iov64; 2160 // fill nbuff 2161 for(int k=0;k<32;k++) 2162 { 
2163 b.append("%s\n".format(k)); 2164 } 2165 // pop 15 chunks (so we will cross page boundary) 2166 iota(15).each!(_ => b.popChunk()); 2167 auto i = b.toIoVec(&iov64[0], 64); 2168 assert(i == 17); 2169 for(int k=0;k<i;k++) 2170 { 2171 int j = k + 15; 2172 assert("%s\n".format(j) == cast(string)iov64[k].iov_base[0..iov64[k].iov_len]); 2173 //writef("%s:%s", j, cast(string)iov64[k].iov_base[0..iov64[k].iov_len]); 2174 } 2175 2176 } 2177 }