// ---------------------------------------------------------------------
// Copyright (c) 2015-2016 Microsoft
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
// ---------------------------------------------------------------------

namespace Microsoft.IO
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Diagnostics.CodeAnalysis;
    using System.IO;
    using System.Linq;
    using System.Threading;

    /// <summary>
    /// Manages pools of RecyclableMemoryStream objects.
    /// </summary>
    /// <remarks>
    /// There are two pools managed in here. The small pool contains same-sized buffers that are handed to streams
    /// as they write more data.
    ///
    /// For scenarios that need to call GetBuffer(), the large pool contains buffers of various sizes, all
    /// multiples of LargeBufferMultiple (1 MB by default). They are split by size to avoid overly-wasteful buffer
    /// usage. There should be far fewer 8 MB buffers than 1 MB buffers, for example.
    /// </remarks>
    public partial class RecyclableMemoryStreamManager
    {
        /// <summary>
        /// Generic delegate for handling events without any arguments.
        /// </summary>
        public delegate void EventHandler();

        /// <summary>
        /// Delegate for handling large buffer discard reports.
        /// </summary>
        /// <param name="reason">Reason the buffer was discarded.</param>
        public delegate void LargeBufferDiscardedEventHandler(Events.MemoryStreamDiscardReason reason);

        /// <summary>
        /// Delegate for handling reports of stream size when streams are allocated
        /// </summary>
        /// <param name="bytes">Bytes allocated.</param>
        public delegate void StreamLengthReportHandler(long bytes);

        /// <summary>
        /// Delegate for handling reporting of memory use statistics.
        /// </summary>
        /// <param name="smallPoolInUseBytes">Bytes currently in use in the small pool.</param>
        /// <param name="smallPoolFreeBytes">Bytes currently free in the small pool.</param>
        /// <param name="largePoolInUseBytes">Bytes currently in use in the large pool.</param>
        /// <param name="largePoolFreeBytes">Bytes currently free in the large pool.</param>
        public delegate void UsageReportEventHandler(
            long smallPoolInUseBytes, long smallPoolFreeBytes, long largePoolInUseBytes, long largePoolFreeBytes);

        /// <summary>Default size, in bytes, of each small-pool block (128 KB).</summary>
        public const int DefaultBlockSize = 128 * 1024;

        /// <summary>Default large buffer multiple, in bytes (1 MB).</summary>
        public const int DefaultLargeBufferMultiple = 1024 * 1024;

        /// <summary>Default maximum pooled buffer size, in bytes (128 MB).</summary>
        public const int DefaultMaximumBufferSize = 128 * 1024 * 1024;

        private readonly int blockSize;
        private readonly long[] largeBufferFreeSize;
        private readonly long[] largeBufferInUseSize;

        private readonly int largeBufferMultiple;

        /// <summary>
        /// pools[0] = 1x largeBufferMultiple buffers
        /// pools[1] = 2x largeBufferMultiple buffers
        /// etc., up to maximumBufferSize
        /// </summary>
        private readonly ConcurrentStack<byte[]>[] largePools;

        private readonly int maximumBufferSize;
        private readonly ConcurrentStack<byte[]> smallPool;

        private long smallPoolFreeSize;
        private long smallPoolInUseSize;

        /// <summary>
        /// Initializes the memory manager with the default block/buffer specifications.
        /// </summary>
        public RecyclableMemoryStreamManager()
            : this(DefaultBlockSize, DefaultLargeBufferMultiple, DefaultMaximumBufferSize)
        {
        }

        /// <summary>
        /// Initializes the memory manager with the given block size, large buffer multiple and maximum buffer size.
        /// </summary>
        /// <param name="blockSize">Size of each block that is pooled. Must be > 0.</param>
        /// <param name="largeBufferMultiple">Each large buffer will be a multiple of this value.</param>
        /// <param name="maximumBufferSize">Buffers larger than this are not pooled</param>
        /// <exception cref="ArgumentOutOfRangeException">blockSize is not a positive number, or largeBufferMultiple is not a positive number, or maximumBufferSize is less than blockSize.</exception>
        /// <exception cref="ArgumentException">maximumBufferSize is not a multiple of largeBufferMultiple</exception>
        public RecyclableMemoryStreamManager(int blockSize, int largeBufferMultiple, int maximumBufferSize)
        {
            if (blockSize <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(blockSize), blockSize, "blockSize must be a positive number");
            }

            if (largeBufferMultiple <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(largeBufferMultiple),
                                                      "largeBufferMultiple must be a positive number");
            }

            if (maximumBufferSize < blockSize)
            {
                throw new ArgumentOutOfRangeException(nameof(maximumBufferSize),
                                                      "maximumBufferSize must be at least blockSize");
            }

            // The fields must be assigned before the multiple check below, because
            // IsLargeBufferMultiple reads this.LargeBufferMultiple.
            this.blockSize = blockSize;
            this.largeBufferMultiple = largeBufferMultiple;
            this.maximumBufferSize = maximumBufferSize;

            if (!this.IsLargeBufferMultiple(maximumBufferSize))
            {
                throw new ArgumentException("maximumBufferSize is not a multiple of largeBufferMultiple",
                                            nameof(maximumBufferSize));
            }

            this.smallPool = new ConcurrentStack<byte[]>();
            var numLargePools = maximumBufferSize / largeBufferMultiple;

            // +1 to store size of bytes in use that are too large to be pooled
            this.largeBufferInUseSize = new long[numLargePools + 1];
            this.largeBufferFreeSize = new long[numLargePools];

            this.largePools = new ConcurrentStack<byte[]>[numLargePools];

            for (var i = 0; i < this.largePools.Length; ++i)
            {
                this.largePools[i] = new ConcurrentStack<byte[]>();
            }

            Events.Writer.MemoryStreamManagerInitialized(blockSize, largeBufferMultiple, maximumBufferSize);
        }

        /// <summary>
        /// The size of each block. It must be set at creation and cannot be changed.
        /// </summary>
        public int BlockSize => this.blockSize;

        /// <summary>
        /// All buffers are multiples of this number. It must be set at creation and cannot be changed.
        /// </summary>
        public int LargeBufferMultiple => this.largeBufferMultiple;

        /// <summary>
        /// Gets the maximum buffer size. It must be set at creation and cannot be changed.
        /// </summary>
        /// <remarks>Any buffer that is returned to the pool that is larger than this will be
        /// discarded and garbage collected.</remarks>
        public int MaximumBufferSize => this.maximumBufferSize;

        /// <summary>
        /// Number of bytes in small pool not currently in use
        /// </summary>
        public long SmallPoolFreeSize => this.smallPoolFreeSize;

        /// <summary>
        /// Number of bytes currently in use by stream from the small pool
        /// </summary>
        public long SmallPoolInUseSize => this.smallPoolInUseSize;

        /// <summary>
        /// Number of bytes in large pool not currently in use
        /// </summary>
        public long LargePoolFreeSize => this.largeBufferFreeSize.Sum();

        /// <summary>
        /// Number of bytes currently in use by streams from the large pool
        /// (including buffers that were too large to be pooled)
        /// </summary>
        public long LargePoolInUseSize => this.largeBufferInUseSize.Sum();

        /// <summary>
        /// How many blocks are in the small pool
        /// </summary>
        public long SmallBlocksFree => this.smallPool.Count;

        /// <summary>
        /// How many buffers are in the large pool
        /// </summary>
        public long LargeBuffersFree
        {
            get
            {
                long free = 0;
                foreach (var pool in this.largePools)
                {
                    free += pool.Count;
                }
                return free;
            }
        }

        /// <summary>
        /// How many bytes of small free blocks to allow before we start dropping
        /// those returned to us. A value of 0 means no limit.
        /// </summary>
        public long MaximumFreeSmallPoolBytes { get; set; }

        /// <summary>
        /// How many bytes of large free buffers to allow before we start dropping
        /// those returned to us. A value of 0 means no limit.
        /// </summary>
        public long MaximumFreeLargePoolBytes { get; set; }

        /// <summary>
        /// Maximum stream capacity in bytes. Attempts to set a larger capacity will
        /// result in an exception.
        /// </summary>
        /// <remarks>A value of 0 indicates no limit.</remarks>
        public long MaximumStreamCapacity { get; set; }

        /// <summary>
        /// Whether to save callstacks for stream allocations. This can help in debugging.
        /// It should NEVER be turned on generally in production.
        /// </summary>
        public bool GenerateCallStacks { get; set; }

        /// <summary>
        /// Whether dirty buffers can be immediately returned to the buffer pool. E.g. when GetBuffer() is called on
        /// a stream and creates a single large buffer, if this setting is enabled, the other blocks will be returned
        /// to the buffer pool immediately.
        /// Note when enabling this setting that the user is responsible for ensuring that any buffer previously
        /// retrieved from a stream which is subsequently modified is not used after modification (as it may no longer
        /// be valid).
        /// </summary>
        public bool AggressiveBufferReturn { get; set; }

        /// <summary>
        /// Removes and returns a single block from the pool, allocating a new one if the pool is empty.
        /// </summary>
        /// <returns>A byte[] array of length <see cref="BlockSize"/></returns>
        internal byte[] GetBlock()
        {
            byte[] block;
            if (!this.smallPool.TryPop(out block))
            {
                // We'll add this back to the pool when the stream is disposed
                // (unless our free pool is too large)
                block = new byte[this.BlockSize];
                Events.Writer.MemoryStreamNewBlockCreated(this.smallPoolInUseSize);
                ReportBlockCreated();
            }
            else
            {
                Interlocked.Add(ref this.smallPoolFreeSize, -this.BlockSize);
            }

            Interlocked.Add(ref this.smallPoolInUseSize, this.BlockSize);
            return block;
        }

        /// <summary>
        /// Returns a buffer of arbitrary size from the large buffer pool. This buffer
        /// will be at least the requiredSize and always be a multiple of largeBufferMultiple.
        /// </summary>
        /// <param name="requiredSize">The minimum length of the buffer</param>
        /// <param name="tag">The tag of the stream requesting this buffer, for logging if necessary.</param>
        /// <returns>A buffer of at least the required size.</returns>
        internal byte[] GetLargeBuffer(int requiredSize, string tag)
        {
            requiredSize = this.RoundToLargeBufferMultiple(requiredSize);

            var poolIndex = requiredSize / this.largeBufferMultiple - 1;

            byte[] buffer;
            if (poolIndex < this.largePools.Length)
            {
                if (!this.largePools[poolIndex].TryPop(out buffer))
                {
                    buffer = new byte[requiredSize];

                    Events.Writer.MemoryStreamNewLargeBufferCreated(requiredSize, this.LargePoolInUseSize);
                    ReportLargeBufferCreated();
                }
                else
                {
                    Interlocked.Add(ref this.largeBufferFreeSize[poolIndex], -buffer.Length);
                }
            }
            else
            {
                // Buffer is too large to pool. They get a new buffer.

                // We still want to track the size, though, and we've reserved a slot
                // in the end of the inuse array for nonpooled bytes in use.
                poolIndex = this.largeBufferInUseSize.Length - 1;

                // We still want to round up to reduce heap fragmentation.
                buffer = new byte[requiredSize];
                string callStack = null;
                if (this.GenerateCallStacks)
                {
                    // Grab the stack -- we want to know who requires such large buffers
                    callStack = Environment.StackTrace;
                }
                Events.Writer.MemoryStreamNonPooledLargeBufferCreated(requiredSize, tag, callStack);
                ReportLargeBufferCreated();
            }

            Interlocked.Add(ref this.largeBufferInUseSize[poolIndex], buffer.Length);

            return buffer;
        }

        /// <summary>
        /// Rounds the given size up to the nearest multiple of <see cref="LargeBufferMultiple"/>.
        /// </summary>
        /// <param name="requiredSize">The size to round up.</param>
        /// <returns>The smallest multiple of LargeBufferMultiple that is at least requiredSize.</returns>
        private int RoundToLargeBufferMultiple(int requiredSize)
        {
            return ((requiredSize + this.LargeBufferMultiple - 1) / this.LargeBufferMultiple) * this.LargeBufferMultiple;
        }

        /// <summary>
        /// Checks whether the value is a non-zero exact multiple of <see cref="LargeBufferMultiple"/>.
        /// </summary>
        /// <param name="value">The size to check.</param>
        /// <returns>true if value is non-zero and evenly divisible by LargeBufferMultiple.</returns>
        private bool IsLargeBufferMultiple(int value)
        {
            return (value != 0) && (value % this.LargeBufferMultiple) == 0;
        }

        /// <summary>
        /// Returns the buffer to the large pool
        /// </summary>
        /// <param name="buffer">The buffer to return.</param>
        /// <param name="tag">The tag of the stream returning this buffer, for logging if necessary.</param>
        /// <exception cref="ArgumentNullException">buffer is null</exception>
        /// <exception cref="ArgumentException">buffer.Length is not a multiple of LargeBufferMultiple (it did not originate from this pool)</exception>
        internal void ReturnLargeBuffer(byte[] buffer, string tag)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException(nameof(buffer));
            }

            if (!this.IsLargeBufferMultiple(buffer.Length))
            {
                throw new ArgumentException(
                    "buffer did not originate from this memory manager. The size is not a multiple of " +
                    this.LargeBufferMultiple);
            }

            var poolIndex = buffer.Length / this.largeBufferMultiple - 1;

            if (poolIndex < this.largePools.Length)
            {
                // Computed in 64-bit arithmetic: the product of an int count and an int length can
                // exceed int.MaxValue, and a wrapped (negative) value would wrongly pass the limit check.
                long freeBytesAfterReturn = ((long)this.largePools[poolIndex].Count + 1) * buffer.Length;

                if (freeBytesAfterReturn <= this.MaximumFreeLargePoolBytes ||
                    this.MaximumFreeLargePoolBytes == 0)
                {
                    this.largePools[poolIndex].Push(buffer);
                    Interlocked.Add(ref this.largeBufferFreeSize[poolIndex], buffer.Length);
                }
                else
                {
                    Events.Writer.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Large, tag,
                                                            Events.MemoryStreamDiscardReason.EnoughFree);
                    ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason.EnoughFree);
                }
            }
            else
            {
                // This is a non-poolable buffer, but we still want to track its size for inuse
                // analysis. We have space in the inuse array for this.
                poolIndex = this.largeBufferInUseSize.Length - 1;

                Events.Writer.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Large, tag,
                                                        Events.MemoryStreamDiscardReason.TooLarge);
                ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason.TooLarge);
            }

            Interlocked.Add(ref this.largeBufferInUseSize[poolIndex], -buffer.Length);

            ReportUsageReport(this.smallPoolInUseSize, this.smallPoolFreeSize, this.LargePoolInUseSize,
                              this.LargePoolFreeSize);
        }

        /// <summary>
        /// Returns the blocks to the pool
        /// </summary>
        /// <param name="blocks">Collection of blocks to return to the pool</param>
        /// <param name="tag">The tag of the stream returning these blocks, for logging if necessary.</param>
        /// <exception cref="ArgumentNullException">blocks is null</exception>
        /// <exception cref="ArgumentException">blocks contains buffers that are the wrong size (or null) for this memory manager</exception>
        internal void ReturnBlocks(ICollection<byte[]> blocks, string tag)
        {
            if (blocks == null)
            {
                throw new ArgumentNullException(nameof(blocks));
            }

            // Validate everything before touching any counter, so that a rejected call
            // leaves the in-use statistics untouched.
            foreach (var block in blocks)
            {
                if (block == null || block.Length != this.BlockSize)
                {
                    throw new ArgumentException("blocks contains buffers that are not BlockSize in length");
                }
            }

            // 64-bit arithmetic: Count * BlockSize can exceed int.MaxValue.
            long bytesToReturn = (long)blocks.Count * this.BlockSize;
            Interlocked.Add(ref this.smallPoolInUseSize, -bytesToReturn);

            foreach (var block in blocks)
            {
                if (this.MaximumFreeSmallPoolBytes == 0 || this.SmallPoolFreeSize < this.MaximumFreeSmallPoolBytes)
                {
                    Interlocked.Add(ref this.smallPoolFreeSize, this.BlockSize);
                    this.smallPool.Push(block);
                }
                else
                {
                    // The free pool is full: this block and all remaining ones are dropped
                    // (left to the garbage collector); the discard is reported once.
                    Events.Writer.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Small, tag,
                                                            Events.MemoryStreamDiscardReason.EnoughFree);
                    ReportBlockDiscarded();
                    break;
                }
            }

            ReportUsageReport(this.smallPoolInUseSize, this.smallPoolFreeSize, this.LargePoolInUseSize,
                              this.LargePoolFreeSize);
        }

        /// <summary>Raises the <see cref="BlockCreated"/> event, if it has subscribers.</summary>
        internal void ReportBlockCreated()
        {
            this.BlockCreated?.Invoke();
        }

        /// <summary>Raises the <see cref="BlockDiscarded"/> event, if it has subscribers.</summary>
        internal void ReportBlockDiscarded()
        {
            this.BlockDiscarded?.Invoke();
        }

        /// <summary>Raises the <see cref="LargeBufferCreated"/> event, if it has subscribers.</summary>
        internal void ReportLargeBufferCreated()
        {
            this.LargeBufferCreated?.Invoke();
        }

        /// <summary>Raises the <see cref="LargeBufferDiscarded"/> event, if it has subscribers.</summary>
        /// <param name="reason">Reason the buffer was discarded.</param>
        internal void ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason reason)
        {
            this.LargeBufferDiscarded?.Invoke(reason);
        }

        /// <summary>Raises the <see cref="StreamCreated"/> event, if it has subscribers.</summary>
        internal void ReportStreamCreated()
        {
            this.StreamCreated?.Invoke();
        }

        /// <summary>Raises the <see cref="StreamDisposed"/> event, if it has subscribers.</summary>
        internal void ReportStreamDisposed()
        {
            this.StreamDisposed?.Invoke();
        }

        /// <summary>Raises the <see cref="StreamFinalized"/> event, if it has subscribers.</summary>
        internal void ReportStreamFinalized()
        {
            this.StreamFinalized?.Invoke();
        }

        /// <summary>Raises the <see cref="StreamLength"/> event, if it has subscribers.</summary>
        /// <param name="bytes">The stream length, in bytes, to report.</param>
        internal void ReportStreamLength(long bytes)
        {
            this.StreamLength?.Invoke(bytes);
        }

        /// <summary>Raises the <see cref="StreamConvertedToArray"/> event, if it has subscribers.</summary>
        internal void ReportStreamToArray()
        {
            this.StreamConvertedToArray?.Invoke();
        }

        /// <summary>Raises the <see cref="UsageReport"/> event, if it has subscribers.</summary>
        /// <param name="smallPoolInUseBytes">Bytes currently in use in the small pool.</param>
        /// <param name="smallPoolFreeBytes">Bytes currently free in the small pool.</param>
        /// <param name="largePoolInUseBytes">Bytes currently in use in the large pool.</param>
        /// <param name="largePoolFreeBytes">Bytes currently free in the large pool.</param>
        internal void ReportUsageReport(
            long smallPoolInUseBytes, long smallPoolFreeBytes, long largePoolInUseBytes, long largePoolFreeBytes)
        {
            this.UsageReport?.Invoke(smallPoolInUseBytes, smallPoolFreeBytes, largePoolInUseBytes, largePoolFreeBytes);
        }

        /// <summary>
        /// Retrieve a new MemoryStream object with no tag and a default initial capacity.
        /// </summary>
        /// <returns>A MemoryStream.</returns>
        public MemoryStream GetStream()
        {
            return new RecyclableMemoryStream(this);
        }

        /// <summary>
        /// Retrieve a new MemoryStream object with the given tag and a default initial capacity.
        /// </summary>
        /// <param name="tag">A tag which can be used to track the source of the stream.</param>
        /// <returns>A MemoryStream.</returns>
        public MemoryStream GetStream(string tag)
        {
            return new RecyclableMemoryStream(this, tag);
        }

        /// <summary>
        /// Retrieve a new MemoryStream object with the given tag and at least the given capacity.
        /// </summary>
        /// <param name="tag">A tag which can be used to track the source of the stream.</param>
        /// <param name="requiredSize">The minimum desired capacity for the stream.</param>
        /// <returns>A MemoryStream.</returns>
        public MemoryStream GetStream(string tag, int requiredSize)
        {
            return new RecyclableMemoryStream(this, tag, requiredSize);
        }

        /// <summary>
        /// Retrieve a new MemoryStream object with the given tag and at least the given capacity, possibly using
        /// a single contiguous underlying buffer.
        /// </summary>
        /// <remarks>Retrieving a MemoryStream which provides a single contiguous buffer can be useful in situations
        /// where the initial size is known and it is desirable to avoid copying data between the smaller underlying
        /// buffers to a single large one. This is most helpful when you know that you will always call GetBuffer
        /// on the underlying stream.</remarks>
        /// <param name="tag">A tag which can be used to track the source of the stream.</param>
        /// <param name="requiredSize">The minimum desired capacity for the stream.</param>
        /// <param name="asContiguousBuffer">Whether to attempt to use a single contiguous buffer.</param>
        /// <returns>A MemoryStream.</returns>
        public MemoryStream GetStream(string tag, int requiredSize, bool asContiguousBuffer)
        {
            // A request that fits in a single block is already contiguous, so the
            // block-based stream is used for it as well.
            if (!asContiguousBuffer || requiredSize <= this.BlockSize)
            {
                return this.GetStream(tag, requiredSize);
            }

            return new RecyclableMemoryStream(this, tag, requiredSize, this.GetLargeBuffer(requiredSize, tag));
        }

        /// <summary>
        /// Retrieve a new MemoryStream object with the given tag and with contents copied from the provided
        /// buffer. The provided buffer is not wrapped or used after construction.
        /// </summary>
        /// <remarks>The new stream's position is set to the beginning of the stream when returned.</remarks>
        /// <param name="tag">A tag which can be used to track the source of the stream.</param>
        /// <param name="buffer">The byte buffer to copy data from.</param>
        /// <param name="offset">The offset from the start of the buffer to copy from.</param>
        /// <param name="count">The number of bytes to copy from the buffer.</param>
        /// <returns>A MemoryStream.</returns>
        [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
        public MemoryStream GetStream(string tag, byte[] buffer, int offset, int count)
        {
            RecyclableMemoryStream stream = null;
            try
            {
                stream = new RecyclableMemoryStream(this, tag, count);
                stream.Write(buffer, offset, count);
                stream.Position = 0;
                return stream;
            }
            catch
            {
                // If the copy fails (e.g. invalid buffer/offset/count), dispose the stream
                // instead of abandoning it, then let the original exception propagate.
                stream?.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Triggered when a new block is created.
        /// </summary>
        public event EventHandler BlockCreated;

        /// <summary>
        /// Triggered when a block is discarded instead of being returned to the pool.
        /// </summary>
        public event EventHandler BlockDiscarded;

        /// <summary>
        /// Triggered when a new large buffer is created.
        /// </summary>
        public event EventHandler LargeBufferCreated;

        /// <summary>
        /// Triggered when a new stream is created.
        /// </summary>
        public event EventHandler StreamCreated;

        /// <summary>
        /// Triggered when a stream is disposed.
        /// </summary>
        public event EventHandler StreamDisposed;

        /// <summary>
        /// Triggered when a stream is finalized.
        /// </summary>
        public event EventHandler StreamFinalized;

        /// <summary>
        /// Triggered to report the length of a stream, in bytes.
        /// </summary>
        public event StreamLengthReportHandler StreamLength;

        /// <summary>
        /// Triggered when a user converts a stream to array.
        /// </summary>
        public event EventHandler StreamConvertedToArray;

        /// <summary>
        /// Triggered when a large buffer is discarded, along with the reason for the discard.
        /// </summary>
        public event LargeBufferDiscardedEventHandler LargeBufferDiscarded;

        /// <summary>
        /// Triggered to report usage statistics, e.g. whenever blocks or a large buffer
        /// are returned to this manager.
        /// </summary>
        public event UsageReportEventHandler UsageReport;
    }
}