#眉標=ThreadPool、BlockQueue、.NET #副標=多執行緒開發應用 #大標=生產者vs消費者 – 執行緒的供需問題 #作者=文/圖 吳剛志 =============================================== 程式1 FileStream fs = new FileStream( Path.Combine(PhotoJob.destFolder, "result.zip"), FileMode.Create); ZipOutputStream zos = new ZipOutputStream(fs); foreach (PhotoJob job in PhotoJob.GetJobs()) { job.Download(); job.MakeThumb(); job.AddZip(zos); } zos.Close(); fs.Close(); ================================================ =============================================== 程式2 List list = new List(); foreach (PhotoJob job in PhotoJob.GetJobs()) { list.Add(job); ThreadPool.QueueUserWorkItem( job.DownloadAndMakeThumb); } bool exit = true; do { Thread.Sleep(10); exit = true; foreach (PhotoJob pdj in list) { if (pdj.IsThumbReady == true) continue; exit = false; break; } } while (!exit); FileStream fs = new FileStream( Path.Combine(PhotoJob.destFolder, "result.zip"), FileMode.Create); ZipOutputStream zos = new ZipOutputStream(fs); foreach (PhotoJob pdj in list) { pdj.AddZip(zos); } zos.Close(); fs.Close(); =============================================== =============================================== 程式3 public class BlockQueue { public readonly int SizeLimit = 0; private Queue _inner_queue = null; private ManualResetEvent _enqueue_wait = null; private ManualResetEvent _dequeue_wait = null; public BlockQueue(int sizeLimit) { this.IsShutdown = false; this.SizeLimit = sizeLimit; this._inner_queue = new Queue(); this._enqueue_wait = new ManualResetEvent(false); this._dequeue_wait = new ManualResetEvent(false); } public void EnQueue(T item) { // 略,見程式 4 } public T DeQueue() { // 略,見程式 5 } public bool IsShutdown { get; private set; } public void Shutdown() { this.IsShutdown = true; this._dequeue_wait.Set(); } } =============================================== =============================================== 程式4 public void EnQueue(T item) { if (this.IsShutdown == true) { throw new InvalidCastException(); } while (true) { lock (this._inner_queue) { if (this._inner_queue.Count < this.SizeLimit) { this._inner_queue.Enqueue(item); this._enqueue_wait.Reset(); this._dequeue_wait.Set(); break; } } this._enqueue_wait.WaitOne(); } } =============================================== =============================================== 程式5 public T DeQueue() { while (true) { if (this.IsShutdown == true) { lock (this._inner_queue) { return this._inner_queue.Dequeue(); } } lock (this._inner_queue) { if (this._inner_queue.Count > 0) { T item = this._inner_queue.Dequeue(); this._dequeue_wait.Reset(); this._enqueue_wait.Set(); return item; } } this._dequeue_wait.WaitOne(); } } =============================================== =============================================== 程式6 public const int ConcurrentDownloadThreads = 3; public const int ConcurrentMakeThumbThreads = 5; private static IEnumerator jobs = null; private static BlockQueue downloadBufferQueue = new BlockQueue(int.MaxValue); private static BlockQueue thumbQueue = new BlockQueue(int.MaxValue); public static int Main(string[] args) { // 1. init jobs = PhotoJob.GetJobs().GetEnumerator(); List downloadWorkers = new List(); for (int t = 0; t < ConcurrentDownloadThreads; t++) { downloadWorkers.Add(new Thread(DownloadWorker)); } List resizeWorkers = new List(); for (int t = 0; t < ConcurrentMakeThumbThreads; t++) { resizeWorkers.Add(new Thread(MakeThumbWorker)); } Thread zipWorker = new Thread(AddZipWorker); zipWorker.Priority = ThreadPriority.Lowest; foreach (Thread t in downloadWorkers) { t.Start(); } foreach (Thread t in resizeWorkers) { t.Start(); } zipWorker.Start(); // 2. wait complete foreach (Thread t in downloadWorkers) { t.Join(); } downloadBufferQueue.Shutdown(); foreach (Thread t in resizeWorkers) { t.Join(); } thumbQueue.Shutdown(); zipWorker.Join(); return 0; } private static void DownloadWorker() { do { PhotoJob dj = null; lock (jobs) { if (jobs.MoveNext()) dj = jobs.Current; } if (dj == null) break; dj.Download(); downloadBufferQueue.EnQueue(dj); } while (true); } private static void MakeThumbWorker() { try { while (true) { PhotoJob dj = downloadBufferQueue.DeQueue(); dj.MakeThumb(); thumbQueue.EnQueue(dj); } } catch (InvalidOperationException) { } } private static void AddZipWorker() { FileStream fs = new FileStream( Path.Combine(PhotoJob.destFolder, "result.zip"), FileMode.Create); ZipOutputStream zos = new ZipOutputStream(fs); try { while (true) { PhotoJob dj = thumbQueue.DeQueue(); dj.AddZip(zos); } } catch (InvalidOperationException) { } zos.Close(); fs.Close(); } ===============================================