LimitedConcurrencyLevelTaskScheduler.cs 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace HttpDownload
  8. {
  9. class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
  10. {
  11. /// <summary>Whether the current thread is processing work items.</summary>
  12. [ThreadStatic]
  13. private static bool _currentThreadIsProcessingItems;
  14. /// <summary>The list of tasks to be executed.</summary>
  15. private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
  16. /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
  17. private readonly int _maxDegreeOfParallelism;
  18. /// <summary>Whether the scheduler is currently processing work items.</summary>
  19. private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)
  20. /// <summary>
  21. /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
  22. /// specified degree of parallelism.
  23. /// </summary>
  24. /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
  25. public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
  26. {
  27. if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
  28. _maxDegreeOfParallelism = maxDegreeOfParallelism;
  29. }
  30. /// <summary>Queues a task to the scheduler.</summary>
  31. /// <param name="task">The task to be queued.</param>
  32. protected sealed override void QueueTask(Task task)
  33. {
  34. // Add the task to the list of tasks to be processed. If there aren't enough
  35. // delegates currently queued or running to process tasks, schedule another.
  36. lock (_tasks)
  37. {
  38. _tasks.AddLast(task);
  39. if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
  40. {
  41. ++_delegatesQueuedOrRunning;
  42. NotifyThreadPoolOfPendingWork();
  43. }
  44. }
  45. }
  46. /// <summary>
  47. /// Informs the ThreadPool that there's work to be executed for this scheduler.
  48. /// </summary>
  49. private void NotifyThreadPoolOfPendingWork()
  50. {
  51. ThreadPool.UnsafeQueueUserWorkItem(_ =>
  52. {
  53. // Note that the current thread is now processing work items.
  54. // This is necessary to enable inlining of tasks into this thread.
  55. _currentThreadIsProcessingItems = true;
  56. try
  57. {
  58. // Process all available items in the queue.
  59. while (true)
  60. {
  61. Task item;
  62. lock (_tasks)
  63. {
  64. // When there are no more items to be processed,
  65. // note that we're done processing, and get out.
  66. if (_tasks.Count == 0)
  67. {
  68. --_delegatesQueuedOrRunning;
  69. break;
  70. }
  71. // Get the next item from the queue
  72. item = _tasks.First.Value;
  73. _tasks.RemoveFirst();
  74. }
  75. // Execute the task we pulled out of the queue
  76. base.TryExecuteTask(item);
  77. }
  78. }
  79. // We're done processing items on the current thread
  80. finally { _currentThreadIsProcessingItems = false; }
  81. }, null);
  82. }
  83. /// <summary>Attempts to execute the specified task on the current thread.</summary>
  84. /// <param name="task">The task to be executed.</param>
  85. /// <param name="taskWasPreviouslyQueued"></param>
  86. /// <returns>Whether the task could be executed on the current thread.</returns>
  87. protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
  88. {
  89. // If this thread isn't already processing a task, we don't support inlining
  90. if (!_currentThreadIsProcessingItems) return false;
  91. // If the task was previously queued, remove it from the queue
  92. if (taskWasPreviouslyQueued) TryDequeue(task);
  93. // Try to run the task.
  94. return base.TryExecuteTask(task);
  95. }
  96. /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
  97. /// <param name="task">The task to be removed.</param>
  98. /// <returns>Whether the task could be found and removed.</returns>
  99. protected sealed override bool TryDequeue(Task task)
  100. {
  101. lock (_tasks) return _tasks.Remove(task);
  102. }
  103. /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
  104. public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
  105. /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
  106. /// <returns>An enumerable of the tasks currently scheduled.</returns>
  107. protected sealed override IEnumerable<Task> GetScheduledTasks()
  108. {
  109. bool lockTaken = false;
  110. try
  111. {
  112. Monitor.TryEnter(_tasks, ref lockTaken);
  113. if (lockTaken) return _tasks.ToArray();
  114. else throw new NotSupportedException();
  115. }
  116. finally
  117. {
  118. if (lockTaken) Monitor.Exit(_tasks);
  119. }
  120. }
  121. }
  122. class TaskFactoryMananger
  123. {
  124. //USE
  125. public static void Run()
  126. {
  127. try
  128. {
  129. while (true)
  130. {
  131. LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(10);
  132. TaskFactory factory = new TaskFactory(lcts);
  133. Task[] spiderTask = new Task[] {
  134. factory.StartNew(() =>
  135. {
  136. //Log.Logger.Information("{0} Start on thread {1}", "111", Thread.CurrentThread.ManagedThreadId);
  137. //Log.Logger.Information("{0} Finish on thread {1}", "111", Thread.CurrentThread.ManagedThreadId);
  138. }),
  139. factory.StartNew(() =>
  140. {
  141. Thread.Sleep(TimeSpan.FromSeconds(3));
  142. //Log.Logger.Information("{0} Start on thread {1}", "222", Thread.CurrentThread.ManagedThreadId);
  143. //Log.Logger.Information("{0} Finish on thread {1}", "222", Thread.CurrentThread.ManagedThreadId);
  144. }),
  145. factory.StartNew(() =>
  146. {
  147. Thread.Sleep(TimeSpan.FromSeconds(5));
  148. //Log.Logger.Information("{0} Start on thread {1}", "333", Thread.CurrentThread.ManagedThreadId);
  149. //Log.Logger.Information("{0} Finish on thread {1}", "333", Thread.CurrentThread.ManagedThreadId);
  150. })
  151. };
  152. Task.WaitAll(spiderTask);
  153. Thread.Sleep(TimeSpan.FromMinutes(1));
  154. }
  155. }
  156. catch (AggregateException ex)
  157. {
  158. foreach (Exception inner in ex.InnerExceptions)
  159. {
  160. //Log.Logger.Error(inner.Message);
  161. }
  162. }
  163. }
  164. /// <summary>
  165. /// Provides a task scheduler that ensures a maximum concurrency level while
  166. /// running on top of the ThreadPool.
  167. /// </summary>
  168. }
  169. }