本篇同步发文于个人Blog: [读书笔记] Threading in C# - PART 2: BASIC SYNCHRONIZATION
Synchronization Essentials
同步的结构分4种:Simple blocking methods: 像是Sleep, Join, Task.Wait等
Locking constructs: 限制数个Thread做后续操作, Excluseive locking只有一个thread, 比如lock(Monitor.Enter, Monitor.Exit), Mutex 和 SpinLock. 而Nonexclusive locking像是Semaphore, SemaphoreSlim 和 reader/writer locks
Signaling constructs: Thread可以暂停, 直到收到通知才恢复, 这可避免没效率的轮询. 比如用 event wait handler, Monitor的Wait/Pulse, CountdownEvent和Barrier
Nonblocking synchronization constructs: Thread.MemoryBarrier, Thread.VolatileRead, Thread.VolatileWrite, volatile关键字 和 Interlocked类别
Blocking
等待、Sleep等会让thread暂停, 把time slice还给CPU.
可用ThreadState检查是否Blocked
bool blocked = (someThread.ThreadState & ThreadState.WaitSleepJoin) != 0
当Thread发生block 或 unblock, 都会造成 Context Switch
unblock的触发条件:
blocking条件已满足
operation timing out(有指定timeout的时候)
被interrupt
被aborted
Blocking Versus Spinning
用loop一直查询某条件, 造成CPU消耗很大的运算资源
一种比较好一点的写法是, 在loop内加个Thread.Sleep
如果要用Spinning的写法, 一种是保证该条件很快就满足的运算, 另外一种是用SpinLock/SpinWait
ThreadState
Locking
区分 thread-safe和thread-unsafe的程式码, 通常是有static变数, 如果有多个thread存取时会不会错误
最简单的方式是用lock关键字做同步化, 绑住某个同步化物件, 只允许一个thread操作, 其他的thread变成blocked状态, 且依照queue的顺序来排队
A Comparison of Locking Constructs
下表的overhead是指对一个thread做block和unblock的时间Monitor.Enter and Monitor.Exit
lock = Monitor.Enter + Monitor.Exit + try/finally Monitor.Enter(_locker); try { DoSomething(); } finally { Monitor.Exit(_locker); }
上面写法会有bug, 如果在Enter和try之间发生exception (比如thread被Abort或记忆体溢出), 则永远不释放该locker
更好的写法是在try内用Enter, 且代入bool的变数, 用来判断是否lock成功, 成功的话可以呼叫Exit
bool lockTaken = false; try { Monitor.Enter(_locker, ref lockTaken) } finally { if(lockTaken) { Monitor.Exit(_locker); } }
还有个TryEnter, 可以代入timeout, 如果回传true代表lock成功, 如果回传false代表lock过程超时Choosing the Synchronization Object
必须是reference type的物件
一般是private的物件, 做逻辑封装
精準的lock会用专门的locker物件
lock(this)或lock(typeof(SomeClass)), 很难预防死结和过多的blocking
When to Lock
基本的规则是, lock在存取可写的共享物件
Thread safe与unsafe的写法
class ThreadUnsafe { static int _x; static void Increment() { ++_x; } static void Assign() { _x = 123; } } class ThreadSafe { static readonly object _locker = new object(); static int _x; static void Increment() { lock(_locker) { ++_x; }} static void Assign() { lock(_locker) { _x = 123; }} }
non-blocking的同步化, 后续有memory-barriers和Interlocked可用Locking and Atomicity
有一组变数, 写跟读总是在同一个lock, 则称它们是Atomic
比如下面x与y的除法範例
lock (_locker) { if(x!=0) y /= x; }
有时会有破坏atomicity的bug, 比如有呼叫其他函式造成exception, 使某些变数没完整计算完
建议其他函式先运算完再把它的值带入到lock, 或者try的catch/finally做rollback
Nested Locking
lock可以巢状包装
适用于lock的内容, 有call其他的函式, 这些函式实作再加上lock
Deadlocks
当两个thread都掌握对方的资源且等待对方释放, 没任何进展就是死结
基本的死结案例:
using System; using System.Threading; class TestDeadlocks { static void Main(string[] args){ object locker1 = new object(); object locker2 = new object(); new Thread(() => { lock(locker1){ Thread.Sleep(1000); Console.WriteLine("Ready to lock 2"); lock(locker2); } }).Start(); lock(locker2){ Thread.Sleep(1000); Console.WriteLine("Ready to lock 1"); lock(locker1); } Console.WriteLine("Hi"); Console.Read(); } }
更複杂的死结是,Thread 1 lock 而呼叫A class的X方法, X方法呼叫B class的Y 方法, 另外Thread 2 lock而呼叫B class的Y方法, Y方法呼叫A class的X方法.
考虑lock是否要用在别的class的函式
之后的declarative, data parallelism, immutable types 和 nonblock synchronization能减少lock的需求
另一些常见的死结发生在WPF的Dispatcher.Invoke或Winform的Control.Invoke, 解法是用BeginInvoke
Performance
基本上lock的速度很快
如果有很短暂的lock, 可以改用SpinLock, 减少Context Switch
Lock得太久, 会减少共时性的效能; Lock也是造成Deadlock的风险
Mutex
跨Process的lock, 大约比lock慢50倍
使用WaitOne做lock, ReleaseMutex unlock, 而用close或dispose也是release
Mutex认出同样的lock是用Name
如果是执行在Terminal Services环境, 一般的Mutex无法跨terminal server session, 要在Name加上Global\ 前缀字
using System; using System.Threading; class OneAtATimePlease { static void Main(string[] args){ using(var mutex = new Mutex(false, "test oreilly")) { if(!mutex.WaitOne(TimeSpan.FromSeconds(3), false)){ Console.WriteLine(" another is running"); Console.Read(); return; } RunProgram(); } } static void RunProgram() { Console.WriteLine("To exit"); Console.Read(); } }
Semaphore
Semaphore允许多个Thread在同一区段执行, 超过此容量的thread会block等待
把Semaphore的容量设为1, 就和lock与mutex一样, 但Semaphore的Release是任何thread都能呼叫
SemaphoreSlim有更低延迟, 且能带cancellation token, 用在parallel programming
如果Semaphore有给名字, 也是能跨Process
下面範例是最多3个Thread进入
using System; using System.Threading; class SemaphoreClub { static SemaphoreSlim _sem = new SemaphoreSlim(3); static void Main(string[] args){ for(int i = 0 ; i < 5; ++i){ new Thread(Enter).Start(i); } Console.Read(); } static void Enter(object id) { Console.WriteLine(id + " wants to enter"); _sem.Wait(); Console.WriteLine(id + " is in!"); Thread.Sleep(500 * (int) id); Console.WriteLine(id + " is leaving"); _sem.Release(); } }
Thread Safety
一般的type很少是Thread-safe, 原因有如下:开发时要维护该type所有Thread-safe栏位
Thread-safety有效能上的花费, 即使没有多执行绪也必要花费
使用Thread-safe的type不一定能让执行程式thread-safe
基本的用法是用exclusive lock去锁定特定的程式码而达到thread-safe
另外是减少共享资料, 达到无状态的功能, 比如ASP.NET Web的Request, 大都是独自处理
最后是用automatic locking regime的方式, 对class或property加上ContextBoundObject和Synchronization属性, 就能自动有锁的功能. 但缺点是会产生另一种方式的死结、併发性差、意外重入等问题. 尽量用exlusive lock.
Thread Safety and .NET Framework Types
Enumeration是thread-unsafe的行为, 所以共同资料要enumeration时, 先宣告一个local变数, 再用lock的方式copy (ToList, ToArray等)到local变数
Enumeration的另一种解法是reader/writer lock
class ThreadSafe { static List<string> _list = new List<string>(); static void Main() { new Thread(AddItem).Start(); new Thread(AddItem).Start(); } static void AddItem() { lock (_list) _list.Add("Item " + _list.Count()); string[] items; lock (_list) items = _list.ToArray(); foreach(string s in items) Console.WriteLine(s); } }
Locking around thread-safe objects
如果物件本身是thread-safe, 但是对它的有些操作仍是要lock, 比如if的叙述, 没有lock的状况下, 多执行绪的情况会存取到同样的值而做后续if block的操作(且可能会改值).Static members
.net framework设计static member是thread-safe, 而实例的member不是. 比如取DateTime.Now, 就不需要去用lock来取
static function不是thread-safe, 要确认功能对资料的共享性
Read-only thread safety
能在文件注明该collection是只读访问的thread-safe, 并要求使用者在只读的方法做写入
实作ToArray等, 本身会有thread-unsafe的issue
如果文件缺少说明, 要注意是否某些方法是read-only. 比如Random.Next(), 内部实作有更新private seed, 因此要用lock取值或者分开的Random物件
Thread Safety in Application Servers
通常像ASP.NET,WCF都是独立thread处理request, 但有时需要共享资料, 像catch, 更新和取资料都要lock, 会减少效能Rich Client Applications and Thread Affinity
在WPF或Winform, UI的元件有Affinity特性, 代表哪个thread建立元件, 那元件只能被那thread存取.
所以别的thread需要marshal原本thread来控制元件, 比如Winform的Invoke或BeginInvoke, WPF的Invoke或BeginInvoke
Invoke是同步方法, 会block目前thread; BeginInvoke是非同步方法, 立即回传caller而marshal的request会进到queue(和keyboard, mouse的事件使用同样的message queue)
Worker threads versus UI threads
Rich client有两大thread: UI Thread和Worker Thread
UI Thread专门建立UI元件, Worker thread一般用来执行long-running job
Rich client都会有一个UI Thread且是Main thread, 再由它生成work thread, 可直接生成或者用BackgroundWorker
Single Document Interface (SDI), 像是Word, 会有多个UI Thread
Immutable Objects
物件能封装成里面的状态不能被内部与外部改变, 称为immutable object. 决定它内部值是在Constructor且值是Read-only. 可减少lock的执行时间.
下面範例是建立一个immutable object, 只有assign新物件才会需要lock, 取值不需要
class ProgressStatus { public readonly int PercentComplete; public readonly string StatusMessage; public ProgressStatus(int percentComplete, string statusMessage) { PercentComplete = percentComplete; StatusMessage = statusMessage; } } class Program { readonly object _statusLocker = new object(); ProgressStatus _status; void SomeFunction() { _status = new ProgressStatus(50, "Working on it"); ProgressStatus statusCopy; lock(_statusLocker) statusCopy = _status; int pc = statusCopy.PercentComplete; string msg = statusCopy.StatusMessage; } }
在int pc = ... 的最后2行, 有隐含用Memory barrier包装
后续不使用lock, 还会有显示Memory barrier, Interlocked.CompareExchange, spin-waits等功能可用
Signaling with Event Wait Handles
Signaling是指thread会一直等待, 直到收到从别的Thread发的通知
和一般C#的event不相关
3种类型: AutoResetEvent, ManualResetEvent, CountdownEvent
A Comparison of Signaling Constructs
下表的overhead是指对一个signal和wait的时间AutoResetEvent
它像是一个票闸, 插入一张票只让一个人过
Thread 在门闸时呼叫WaitOne来wait/block, 而呼叫Set插入票
如果有多个thread在门闸呼叫WaitOne, 变成queue排队
Ticket可以来自任何thread, 代表任何unblock的thread可存取该AutoResetEvent物件并呼叫Set
在constructor代入true的话, 代表直接呼叫Set
用EventWaitHandle可达到相同的功能 (EventWaitHandle是AutoResetEvent的父类别)
var auto = new AutoResetEvent (false); // 等同写法 var auto2 = new EventWaitHandle(false, EventResetMode.AutoReset);
使用的範例如下: using System; using System.Threading; class TestAutoResetEvent { static EventWaitHandle _waitHandle = new AutoResetEvent(false); static void Main(string[] args){ new Thread(() => { Console.WriteLine("Wait..."); _waitHandle.WaitOne(); Console.WriteLine("awake"); }).Start(); Thread.Sleep(1000); _waitHandle.Set(); Console.Read(); } }
範例对应的时序表Producer/consumer queue
一个queue用来放进要执行的任务, 而其他thread在背景从这queue挑任务来做
用这种queue能有效管理要执行的thread数量, 比如IO密集型任务可只安排一个thread, 其他需要10个
CLR的Thread pool也是一种Producer/consumer queue
queue插入的资料会有对应的任务, 比如填入档案名称, 而对应的任务是加密该档案
以下用AutoResetEvent实作範例
using System; using System.Collections.Generic; using System.Threading; namespace ProducerConsumerTest { class Program { static void Main(string[] args) { using (ProducerConsumerQueue q = new ProducerConsumerQueue()) { q.EnqueueTask("Hello"); for(int i = 0; i < 20; ++i) { q.EnqueueTask("Say " + i); } q.EnqueueTask("Good bye"); } } } public class ProducerConsumerQueue : IDisposable { EventWaitHandle _wh = new AutoResetEvent(false); Thread _worker; readonly object _locker = new object(); Queue<string> _tasks = new Queue<string>(); public ProducerConsumerQueue() { _worker = new Thread(Work); _worker.Start(); } public void EnqueueTask(string task) { lock (_locker) { _tasks.Enqueue(task); } _wh.Set(); } public void Dispose() { EnqueueTask(null); // signal the consumer to exit _worker.Join(); // wait for the consumer's thread to finish _wh.Close(); // release any OS Resources } private void Work() { while (true) { string task = null; lock(_locker) { if(_tasks.Count > 0) { task = _tasks.Dequeue(); if(task == null) { return; } } } if(task != null) { Console.WriteLine("Performing task : " + task); Thread.Sleep(1000); // simulate work... } else { _wh.WaitOne(); // no more tasks , wait for a signal } } } } }
用lock去锁定queue, 达到thread-safe
在enqueue之后, 呼叫Set, 通知在while(true)有wait的thread可以往下做
如果caller插入null的资料, 直接结束
queue如果是空的, 会呼叫WaitOne等待signal
在Dispose的实作, 呼叫Enqueue(null), 让Work方法读到null而return结束, 否则Thread的Join永远不结束; 对EventWaitHandle呼叫Close, 可以释放内部有用到的资源
.Net Framework 4 有BlockingCollection, 实作Producer/Consumer queue
上述用AutoResetEvent的Producer/Consumer queue是个好的範例, 未来加上cancellation或bounded queue, 都可以此为起点
ManualResetEvent
和AutoResetEvent相比, ManualResetEvent是一般的闸门, 呼叫Set时, 让所有等待(有呼叫过WaitOne)的Thread全都能进入
呼叫Reset能把闸门关上
呼叫WaitOne就会Block
等同的写法
var manual1 = new ManualResetEvent(false);
var manual2 = new EventWaitHandle(false, EventResetModel.ManualReset);
另一个是ManualResetEventSlim能执行更快且支援CancellationToken, 但不能跨Process
ManualResetEvent是让一个Thread允许多个Thread unblock, CountdownEvent则相反
CountdownEvent
用CountdownEvent可以等多个Thread执行后再往后执行
在.NET Framework 4之前, 可以用Wait and Pulse来实作CountdownEvent
建构CountdownEvent指定要的数量, 呼叫Wait则block该thread, 而呼叫Signal会降低count, 直到count为0, 该thread将unblock
以下範例是等待3个Thread执行后, 才继续执行
using System; using System.Threading; public class Program { static CountdownEvent _countDown = new CountdownEvent(3); public static void Main() { new Thread(SaySomething).Start("Thread 1"); new Thread(SaySomething).Start("Thread 2"); new Thread(SaySomething).Start("Thread 3"); _countDown.Wait(); Console.WriteLine("All threads have finished"); } static void SaySomething(object msg) { Thread.Sleep(3000); Console.WriteLine(msg); _countDown.Signal(); } }
Count可以用AddCount来加更多需等待的数量, 但如果已经达到count = 0而又呼叫AddCount, 将抛出exception
建议可用TryAddCount, 回传false代表count已经是0
呼叫Reset将Count回到初始值
Creating a Cross-Process EventWaitHandle
EventWaitHandle可以指定名字, 让多个Process根据同一个名字而共同参考
基本用法:
EventWaitHandle wh = new EventWaitHandle(false, EventResetMode.AutoReset, "MyCompany.MyApp.Name");
Wait Handles and the Thread Pool
使用ThreadPool.RegisterWaitForSingleObject, 可以不绑特定的Thread来执行, 将要委託的任务交给Thread pool执行 using System; using System.Threading; namespace TestWaitHandleThreadPool { class Program { static ManualResetEvent _starter = new ManualResetEvent(false); static void Main(string[] args) { RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject(_starter, Go, "Some Data", -1, true); Thread.Sleep(5000); _starter.Set(); Console.ReadLine(); reg.Unregister(_starter); } static void Go(object data, bool timeOut) { Console.WriteLine("Start work : " + data); } } }
参数-1代表不用timeout, 如果有timeout的话, 会检测传送的物件(範例是Some Data字串)的状态; 参数true代表该Thread pool收到signal后, 不再重设要Wait.
假如原本用WaiOne的方式处理, Server收到100个任务就得new 100个Thread, 变成绑定太多且大量Block. 改写的方法如下, 让后续的委託工作都给‘hread Pool处理
void AppServerMethod() { _wh.WaitOne(); // ... continue execution } // 变成 void AppServerMethod() { RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject(_starter, Resume, null, -1, true); // ... } static void Resume(object data, bool timeOut) { // ... continue execution }
WaitAny, WaitAll, and SignalAndWait
WaitHandle提供static method, 包含WaitNay, WaitAll, SignalAndWait, 可以对有继承WaitHandle的物件使用较複杂的Signal/Wait的功能
WaitAny: 等待任一个Thread收到Signal
WaitAll: 等待所有Thread都收到Signal
SignalAndWait: 对第一个参数的thread发出signal, 对第二个参数的thread做等待
Alternatives to WaitAll and SignalAndWait
WaitAll和SignalAndWait不能在单一执行绪的环境执行.
SignalAndWait的替代方案是Barrier类别, 而WaitAll的替代方案是Parallel class的Invoke方法
Synchronization Contexts (.NET Core已不存在)
继承ContextBoundObject且加上Synchronization属性, CLR在这物件会自动使用lock
如下範例, 每个Demo函式会排队执行
using System; using System.Runtime.Remoting.Contexts; using System.Threading; namespace TestAutoLock { class Program { static void Main(string[] args) { AutoLock safeInstance = new AutoLock(); new Thread(safeInstance.Demo).Start(); new Thread(safeInstance.Demo).Start(); safeInstance.Demo(); } } [Synchronization] public class AutoLock : ContextBoundObject { public void Demo() { Console.Write("Thread id : " + Thread.CurrentThread.ManagedThreadId); Console.Write(" Start....."); Thread.Sleep(1000); Console.WriteLine("End"); } } }
自动lock不包含static的成员和没有继承ContextBoundObject的物件(比如Form)
想像是CLR将原始Class套上一层ContextBoundObject Proxy, 能呼叫原始Class的成员, 再为它的方法都加上同步化的功能
如果前面的AutoLock是个Collection, 则使用它的物件也必须是ContextBoundObject, 否则存取它的item需要手动加上lock
Synchronization Context预设会延伸从同一层Scope的Context, 也就是lock包含的深度一直向下
在Synchronization的attribute可以改变预设的行为, 有这些选项:
NOT_SUPPORTED: 就跟没加上Synchronization的属性一样
SUPPORTED: 如果来自别的synchronized 物件做初始化, 则延伸它的context, 否则保持unsynchronized
REQUIRED (预设): 如果来自别的synchronized 物件做初始化, 则延伸它的context, 否则建立新的Context
REQUIRES_NEW: 总是建立新增Synchronization context
以下範例是会产生Deadlock的Synchronization: using System; using System.Runtime.Remoting.Contexts; using System.Threading; namespace TestAutoLockDeadlock { [Synchronization] public class Deadlock : ContextBoundObject { public Deadlock Other; public void Demo() { Console.WriteLine(Thread.CurrentThread.ManagedThreadId); Thread.Sleep(1000); Console.WriteLine("Call other"); Other.Hello(); } void Hello() { Console.WriteLine("hello"); } } class Program { private static void Main(string[] args) { Deadlock dead1 = new Deadlock(); Deadlock dead2 = new Deadlock(); dead1.Other = dead2; dead2.Other = dead1; new Thread(dead1.Demo).Start(); dead2.Demo(); Console.Read(); } } }
两个Deadlock物件都是在Program建立, Program本身是unsynchronized, 所以Deadlock物件建立各自的Synchronization Context, 也有各自的lock
呼叫对方的Hello方法后, 即发生Deadlock
Reentrancy
Reentrant的定义是, 如果有段程式码被中断, 执行绪去执行别的程式, 之后再回来执行这段程式而没造成影响
通常Thread-safe和reentrant视为同等
如果[Synchronization(true)]这样使用, 代表需要reentry, 当执行离开此程式码时, 会把lock释放, 可避免deadlock. 副作用是在这释放期间, 任何thread可以进入该物件的context(比如呼叫它的方法)
[Synchronization(true)]是类别层级, 所以在非该context的呼叫都会当class层面的木马(?)
如果没有reentrancy, 则在一些场合比较难工作, 比如在一个synchronized class实作多执行绪, 将逻辑委託给其他worker thread, 则worker thread彼此间要沟通没有reentrancy的话, 将会受阻.
同步自动锁造成deadlock, reentrancy, 删除併发等问题, 在一些应用场合没有手动lock来的好用