#眉標=Thread Pipeline、ThreadSafe、ThreadPool #副標=多執行緒控制技巧 #大標=生產線模式的多執行緒應用 #作者=文/圖 吳剛志 ============= 程式1:處理縮圖及壓縮的程式碼 public class PipeWorkItem { public ZipOutputStream zipos = null; public string SourceImageFile = null; public void Stage1() { //第一階段: 把原圖檔轉成縮圖,存到暫存檔 (略) } public void Stage2() { //第二階段: 把暫存檔加到壓縮檔 (略) } } ================ ============= 程式2:生產線的程式碼 public class PipeWorkRunner { public void Start() { //啟動生產線的執行緒 this._is_start = true; this._stage1_thread = new Thread(this.Stage1Runner); this._stage2_thread = new Thread(this.Stage2Runner); this._stage1_thread.Start(); this._stage2_thread.Start(); this._stage1_thread.Join(); this._stage2_thread.Join(); } private Thread _stage1_thread = null; private Thread _stage2_thread = null; private Queue _stage1_queue = new Queue(); private Queue _stage2_queue = new Queue(); private ManualResetEvent _notify_stage2 = new ManualResetEvent(false); private void Stage1Runner() { { while (this._stage1_queue.Count > 0) { PipeWorkItem pwi = this._stage1_queue.Dequeue(); pwi.Stage1(); this._stage2_queue.Enqueue(pwi); this._notify_stage2.Set(); } } } private void Stage2Runner() { while (true) { while (this._stage2_queue.Count > 0) { PipeWorkItem pwi = this._stage2_queue.Dequeue(); pwi.Stage2(); } if (this._stage1_thread.IsAlive == false) break; this._notify_stage2.WaitOne(); } } } ================ ============= 程式3:每個階段的執行緒數量調整 public void Start() { //啟動生產線的執行緒 this._is_start = true; //階段1: 準備兩個執行緒 this._stage1_thread.Add(new Thread(this.Stage1Runner)); this._stage1_thread.Add(new Thread(this.Stage1Runner)); this._stage2_thread.Add(new Thread(this.Stage2Runner)); foreach (Thread t in this._stage1_thread) t.Start(); foreach (Thread t in this._stage2_thread) t.Start(); foreach (Thread t in this._stage1_thread) t.Join(); foreach (Thread t in this._stage2_thread) t.Join(); } private List _stage1_thread = new List(); private List _stage2_thread = new List(); private Queue _stage1_queue = new Queue(); private Queue _stage2_queue = new Queue(); //private ManualResetEvent _notify_stage1 = new ManualResetEvent(false); private ManualResetEvent _notify_stage2 = new ManualResetEvent(false); private void Stage1Runner() { PipeWorkItem pwi = null; while (true) { pwi = null; lock (this._stage1_queue) { if (this._stage1_queue.Count > 0) pwi = this._stage1_queue.Dequeue(); } if (pwi == null) break; pwi.Stage1(); //lock (this._stage2_queue) { this._stage2_queue.Enqueue(pwi); } this._notify_stage2.Set(); } } private void Stage2Runner() { Stopwatch idle_timer = new Stopwatch(); while (true) { while (this._stage2_queue.Count > 0) { PipeWorkItem pwi = this._stage2_queue.Dequeue(); pwi.Stage2(); Console.WriteLine("Queued items in stage 2: {0}", this._stage2_queue.Count); } // // 檢查 stage 1 所有的 thread 是否已經全部結束? // { bool _isAllStopped = true; foreach (Thread t1 in this._stage1_thread) { if (t1.IsAlive == true) _isAllStopped = false; } if (_isAllStopped == true) break; } idle_timer.Reset(); idle_timer.Start(); this._notify_stage2.WaitOne(); this._notify_stage2.Reset(); Console.WriteLine("Stage 2: Idle {0} msec", idle_timer.ElapsedMilliseconds); } } ================