RecyclableMemoryStream.cs 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888
  1. // The MIT License (MIT)
  2. //
  3. // Copyright (c) 2015-2016 Microsoft
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy
  6. // of this software and associated documentation files (the "Software"), to deal
  7. // in the Software without restriction, including without limitation the rights
  8. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9. // copies of the Software, and to permit persons to whom the Software is
  10. // furnished to do so, subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in all
  13. // copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  17. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  18. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  19. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  20. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  21. // SOFTWARE.
  22. namespace Microsoft.IO
  23. {
  24. using System;
  25. using System.Collections.Generic;
  26. using System.Diagnostics.CodeAnalysis;
  27. using System.IO;
  28. using System.Threading;
  29. /// <summary>
  30. /// MemoryStream implementation that deals with pooling and managing memory streams which use potentially large
  31. /// buffers.
  32. /// </summary>
  33. /// <remarks>
  34. /// This class works in tandem with the RecylableMemoryStreamManager to supply MemoryStream
  35. /// objects to callers, while avoiding these specific problems:
  36. /// 1. LOH allocations - since all large buffers are pooled, they will never incur a Gen2 GC
  37. /// 2. Memory waste - A standard memory stream doubles its size when it runs out of room. This
  38. /// leads to continual memory growth as each stream approaches the maximum allowed size.
  39. /// 3. Memory copying - Each time a MemoryStream grows, all the bytes are copied into new buffers.
  40. /// This implementation only copies the bytes when GetBuffer is called.
  41. /// 4. Memory fragmentation - By using homogeneous buffer sizes, it ensures that blocks of memory
  42. /// can be easily reused.
  43. ///
  44. /// The stream is implemented on top of a series of uniformly-sized blocks. As the stream's length grows,
  45. /// additional blocks are retrieved from the memory manager. It is these blocks that are pooled, not the stream
  46. /// object itself.
  47. ///
  48. /// The biggest wrinkle in this implementation is when GetBuffer() is called. This requires a single
  49. /// contiguous buffer. If only a single block is in use, then that block is returned. If multiple blocks
  50. /// are in use, we retrieve a larger buffer from the memory manager. These large buffers are also pooled,
  51. /// split by size--they are multiples of a chunk size (1 MB by default).
  52. ///
  53. /// Once a large buffer is assigned to the stream the blocks are NEVER again used for this stream. All operations take place on the
  54. /// large buffer. The large buffer can be replaced by a larger buffer from the pool as needed. All blocks and large buffers
  55. /// are maintained in the stream until the stream is disposed (unless AggressiveBufferReturn is enabled in the stream manager).
  56. ///
  57. /// </remarks>
  58. public sealed class RecyclableMemoryStream : MemoryStream
  59. {
  60. private const long MaxStreamLength = Int32.MaxValue;
  61. private static readonly byte[] emptyArray = new byte[0];
  62. /// <summary>
  63. /// All of these blocks must be the same size
  64. /// </summary>
  65. private readonly List<byte[]> blocks = new List<byte[]>(1);
  66. /// <summary>
  67. /// This buffer exists so that WriteByte can forward all of its calls to Write
  68. /// without creating a new byte[] buffer on every call.
  69. /// </summary>
  70. private readonly byte[] byteBuffer = new byte[1];
  71. private readonly Guid id;
  72. private readonly RecyclableMemoryStreamManager memoryManager;
  73. private readonly string tag;
  74. /// <summary>
  75. /// This list is used to store buffers once they're replaced by something larger.
  76. /// This is for the cases where you have users of this class that may hold onto the buffers longer
  77. /// than they should and you want to prevent race conditions which could corrupt the data.
  78. /// </summary>
  79. private List<byte[]> dirtyBuffers;
  80. // long to allow Interlocked.Read (for .NET Standard 1.4 compat)
  81. private long disposedState;
  82. /// <summary>
  83. /// This is only set by GetBuffer() if the necessary buffer is larger than a single block size, or on
  84. /// construction if the caller immediately requests a single large buffer.
  85. /// </summary>
  86. /// <remarks>If this field is non-null, it contains the concatenation of the bytes found in the individual
  87. /// blocks. Once it is created, this (or a larger) largeBuffer will be used for the life of the stream.
  88. /// </remarks>
  89. private byte[] largeBuffer;
  90. /// <summary>
  91. /// Unique identifier for this stream across it's entire lifetime
  92. /// </summary>
  93. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  94. internal Guid Id
  95. {
  96. get
  97. {
  98. this.CheckDisposed();
  99. return this.id;
  100. }
  101. }
  102. /// <summary>
  103. /// A temporary identifier for the current usage of this stream.
  104. /// </summary>
  105. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  106. internal string Tag
  107. {
  108. get
  109. {
  110. this.CheckDisposed();
  111. return this.tag;
  112. }
  113. }
  114. /// <summary>
  115. /// Gets the memory manager being used by this stream.
  116. /// </summary>
  117. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  118. internal RecyclableMemoryStreamManager MemoryManager
  119. {
  120. get
  121. {
  122. this.CheckDisposed();
  123. return this.memoryManager;
  124. }
  125. }
  126. /// <summary>
  127. /// Callstack of the constructor. It is only set if MemoryManager.GenerateCallStacks is true,
  128. /// which should only be in debugging situations.
  129. /// </summary>
  130. internal string AllocationStack { get; }
  131. /// <summary>
  132. /// Callstack of the Dispose call. It is only set if MemoryManager.GenerateCallStacks is true,
  133. /// which should only be in debugging situations.
  134. /// </summary>
  135. internal string DisposeStack { get; private set; }
  136. #region Constructors
  137. /// <summary>
  138. /// Allocate a new RecyclableMemoryStream object.
  139. /// </summary>
  140. /// <param name="memoryManager">The memory manager</param>
  141. public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager)
  142. : this(memoryManager, null, 0, null) { }
  143. /// <summary>
  144. /// Allocate a new RecyclableMemoryStream object
  145. /// </summary>
  146. /// <param name="memoryManager">The memory manager</param>
  147. /// <param name="tag">A string identifying this stream for logging and debugging purposes</param>
  148. public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, string tag)
  149. : this(memoryManager, tag, 0, null) { }
  150. /// <summary>
  151. /// Allocate a new RecyclableMemoryStream object
  152. /// </summary>
  153. /// <param name="memoryManager">The memory manager</param>
  154. /// <param name="tag">A string identifying this stream for logging and debugging purposes</param>
  155. /// <param name="requestedSize">The initial requested size to prevent future allocations</param>
  156. public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, string tag, int requestedSize)
  157. : this(memoryManager, tag, requestedSize, null) { }
  158. /// <summary>
  159. /// Allocate a new RecyclableMemoryStream object
  160. /// </summary>
  161. /// <param name="memoryManager">The memory manager</param>
  162. /// <param name="tag">A string identifying this stream for logging and debugging purposes</param>
  163. /// <param name="requestedSize">The initial requested size to prevent future allocations</param>
  164. /// <param name="initialLargeBuffer">An initial buffer to use. This buffer will be owned by the stream and returned to the memory manager upon Dispose.</param>
  165. internal RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, string tag, int requestedSize,
  166. byte[] initialLargeBuffer)
  167. : base(emptyArray)
  168. {
  169. this.memoryManager = memoryManager;
  170. this.id = Guid.NewGuid();
  171. this.tag = tag;
  172. if (requestedSize < memoryManager.BlockSize)
  173. {
  174. requestedSize = memoryManager.BlockSize;
  175. }
  176. if (initialLargeBuffer == null)
  177. {
  178. this.EnsureCapacity(requestedSize);
  179. }
  180. else
  181. {
  182. this.largeBuffer = initialLargeBuffer;
  183. }
  184. if (this.memoryManager.GenerateCallStacks)
  185. {
  186. this.AllocationStack = Environment.StackTrace;
  187. }
  188. RecyclableMemoryStreamManager.Events.Writer.MemoryStreamCreated(this.id, this.tag, requestedSize);
  189. this.memoryManager.ReportStreamCreated();
  190. }
  191. #endregion
  192. #region Dispose and Finalize
  193. ~RecyclableMemoryStream()
  194. {
  195. this.Dispose(false);
  196. }
  197. /// <summary>
  198. /// Returns the memory used by this stream back to the pool.
  199. /// </summary>
  200. /// <param name="disposing">Whether we're disposing (true), or being called by the finalizer (false)</param>
  201. [SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly",
  202. Justification = "We have different disposal semantics, so SuppressFinalize is in a different spot.")]
  203. protected override void Dispose(bool disposing)
  204. {
  205. if (Interlocked.CompareExchange(ref this.disposedState, 1, 0) != 0)
  206. {
  207. string doubleDisposeStack = null;
  208. if (this.memoryManager.GenerateCallStacks)
  209. {
  210. doubleDisposeStack = Environment.StackTrace;
  211. }
  212. RecyclableMemoryStreamManager.Events.Writer.MemoryStreamDoubleDispose(this.id, this.tag,
  213. this.AllocationStack,
  214. this.DisposeStack,
  215. doubleDisposeStack);
  216. return;
  217. }
  218. RecyclableMemoryStreamManager.Events.Writer.MemoryStreamDisposed(this.id, this.tag);
  219. if (this.memoryManager.GenerateCallStacks)
  220. {
  221. this.DisposeStack = Environment.StackTrace;
  222. }
  223. if (disposing)
  224. {
  225. this.memoryManager.ReportStreamDisposed();
  226. GC.SuppressFinalize(this);
  227. }
  228. else
  229. {
  230. // We're being finalized.
  231. RecyclableMemoryStreamManager.Events.Writer.MemoryStreamFinalized(this.id, this.tag, this.AllocationStack);
  232. #if !NETSTANDARD1_4
  233. if (AppDomain.CurrentDomain.IsFinalizingForUnload())
  234. {
  235. // If we're being finalized because of a shutdown, don't go any further.
  236. // We have no idea what's already been cleaned up. Triggering events may cause
  237. // a crash.
  238. base.Dispose(disposing);
  239. return;
  240. }
  241. #endif
  242. this.memoryManager.ReportStreamFinalized();
  243. }
  244. this.memoryManager.ReportStreamLength(this.length);
  245. if (this.largeBuffer != null)
  246. {
  247. this.memoryManager.ReturnLargeBuffer(this.largeBuffer, this.tag);
  248. }
  249. if (this.dirtyBuffers != null)
  250. {
  251. foreach (var buffer in this.dirtyBuffers)
  252. {
  253. this.memoryManager.ReturnLargeBuffer(buffer, this.tag);
  254. }
  255. }
  256. this.memoryManager.ReturnBlocks(this.blocks, this.tag);
  257. this.blocks.Clear();
  258. base.Dispose(disposing);
  259. }
  260. /// <summary>
  261. /// Equivalent to Dispose
  262. /// </summary>
  263. #if NETSTANDARD1_4
  264. public void Close()
  265. #else
  266. public override void Close()
  267. #endif
  268. {
  269. this.Dispose(true);
  270. }
  271. #endregion
  272. #region MemoryStream overrides
  273. /// <summary>
  274. /// Gets or sets the capacity
  275. /// </summary>
  276. /// <remarks>Capacity is always in multiples of the memory manager's block size, unless
  277. /// the large buffer is in use. Capacity never decreases during a stream's lifetime.
  278. /// Explicitly setting the capacity to a lower value than the current value will have no effect.
  279. /// This is because the buffers are all pooled by chunks and there's little reason to
  280. /// allow stream truncation.
  281. /// </remarks>
  282. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  283. public override int Capacity
  284. {
  285. get
  286. {
  287. this.CheckDisposed();
  288. if (this.largeBuffer != null)
  289. {
  290. return this.largeBuffer.Length;
  291. }
  292. long size = (long)this.blocks.Count * this.memoryManager.BlockSize;
  293. return (int)Math.Min(int.MaxValue, size);
  294. }
  295. set
  296. {
  297. this.CheckDisposed();
  298. this.EnsureCapacity(value);
  299. }
  300. }
  301. private int length;
  302. /// <summary>
  303. /// Gets the number of bytes written to this stream.
  304. /// </summary>
  305. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  306. public override long Length
  307. {
  308. get
  309. {
  310. this.CheckDisposed();
  311. return this.length;
  312. }
  313. }
  314. private int position;
  315. /// <summary>
  316. /// Gets the current position in the stream
  317. /// </summary>
  318. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  319. public override long Position
  320. {
  321. get
  322. {
  323. this.CheckDisposed();
  324. return this.position;
  325. }
  326. set
  327. {
  328. this.CheckDisposed();
  329. if (value < 0)
  330. {
  331. throw new ArgumentOutOfRangeException("value", "value must be non-negative");
  332. }
  333. if (value > MaxStreamLength)
  334. {
  335. throw new ArgumentOutOfRangeException("value", "value cannot be more than " + MaxStreamLength);
  336. }
  337. this.position = (int)value;
  338. }
  339. }
  340. /// <summary>
  341. /// Whether the stream can currently read
  342. /// </summary>
  343. public override bool CanRead => !this.Disposed;
  344. /// <summary>
  345. /// Whether the stream can currently seek
  346. /// </summary>
  347. public override bool CanSeek => !this.Disposed;
  348. /// <summary>
  349. /// Always false
  350. /// </summary>
  351. public override bool CanTimeout => false;
  352. /// <summary>
  353. /// Whether the stream can currently write
  354. /// </summary>
  355. public override bool CanWrite => !this.Disposed;
  356. /// <summary>
  357. /// Returns a single buffer containing the contents of the stream.
  358. /// The buffer may be longer than the stream length.
  359. /// </summary>
  360. /// <returns>A byte[] buffer</returns>
  361. /// <remarks>IMPORTANT: Doing a Write() after calling GetBuffer() invalidates the buffer. The old buffer is held onto
  362. /// until Dispose is called, but the next time GetBuffer() is called, a new buffer from the pool will be required.</remarks>
  363. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  364. #if NETSTANDARD1_4
  365. public byte[] GetBuffer()
  366. #else
  367. public override byte[] GetBuffer()
  368. #endif
  369. {
  370. this.CheckDisposed();
  371. if (this.largeBuffer != null)
  372. {
  373. return this.largeBuffer;
  374. }
  375. if (this.blocks.Count == 1)
  376. {
  377. return this.blocks[0];
  378. }
  379. // Buffer needs to reflect the capacity, not the length, because
  380. // it's possible that people will manipulate the buffer directly
  381. // and set the length afterward. Capacity sets the expectation
  382. // for the size of the buffer.
  383. var newBuffer = this.memoryManager.GetLargeBuffer(this.Capacity, this.tag);
  384. // InternalRead will check for existence of largeBuffer, so make sure we
  385. // don't set it until after we've copied the data.
  386. this.InternalRead(newBuffer, 0, this.length, 0);
  387. this.largeBuffer = newBuffer;
  388. if (this.blocks.Count > 0 && this.memoryManager.AggressiveBufferReturn)
  389. {
  390. this.memoryManager.ReturnBlocks(this.blocks, this.tag);
  391. this.blocks.Clear();
  392. }
  393. return this.largeBuffer;
  394. }
  395. /// <summary>
  396. /// Returns a new array with a copy of the buffer's contents. You should almost certainly be using GetBuffer combined with the Length to
  397. /// access the bytes in this stream. Calling ToArray will destroy the benefits of pooled buffers, but it is included
  398. /// for the sake of completeness.
  399. /// </summary>
  400. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  401. #pragma warning disable CS0809
  402. [Obsolete("This method has degraded performance vs. GetBuffer and should be avoided.")]
  403. public override byte[] ToArray()
  404. {
  405. this.CheckDisposed();
  406. var newBuffer = new byte[this.Length];
  407. this.InternalRead(newBuffer, 0, this.length, 0);
  408. string stack = this.memoryManager.GenerateCallStacks ? Environment.StackTrace : null;
  409. RecyclableMemoryStreamManager.Events.Writer.MemoryStreamToArray(this.id, this.tag, stack, 0);
  410. this.memoryManager.ReportStreamToArray();
  411. return newBuffer;
  412. }
  413. #pragma warning restore CS0809
  414. /// <summary>
  415. /// Reads from the current position into the provided buffer
  416. /// </summary>
  417. /// <param name="buffer">Destination buffer</param>
  418. /// <param name="offset">Offset into buffer at which to start placing the read bytes.</param>
  419. /// <param name="count">Number of bytes to read.</param>
  420. /// <returns>The number of bytes read</returns>
  421. /// <exception cref="ArgumentNullException">buffer is null</exception>
  422. /// <exception cref="ArgumentOutOfRangeException">offset or count is less than 0</exception>
  423. /// <exception cref="ArgumentException">offset subtracted from the buffer length is less than count</exception>
  424. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  425. public override int Read(byte[] buffer, int offset, int count)
  426. {
  427. return this.SafeRead(buffer, offset, count, ref this.position);
  428. }
  429. /// <summary>
  430. /// Reads from the specified position into the provided buffer
  431. /// </summary>
  432. /// <param name="buffer">Destination buffer</param>
  433. /// <param name="offset">Offset into buffer at which to start placing the read bytes.</param>
  434. /// <param name="count">Number of bytes to read.</param>
  435. /// <param name="streamPosition">Position in the stream to start reading from</param>
  436. /// <returns>The number of bytes read</returns>
  437. /// <exception cref="ArgumentNullException">buffer is null</exception>
  438. /// <exception cref="ArgumentOutOfRangeException">offset or count is less than 0</exception>
  439. /// <exception cref="ArgumentException">offset subtracted from the buffer length is less than count</exception>
  440. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  441. public int SafeRead(byte[] buffer, int offset, int count, ref int streamPosition)
  442. {
  443. this.CheckDisposed();
  444. if (buffer == null)
  445. {
  446. throw new ArgumentNullException(nameof(buffer));
  447. }
  448. if (offset < 0)
  449. {
  450. throw new ArgumentOutOfRangeException(nameof(offset), "offset cannot be negative");
  451. }
  452. if (count < 0)
  453. {
  454. throw new ArgumentOutOfRangeException(nameof(count), "count cannot be negative");
  455. }
  456. if (offset + count > buffer.Length)
  457. {
  458. throw new ArgumentException("buffer length must be at least offset + count");
  459. }
  460. int amountRead = this.InternalRead(buffer, offset, count, streamPosition);
  461. streamPosition += amountRead;
  462. return amountRead;
  463. }
  464. /// <summary>
  465. /// Writes the buffer to the stream
  466. /// </summary>
  467. /// <param name="buffer">Source buffer</param>
  468. /// <param name="offset">Start position</param>
  469. /// <param name="count">Number of bytes to write</param>
  470. /// <exception cref="ArgumentNullException">buffer is null</exception>
  471. /// <exception cref="ArgumentOutOfRangeException">offset or count is negative</exception>
  472. /// <exception cref="ArgumentException">buffer.Length - offset is not less than count</exception>
  473. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  474. public override void Write(byte[] buffer, int offset, int count)
  475. {
  476. this.CheckDisposed();
  477. if (buffer == null)
  478. {
  479. throw new ArgumentNullException(nameof(buffer));
  480. }
  481. if (offset < 0)
  482. {
  483. throw new ArgumentOutOfRangeException(nameof(offset), offset,
  484. "Offset must be in the range of 0 - buffer.Length-1");
  485. }
  486. if (count < 0)
  487. {
  488. throw new ArgumentOutOfRangeException(nameof(count), count, "count must be non-negative");
  489. }
  490. if (count + offset > buffer.Length)
  491. {
  492. throw new ArgumentException("count must be greater than buffer.Length - offset");
  493. }
  494. int blockSize = this.memoryManager.BlockSize;
  495. long end = (long)this.position + count;
  496. // Check for overflow
  497. if (end > MaxStreamLength)
  498. {
  499. throw new IOException("Maximum capacity exceeded");
  500. }
  501. long requiredBuffers = (end + blockSize - 1) / blockSize;
  502. if (requiredBuffers * blockSize > MaxStreamLength)
  503. {
  504. throw new IOException("Maximum capacity exceeded");
  505. }
  506. this.EnsureCapacity((int)end);
  507. if (this.largeBuffer == null)
  508. {
  509. int bytesRemaining = count;
  510. int bytesWritten = 0;
  511. var blockAndOffset = this.GetBlockAndRelativeOffset(this.position);
  512. while (bytesRemaining > 0)
  513. {
  514. byte[] currentBlock = this.blocks[blockAndOffset.Block];
  515. int remainingInBlock = blockSize - blockAndOffset.Offset;
  516. int amountToWriteInBlock = Math.Min(remainingInBlock, bytesRemaining);
  517. Buffer.BlockCopy(buffer, offset + bytesWritten, currentBlock, blockAndOffset.Offset,
  518. amountToWriteInBlock);
  519. bytesRemaining -= amountToWriteInBlock;
  520. bytesWritten += amountToWriteInBlock;
  521. ++blockAndOffset.Block;
  522. blockAndOffset.Offset = 0;
  523. }
  524. }
  525. else
  526. {
  527. Buffer.BlockCopy(buffer, offset, this.largeBuffer, this.position, count);
  528. }
  529. this.position = (int)end;
  530. this.length = Math.Max(this.position, this.length);
  531. }
  532. /// <summary>
  533. /// Returns a useful string for debugging. This should not normally be called in actual production code.
  534. /// </summary>
  535. public override string ToString()
  536. {
  537. return $"Id = {this.Id}, Tag = {this.Tag}, Length = {this.Length:N0} bytes";
  538. }
  539. /// <summary>
  540. /// Writes a single byte to the current position in the stream.
  541. /// </summary>
  542. /// <param name="value">byte value to write</param>
  543. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  544. public override void WriteByte(byte value)
  545. {
  546. this.CheckDisposed();
  547. this.byteBuffer[0] = value;
  548. this.Write(this.byteBuffer, 0, 1);
  549. }
  550. /// <summary>
  551. /// Reads a single byte from the current position in the stream.
  552. /// </summary>
  553. /// <returns>The byte at the current position, or -1 if the position is at the end of the stream.</returns>
  554. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  555. public override int ReadByte()
  556. {
  557. return this.SafeReadByte(ref this.position);
  558. }
  559. /// <summary>
  560. /// Reads a single byte from the specified position in the stream.
  561. /// </summary>
  562. /// <param name="streamPosition">The position in the stream to read from</param>
  563. /// <returns>The byte at the current position, or -1 if the position is at the end of the stream.</returns>
  564. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  565. public int SafeReadByte(ref int streamPosition)
  566. {
  567. this.CheckDisposed();
  568. if (streamPosition == this.length)
  569. {
  570. return -1;
  571. }
  572. byte value;
  573. if (this.largeBuffer == null)
  574. {
  575. var blockAndOffset = this.GetBlockAndRelativeOffset(streamPosition);
  576. value = this.blocks[blockAndOffset.Block][blockAndOffset.Offset];
  577. }
  578. else
  579. {
  580. value = this.largeBuffer[streamPosition];
  581. }
  582. streamPosition++;
  583. return value;
  584. }
  585. /// <summary>
  586. /// Sets the length of the stream
  587. /// </summary>
  588. /// <exception cref="ArgumentOutOfRangeException">value is negative or larger than MaxStreamLength</exception>
  589. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  590. public override void SetLength(long value)
  591. {
  592. this.CheckDisposed();
  593. if (value < 0 || value > MaxStreamLength)
  594. {
  595. throw new ArgumentOutOfRangeException(nameof(value),
  596. "value must be non-negative and at most " + MaxStreamLength);
  597. }
  598. this.EnsureCapacity((int)value);
  599. this.length = (int)value;
  600. if (this.position > value)
  601. {
  602. this.position = (int)value;
  603. }
  604. }
  605. /// <summary>
  606. /// Sets the position to the offset from the seek location
  607. /// </summary>
  608. /// <param name="offset">How many bytes to move</param>
  609. /// <param name="loc">From where</param>
  610. /// <returns>The new position</returns>
  611. /// <exception cref="ObjectDisposedException">Object has been disposed</exception>
  612. /// <exception cref="ArgumentOutOfRangeException">offset is larger than MaxStreamLength</exception>
  613. /// <exception cref="ArgumentException">Invalid seek origin</exception>
  614. /// <exception cref="IOException">Attempt to set negative position</exception>
  615. public override long Seek(long offset, SeekOrigin loc)
  616. {
  617. this.CheckDisposed();
  618. if (offset > MaxStreamLength)
  619. {
  620. throw new ArgumentOutOfRangeException(nameof(offset), "offset cannot be larger than " + MaxStreamLength);
  621. }
  622. int newPosition;
  623. switch (loc)
  624. {
  625. case SeekOrigin.Begin:
  626. newPosition = (int)offset;
  627. break;
  628. case SeekOrigin.Current:
  629. newPosition = (int)offset + this.position;
  630. break;
  631. case SeekOrigin.End:
  632. newPosition = (int)offset + this.length;
  633. break;
  634. default:
  635. throw new ArgumentException("Invalid seek origin", nameof(loc));
  636. }
  637. if (newPosition < 0)
  638. {
  639. throw new IOException("Seek before beginning");
  640. }
  641. this.position = newPosition;
  642. return this.position;
  643. }
  644. /// <summary>
  645. /// Synchronously writes this stream's bytes to the parameter stream.
  646. /// </summary>
  647. /// <param name="stream">Destination stream</param>
  648. /// <remarks>Important: This does a synchronous write, which may not be desired in some situations</remarks>
  649. public override void WriteTo(Stream stream)
  650. {
  651. this.CheckDisposed();
  652. if (stream == null)
  653. {
  654. throw new ArgumentNullException(nameof(stream));
  655. }
  656. if (this.largeBuffer == null)
  657. {
  658. int currentBlock = 0;
  659. int bytesRemaining = this.length;
  660. while (bytesRemaining > 0)
  661. {
  662. int amountToCopy = Math.Min(this.blocks[currentBlock].Length, bytesRemaining);
  663. stream.Write(this.blocks[currentBlock], 0, amountToCopy);
  664. bytesRemaining -= amountToCopy;
  665. ++currentBlock;
  666. }
  667. }
  668. else
  669. {
  670. stream.Write(this.largeBuffer, 0, this.length);
  671. }
  672. }
  673. #endregion
  674. #region Helper Methods
  675. private bool Disposed => Interlocked.Read(ref this.disposedState) != 0;
  676. private void CheckDisposed()
  677. {
  678. if (this.Disposed)
  679. {
  680. throw new ObjectDisposedException($"The stream with Id {this.id} and Tag {this.tag} is disposed.");
  681. }
  682. }
  683. private int InternalRead(byte[] buffer, int offset, int count, int fromPosition)
  684. {
  685. if (this.length - fromPosition <= 0)
  686. {
  687. return 0;
  688. }
  689. int amountToCopy;
  690. if (this.largeBuffer == null)
  691. {
  692. var blockAndOffset = this.GetBlockAndRelativeOffset(fromPosition);
  693. int bytesWritten = 0;
  694. int bytesRemaining = Math.Min(count, this.length - fromPosition);
  695. while (bytesRemaining > 0)
  696. {
  697. amountToCopy = Math.Min(this.blocks[blockAndOffset.Block].Length - blockAndOffset.Offset,
  698. bytesRemaining);
  699. Buffer.BlockCopy(this.blocks[blockAndOffset.Block], blockAndOffset.Offset, buffer,
  700. bytesWritten + offset, amountToCopy);
  701. bytesWritten += amountToCopy;
  702. bytesRemaining -= amountToCopy;
  703. ++blockAndOffset.Block;
  704. blockAndOffset.Offset = 0;
  705. }
  706. return bytesWritten;
  707. }
  708. amountToCopy = Math.Min(count, this.length - fromPosition);
  709. Buffer.BlockCopy(this.largeBuffer, fromPosition, buffer, offset, amountToCopy);
  710. return amountToCopy;
  711. }
  712. private struct BlockAndOffset
  713. {
  714. public int Block;
  715. public int Offset;
  716. public BlockAndOffset(int block, int offset)
  717. {
  718. this.Block = block;
  719. this.Offset = offset;
  720. }
  721. }
  722. private BlockAndOffset GetBlockAndRelativeOffset(int offset)
  723. {
  724. var blockSize = this.memoryManager.BlockSize;
  725. return new BlockAndOffset(offset / blockSize, offset % blockSize);
  726. }
  727. private void EnsureCapacity(int newCapacity)
  728. {
  729. if (newCapacity > this.memoryManager.MaximumStreamCapacity && this.memoryManager.MaximumStreamCapacity > 0)
  730. {
  731. RecyclableMemoryStreamManager.Events.Writer.MemoryStreamOverCapacity(newCapacity,
  732. this.memoryManager
  733. .MaximumStreamCapacity, this.tag,
  734. this.AllocationStack);
  735. throw new InvalidOperationException("Requested capacity is too large: " + newCapacity + ". Limit is " +
  736. this.memoryManager.MaximumStreamCapacity);
  737. }
  738. if (this.largeBuffer != null)
  739. {
  740. if (newCapacity > this.largeBuffer.Length)
  741. {
  742. var newBuffer = this.memoryManager.GetLargeBuffer(newCapacity, this.tag);
  743. this.InternalRead(newBuffer, 0, this.length, 0);
  744. this.ReleaseLargeBuffer();
  745. this.largeBuffer = newBuffer;
  746. }
  747. }
  748. else
  749. {
  750. while (this.Capacity < newCapacity)
  751. {
  752. blocks.Add((this.memoryManager.GetBlock()));
  753. }
  754. }
  755. }
  756. /// <summary>
  757. /// Release the large buffer (either stores it for eventual release or returns it immediately).
  758. /// </summary>
  759. private void ReleaseLargeBuffer()
  760. {
  761. if (this.memoryManager.AggressiveBufferReturn)
  762. {
  763. this.memoryManager.ReturnLargeBuffer(this.largeBuffer, this.tag);
  764. }
  765. else
  766. {
  767. if (this.dirtyBuffers == null)
  768. {
  769. // We most likely will only ever need space for one
  770. this.dirtyBuffers = new List<byte[]>(1);
  771. }
  772. this.dirtyBuffers.Add(this.largeBuffer);
  773. }
  774. this.largeBuffer = null;
  775. }
  776. #endregion
  777. }
  778. }