123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608 |
- // ---------------------------------------------------------------------
- // Copyright (c) 2015-2016 Microsoft
- //
- // Permission is hereby granted, free of charge, to any person obtaining a copy
- // of this software and associated documentation files (the "Software"), to deal
- // in the Software without restriction, including without limitation the rights
- // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- // copies of the Software, and to permit persons to whom the Software is
- // furnished to do so, subject to the following conditions:
- //
- // The above copyright notice and this permission notice shall be included in
- // all copies or substantial portions of the Software.
- //
- // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- // THE SOFTWARE.
- // ---------------------------------------------------------------------
- namespace Microsoft.IO
- {
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Diagnostics.CodeAnalysis;
- using System.IO;
- using System.Linq;
- using System.Threading;
- /// <summary>
- /// Manages pools of RecyclableMemoryStream objects.
- /// </summary>
- /// <remarks>
- /// There are two pools managed in here. The small pool contains same-sized buffers that are handed to streams
- /// as they write more data.
- ///
- /// For scenarios that need to call GetBuffer(), the large pool contains buffers of various sizes, all
- /// multiples of LargeBufferMultiple (1 MB by default). They are split by size to avoid overly-wasteful buffer
- /// usage. There should be far fewer 8 MB buffers than 1 MB buffers, for example.
- /// </remarks>
- public partial class RecyclableMemoryStreamManager
- {
- /// <summary>
- /// Generic delegate for handling events without any arguments.
- /// </summary>
- public delegate void EventHandler();
- /// <summary>
- /// Delegate for handling large buffer discard reports.
- /// </summary>
- /// <param name="reason">Reason the buffer was discarded.</param>
- public delegate void LargeBufferDiscardedEventHandler(Events.MemoryStreamDiscardReason reason);
- /// <summary>
- /// Delegate for handling reports of stream size when streams are allocated
- /// </summary>
- /// <param name="bytes">Bytes allocated.</param>
- public delegate void StreamLengthReportHandler(long bytes);
- /// <summary>
- /// Delegate for handling periodic reporting of memory use statistics.
- /// </summary>
- /// <param name="smallPoolInUseBytes">Bytes currently in use in the small pool.</param>
- /// <param name="smallPoolFreeBytes">Bytes currently free in the small pool.</param>
- /// <param name="largePoolInUseBytes">Bytes currently in use in the large pool.</param>
- /// <param name="largePoolFreeBytes">Bytes currently free in the large pool.</param>
- public delegate void UsageReportEventHandler(
- long smallPoolInUseBytes, long smallPoolFreeBytes, long largePoolInUseBytes, long largePoolFreeBytes);
- public const int DefaultBlockSize = 128 * 1024;
- public const int DefaultLargeBufferMultiple = 1024 * 1024;
- public const int DefaultMaximumBufferSize = 128 * 1024 * 1024;
- private readonly int blockSize;
- private readonly long[] largeBufferFreeSize;
- private readonly long[] largeBufferInUseSize;
- private readonly int largeBufferMultiple;
- /// <summary>
- /// pools[0] = 1x largeBufferMultiple buffers
- /// pools[1] = 2x largeBufferMultiple buffers
- /// etc., up to maximumBufferSize
- /// </summary>
- private readonly ConcurrentStack<byte[]>[] largePools;
- private readonly int maximumBufferSize;
- private readonly ConcurrentStack<byte[]> smallPool;
- private long smallPoolFreeSize;
- private long smallPoolInUseSize;
- /// <summary>
- /// Initializes the memory manager with the default block/buffer specifications.
- /// </summary>
- public RecyclableMemoryStreamManager()
- : this(DefaultBlockSize, DefaultLargeBufferMultiple, DefaultMaximumBufferSize) { }
- /// <summary>
- /// Initializes the memory manager with the given block requiredSize.
- /// </summary>
- /// <param name="blockSize">Size of each block that is pooled. Must be > 0.</param>
- /// <param name="largeBufferMultiple">Each large buffer will be a multiple of this value.</param>
- /// <param name="maximumBufferSize">Buffers larger than this are not pooled</param>
- /// <exception cref="ArgumentOutOfRangeException">blockSize is not a positive number, or largeBufferMultiple is not a positive number, or maximumBufferSize is less than blockSize.</exception>
- /// <exception cref="ArgumentException">maximumBufferSize is not a multiple of largeBufferMultiple</exception>
- public RecyclableMemoryStreamManager(int blockSize, int largeBufferMultiple, int maximumBufferSize)
- {
- if (blockSize <= 0)
- {
- throw new ArgumentOutOfRangeException(nameof(blockSize), blockSize, "blockSize must be a positive number");
- }
- if (largeBufferMultiple <= 0)
- {
- throw new ArgumentOutOfRangeException(nameof(largeBufferMultiple),
- "largeBufferMultiple must be a positive number");
- }
- if (maximumBufferSize < blockSize)
- {
- throw new ArgumentOutOfRangeException(nameof(maximumBufferSize),
- "maximumBufferSize must be at least blockSize");
- }
- this.blockSize = blockSize;
- this.largeBufferMultiple = largeBufferMultiple;
- this.maximumBufferSize = maximumBufferSize;
- if (!this.IsLargeBufferMultiple(maximumBufferSize))
- {
- throw new ArgumentException("maximumBufferSize is not a multiple of largeBufferMultiple",
- nameof(maximumBufferSize));
- }
- this.smallPool = new ConcurrentStack<byte[]>();
- var numLargePools = maximumBufferSize / largeBufferMultiple;
- // +1 to store size of bytes in use that are too large to be pooled
- this.largeBufferInUseSize = new long[numLargePools + 1];
- this.largeBufferFreeSize = new long[numLargePools];
- this.largePools = new ConcurrentStack<byte[]>[numLargePools];
- for (var i = 0; i < this.largePools.Length; ++i)
- {
- this.largePools[i] = new ConcurrentStack<byte[]>();
- }
- Events.Writer.MemoryStreamManagerInitialized(blockSize, largeBufferMultiple, maximumBufferSize);
- }
- /// <summary>
- /// The size of each block. It must be set at creation and cannot be changed.
- /// </summary>
- public int BlockSize => this.blockSize;
- /// <summary>
- /// All buffers are multiples of this number. It must be set at creation and cannot be changed.
- /// </summary>
- public int LargeBufferMultiple => this.largeBufferMultiple;
- /// <summary>
- /// Gets or sets the maximum buffer size.
- /// </summary>
- /// <remarks>Any buffer that is returned to the pool that is larger than this will be
- /// discarded and garbage collected.</remarks>
- public int MaximumBufferSize => this.maximumBufferSize;
- /// <summary>
- /// Number of bytes in small pool not currently in use
- /// </summary>
- public long SmallPoolFreeSize => this.smallPoolFreeSize;
- /// <summary>
- /// Number of bytes currently in use by stream from the small pool
- /// </summary>
- public long SmallPoolInUseSize => this.smallPoolInUseSize;
- /// <summary>
- /// Number of bytes in large pool not currently in use
- /// </summary>
- public long LargePoolFreeSize => this.largeBufferFreeSize.Sum();
- /// <summary>
- /// Number of bytes currently in use by streams from the large pool
- /// </summary>
- public long LargePoolInUseSize => this.largeBufferInUseSize.Sum();
- /// <summary>
- /// How many blocks are in the small pool
- /// </summary>
- public long SmallBlocksFree => this.smallPool.Count;
- /// <summary>
- /// How many buffers are in the large pool
- /// </summary>
- public long LargeBuffersFree
- {
- get
- {
- long free = 0;
- foreach (var pool in this.largePools)
- {
- free += pool.Count;
- }
- return free;
- }
- }
- /// <summary>
- /// How many bytes of small free blocks to allow before we start dropping
- /// those returned to us.
- /// </summary>
- public long MaximumFreeSmallPoolBytes { get; set; }
- /// <summary>
- /// How many bytes of large free buffers to allow before we start dropping
- /// those returned to us.
- /// </summary>
- public long MaximumFreeLargePoolBytes { get; set; }
- /// <summary>
- /// Maximum stream capacity in bytes. Attempts to set a larger capacity will
- /// result in an exception.
- /// </summary>
- /// <remarks>A value of 0 indicates no limit.</remarks>
- public long MaximumStreamCapacity { get; set; }
- /// <summary>
- /// Whether to save callstacks for stream allocations. This can help in debugging.
- /// It should NEVER be turned on generally in production.
- /// </summary>
- public bool GenerateCallStacks { get; set; }
- /// <summary>
- /// Whether dirty buffers can be immediately returned to the buffer pool. E.g. when GetBuffer() is called on
- /// a stream and creates a single large buffer, if this setting is enabled, the other blocks will be returned
- /// to the buffer pool immediately.
- /// Note when enabling this setting that the user is responsible for ensuring that any buffer previously
- /// retrieved from a stream which is subsequently modified is not used after modification (as it may no longer
- /// be valid).
- /// </summary>
- public bool AggressiveBufferReturn { get; set; }
- /// <summary>
- /// Removes and returns a single block from the pool.
- /// </summary>
- /// <returns>A byte[] array</returns>
- internal byte[] GetBlock()
- {
- byte[] block;
- if (!this.smallPool.TryPop(out block))
- {
- // We'll add this back to the pool when the stream is disposed
- // (unless our free pool is too large)
- block = new byte[this.BlockSize];
- Events.Writer.MemoryStreamNewBlockCreated(this.smallPoolInUseSize);
- ReportBlockCreated();
- }
- else
- {
- Interlocked.Add(ref this.smallPoolFreeSize, -this.BlockSize);
- }
- Interlocked.Add(ref this.smallPoolInUseSize, this.BlockSize);
- return block;
- }
- /// <summary>
- /// Returns a buffer of arbitrary size from the large buffer pool. This buffer
- /// will be at least the requiredSize and always be a multiple of largeBufferMultiple.
- /// </summary>
- /// <param name="requiredSize">The minimum length of the buffer</param>
- /// <param name="tag">The tag of the stream returning this buffer, for logging if necessary.</param>
- /// <returns>A buffer of at least the required size.</returns>
- internal byte[] GetLargeBuffer(int requiredSize, string tag)
- {
- requiredSize = this.RoundToLargeBufferMultiple(requiredSize);
- var poolIndex = requiredSize / this.largeBufferMultiple - 1;
- byte[] buffer;
- if (poolIndex < this.largePools.Length)
- {
- if (!this.largePools[poolIndex].TryPop(out buffer))
- {
- buffer = new byte[requiredSize];
- Events.Writer.MemoryStreamNewLargeBufferCreated(requiredSize, this.LargePoolInUseSize);
- ReportLargeBufferCreated();
- }
- else
- {
- Interlocked.Add(ref this.largeBufferFreeSize[poolIndex], -buffer.Length);
- }
- }
- else
- {
- // Buffer is too large to pool. They get a new buffer.
- // We still want to track the size, though, and we've reserved a slot
- // in the end of the inuse array for nonpooled bytes in use.
- poolIndex = this.largeBufferInUseSize.Length - 1;
- // We still want to round up to reduce heap fragmentation.
- buffer = new byte[requiredSize];
- string callStack = null;
- if (this.GenerateCallStacks)
- {
- // Grab the stack -- we want to know who requires such large buffers
- callStack = Environment.StackTrace;
- }
- Events.Writer.MemoryStreamNonPooledLargeBufferCreated(requiredSize, tag, callStack);
- ReportLargeBufferCreated();
- }
- Interlocked.Add(ref this.largeBufferInUseSize[poolIndex], buffer.Length);
- return buffer;
- }
- private int RoundToLargeBufferMultiple(int requiredSize)
- {
- return ((requiredSize + this.LargeBufferMultiple - 1) / this.LargeBufferMultiple) * this.LargeBufferMultiple;
- }
- private bool IsLargeBufferMultiple(int value)
- {
- return (value != 0) && (value % this.LargeBufferMultiple) == 0;
- }
- /// <summary>
- /// Returns the buffer to the large pool
- /// </summary>
- /// <param name="buffer">The buffer to return.</param>
- /// <param name="tag">The tag of the stream returning this buffer, for logging if necessary.</param>
- /// <exception cref="ArgumentNullException">buffer is null</exception>
- /// <exception cref="ArgumentException">buffer.Length is not a multiple of LargeBufferMultiple (it did not originate from this pool)</exception>
- internal void ReturnLargeBuffer(byte[] buffer, string tag)
- {
- if (buffer == null)
- {
- throw new ArgumentNullException(nameof(buffer));
- }
- if (!this.IsLargeBufferMultiple(buffer.Length))
- {
- throw new ArgumentException(
- "buffer did not originate from this memory manager. The size is not a multiple of " +
- this.LargeBufferMultiple);
- }
- var poolIndex = buffer.Length / this.largeBufferMultiple - 1;
- if (poolIndex < this.largePools.Length)
- {
- if ((this.largePools[poolIndex].Count + 1) * buffer.Length <= this.MaximumFreeLargePoolBytes ||
- this.MaximumFreeLargePoolBytes == 0)
- {
- this.largePools[poolIndex].Push(buffer);
- Interlocked.Add(ref this.largeBufferFreeSize[poolIndex], buffer.Length);
- }
- else
- {
- Events.Writer.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Large, tag,
- Events.MemoryStreamDiscardReason.EnoughFree);
- ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason.EnoughFree);
- }
- }
- else
- {
- // This is a non-poolable buffer, but we still want to track its size for inuse
- // analysis. We have space in the inuse array for this.
- poolIndex = this.largeBufferInUseSize.Length - 1;
- Events.Writer.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Large, tag,
- Events.MemoryStreamDiscardReason.TooLarge);
- ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason.TooLarge);
- }
- Interlocked.Add(ref this.largeBufferInUseSize[poolIndex], -buffer.Length);
- ReportUsageReport(this.smallPoolInUseSize, this.smallPoolFreeSize, this.LargePoolInUseSize,
- this.LargePoolFreeSize);
- }
- /// <summary>
- /// Returns the blocks to the pool
- /// </summary>
- /// <param name="blocks">Collection of blocks to return to the pool</param>
- /// <param name="tag">The tag of the stream returning these blocks, for logging if necessary.</param>
- /// <exception cref="ArgumentNullException">blocks is null</exception>
- /// <exception cref="ArgumentException">blocks contains buffers that are the wrong size (or null) for this memory manager</exception>
- internal void ReturnBlocks(ICollection<byte[]> blocks, string tag)
- {
- if (blocks == null)
- {
- throw new ArgumentNullException(nameof(blocks));
- }
- var bytesToReturn = blocks.Count * this.BlockSize;
- Interlocked.Add(ref this.smallPoolInUseSize, -bytesToReturn);
- foreach (var block in blocks)
- {
- if (block == null || block.Length != this.BlockSize)
- {
- throw new ArgumentException("blocks contains buffers that are not BlockSize in length");
- }
- }
- foreach (var block in blocks)
- {
- if (this.MaximumFreeSmallPoolBytes == 0 || this.SmallPoolFreeSize < this.MaximumFreeSmallPoolBytes)
- {
- Interlocked.Add(ref this.smallPoolFreeSize, this.BlockSize);
- this.smallPool.Push(block);
- }
- else
- {
- Events.Writer.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Small, tag,
- Events.MemoryStreamDiscardReason.EnoughFree);
- ReportBlockDiscarded();
- break;
- }
- }
- ReportUsageReport(this.smallPoolInUseSize, this.smallPoolFreeSize, this.LargePoolInUseSize,
- this.LargePoolFreeSize);
- }
- internal void ReportBlockCreated()
- {
- this.BlockCreated?.Invoke();
- }
- internal void ReportBlockDiscarded()
- {
- this.BlockDiscarded?.Invoke();
- }
- internal void ReportLargeBufferCreated()
- {
- this.LargeBufferCreated?.Invoke();
- }
- internal void ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason reason)
- {
- this.LargeBufferDiscarded?.Invoke(reason);
- }
- internal void ReportStreamCreated()
- {
- this.StreamCreated?.Invoke();
- }
- internal void ReportStreamDisposed()
- {
- this.StreamDisposed?.Invoke();
- }
- internal void ReportStreamFinalized()
- {
- this.StreamFinalized?.Invoke();
- }
- internal void ReportStreamLength(long bytes)
- {
- this.StreamLength?.Invoke(bytes);
- }
- internal void ReportStreamToArray()
- {
- this.StreamConvertedToArray?.Invoke();
- }
- internal void ReportUsageReport(
- long smallPoolInUseBytes, long smallPoolFreeBytes, long largePoolInUseBytes, long largePoolFreeBytes)
- {
- this.UsageReport?.Invoke(smallPoolInUseBytes, smallPoolFreeBytes, largePoolInUseBytes, largePoolFreeBytes);
- }
- /// <summary>
- /// Retrieve a new MemoryStream object with no tag and a default initial capacity.
- /// </summary>
- /// <returns>A MemoryStream.</returns>
- public MemoryStream GetStream()
- {
- return new RecyclableMemoryStream(this);
- }
- /// <summary>
- /// Retrieve a new MemoryStream object with the given tag and a default initial capacity.
- /// </summary>
- /// <param name="tag">A tag which can be used to track the source of the stream.</param>
- /// <returns>A MemoryStream.</returns>
- public MemoryStream GetStream(string tag)
- {
- return new RecyclableMemoryStream(this, tag);
- }
- /// <summary>
- /// Retrieve a new MemoryStream object with the given tag and at least the given capacity.
- /// </summary>
- /// <param name="tag">A tag which can be used to track the source of the stream.</param>
- /// <param name="requiredSize">The minimum desired capacity for the stream.</param>
- /// <returns>A MemoryStream.</returns>
- public MemoryStream GetStream(string tag, int requiredSize)
- {
- return new RecyclableMemoryStream(this, tag, requiredSize);
- }
- /// <summary>
- /// Retrieve a new MemoryStream object with the given tag and at least the given capacity, possibly using
- /// a single continugous underlying buffer.
- /// </summary>
- /// <remarks>Retrieving a MemoryStream which provides a single contiguous buffer can be useful in situations
- /// where the initial size is known and it is desirable to avoid copying data between the smaller underlying
- /// buffers to a single large one. This is most helpful when you know that you will always call GetBuffer
- /// on the underlying stream.</remarks>
- /// <param name="tag">A tag which can be used to track the source of the stream.</param>
- /// <param name="requiredSize">The minimum desired capacity for the stream.</param>
- /// <param name="asContiguousBuffer">Whether to attempt to use a single contiguous buffer.</param>
- /// <returns>A MemoryStream.</returns>
- public MemoryStream GetStream(string tag, int requiredSize, bool asContiguousBuffer)
- {
- if (!asContiguousBuffer || requiredSize <= this.BlockSize)
- {
- return this.GetStream(tag, requiredSize);
- }
- return new RecyclableMemoryStream(this, tag, requiredSize, this.GetLargeBuffer(requiredSize, tag));
- }
- /// <summary>
- /// Retrieve a new MemoryStream object with the given tag and with contents copied from the provided
- /// buffer. The provided buffer is not wrapped or used after construction.
- /// </summary>
- /// <remarks>The new stream's position is set to the beginning of the stream when returned.</remarks>
- /// <param name="tag">A tag which can be used to track the source of the stream.</param>
- /// <param name="buffer">The byte buffer to copy data from.</param>
- /// <param name="offset">The offset from the start of the buffer to copy from.</param>
- /// <param name="count">The number of bytes to copy from the buffer.</param>
- /// <returns>A MemoryStream.</returns>
- [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
- public MemoryStream GetStream(string tag, byte[] buffer, int offset, int count)
- {
- var stream = new RecyclableMemoryStream(this, tag, count);
- stream.Write(buffer, offset, count);
- stream.Position = 0;
- return stream;
- }
- /// <summary>
- /// Triggered when a new block is created.
- /// </summary>
- public event EventHandler BlockCreated;
- /// <summary>
- /// Triggered when a new block is created.
- /// </summary>
- public event EventHandler BlockDiscarded;
- /// <summary>
- /// Triggered when a new large buffer is created.
- /// </summary>
- public event EventHandler LargeBufferCreated;
- /// <summary>
- /// Triggered when a new stream is created.
- /// </summary>
- public event EventHandler StreamCreated;
- /// <summary>
- /// Triggered when a stream is disposed.
- /// </summary>
- public event EventHandler StreamDisposed;
- /// <summary>
- /// Triggered when a stream is finalized.
- /// </summary>
- public event EventHandler StreamFinalized;
- /// <summary>
- /// Triggered when a stream is finalized.
- /// </summary>
- public event StreamLengthReportHandler StreamLength;
- /// <summary>
- /// Triggered when a user converts a stream to array.
- /// </summary>
- public event EventHandler StreamConvertedToArray;
- /// <summary>
- /// Triggered when a large buffer is discarded, along with the reason for the discard.
- /// </summary>
- public event LargeBufferDiscardedEventHandler LargeBufferDiscarded;
- /// <summary>
- /// Periodically triggered to report usage statistics.
- /// </summary>
- public event UsageReportEventHandler UsageReport;
- }
- }
|