[读书笔记] Threading in C# - PART 2: BASIC SYNCHRONIZATION

本篇同步发文于个人Blog: [读书笔记] Threading in C# - PART 2: BASIC SYNCHRONIZATION

Synchronization Essentials

同步的结构分4种:

Simple blocking methods: 像是Sleep, Join, Task.Wait等

Locking constructs: 限制数个Thread做后续操作, Excluseive locking只有一个thread, 比如lock(Monitor.Enter, Monitor.Exit), Mutex 和 SpinLock. 而Nonexclusive locking像是Semaphore, SemaphoreSlim 和 reader/writer locks

Signaling constructs: Thread可以暂停, 直到收到通知才恢复, 这可避免没效率的轮询. 比如用 event wait handler, Monitor的Wait/Pulse, CountdownEvent和Barrier

Nonblocking synchronization constructs: Thread.MemoryBarrier, Thread.VolatileRead, Thread.VolatileWrite, volatile关键字 和 Interlocked类别

Blocking

等待、Sleep等会让thread暂停, 把time slice还给CPU.

可用ThreadState检查是否Blocked

    bool blocked = (someThread.ThreadState & ThreadState.WaitSleepJoin) != 0

当Thread发生block 或 unblock, 都会造成 Context Switch

unblock的触发条件:

blocking条件已满足

operation timing out(有指定timeout的时候)

被interrupt

被aborted

Blocking Versus Spinning

用loop一直查询某条件, 造成CPU消耗很大的运算资源

一种比较好一点的写法是, 在loop内加个Thread.Sleep

如果要用Spinning的写法, 一种是保证该条件很快就满足的运算, 另外一种是用SpinLock/SpinWait

ThreadState

ThreadState可以检查Thread的状态

Locking

区分 thread-safe和thread-unsafe的程式码, 通常是有static变数, 如果有多个thread存取时会不会错误

最简单的方式是用lock关键字做同步化, 绑住某个同步化物件, 只允许一个thread操作, 其他的thread变成blocked状态, 且依照queue的顺序来排队

A Comparison of Locking Constructs

下表的overhead是指对一个thread做block和unblock的时间ConstructPurposeCross-Process ?Overheadlock (Monitor.Enter / Monitor.Exit)确保只有一个Thread能存取资源或一段code20 nsMutext同lockYes1000 nsSemaphoreSlim确保指定数量的thread能存取资源或一段code200 nsSemaphore同SemaphoreYes1000 nsReaderWriterLockSlim允许多个reader能与一个writer共存40 nsReaderWriterLock同ReaderWriterLockSlim100 ns

Monitor.Enter and Monitor.Exit

lock = Monitor.Enter + Monitor.Exit + try/finally
    Monitor.Enter(_locker);    try    {    DoSomething();    }    finally    {    Monitor.Exit(_locker);    }

上面写法会有bug, 如果在Enter和try之间发生exception (比如thread被Abort或记忆体溢出), 则永远不释放该locker

更好的写法是在try内用Enter, 且代入bool的变数, 用来判断是否lock成功, 成功的话可以呼叫Exit

    bool lockTaken = false;    try    {    Monitor.Enter(_locker, ref lockTaken)    }    finally    {    if(lockTaken)    {    Monitor.Exit(_locker);    }    }
还有个TryEnter, 可以代入timeout, 如果回传true代表lock成功, 如果回传false代表lock过程超时

Choosing the Synchronization Object

必须是reference type的物件

一般是private的物件, 做逻辑封装

精準的lock会用专门的locker物件

lock(this)或lock(typeof(SomeClass)), 很难预防死结和过多的blocking

When to Lock

基本的规则是, lock在存取可写的共享物件

Thread safe与unsafe的写法

    class ThreadUnsafe    {    static int _x;    static void Increment() { ++_x; }    static void Assign() { _x = 123; }    }        class ThreadSafe    {    static readonly object _locker = new object();    static int _x;    static void Increment() { lock(_locker) { ++_x; }}    static void Assign() { lock(_locker) { _x = 123; }}    }
non-blocking的同步化, 后续有memory-barriers和Interlocked可用

Locking and Atomicity

有一组变数, 写跟读总是在同一个lock, 则称它们是Atomic

比如下面x与y的除法範例

    lock (_locker)    {    if(x!=0)    y /= x;    }

有时会有破坏atomicity的bug, 比如有呼叫其他函式造成exception, 使某些变数没完整计算完

建议其他函式先运算完再把它的值带入到lock, 或者try的catch/finally做rollback

Nested Locking

lock可以巢状包装

适用于lock的内容, 有call其他的函式, 这些函式实作再加上lock

Deadlocks

当两个thread都掌握对方的资源且等待对方释放, 没任何进展就是死结

基本的死结案例:

    using System;    using System.Threading;        class TestDeadlocks    {        static void Main(string[] args){            object locker1 = new object();            object locker2 = new object();            new Thread(() => {                lock(locker1){                    Thread.Sleep(1000);                    Console.WriteLine("Ready to lock 2");                    lock(locker2);                }            }).Start();            lock(locker2){                Thread.Sleep(1000);                Console.WriteLine("Ready to lock 1");                lock(locker1);            }                Console.WriteLine("Hi");            Console.Read();        }    }

更複杂的死结是,Thread 1 lock 而呼叫A class的X方法, X方法呼叫B class的Y 方法, 另外Thread 2 lock而呼叫B class的Y方法, Y方法呼叫A class的X方法.

考虑lock是否要用在别的class的函式

之后的declarative, data parallelism, immutable types 和 nonblock synchronization能减少lock的需求

另一些常见的死结发生在WPF的Dispatcher.Invoke或Winform的Control.Invoke, 解法是用BeginInvoke

Performance

基本上lock的速度很快

如果有很短暂的lock, 可以改用SpinLock, 减少Context Switch

Lock得太久, 会减少共时性的效能; Lock也是造成Deadlock的风险

Mutex

跨Process的lock, 大约比lock慢50倍

使用WaitOne做lock, ReleaseMutex unlock, 而用close或dispose也是release

Mutex认出同样的lock是用Name

如果是执行在Terminal Services环境, 一般的Mutex无法跨terminal server session, 要在Name加上Global\ 前缀字

    using System;    using System.Threading;        class OneAtATimePlease    {        static void Main(string[] args){            using(var mutex = new Mutex(false, "test oreilly"))            {                if(!mutex.WaitOne(TimeSpan.FromSeconds(3), false)){                    Console.WriteLine(" another is running");                    Console.Read();                    return;                }                RunProgram();            }        }            static void RunProgram()        {            Console.WriteLine("To exit");            Console.Read();        }    }

Semaphore

Semaphore允许多个Thread在同一区段执行, 超过此容量的thread会block等待

把Semaphore的容量设为1, 就和lock与mutex一样, 但Semaphore的Release是任何thread都能呼叫

SemaphoreSlim有更低延迟, 且能带cancellation token, 用在parallel programming

如果Semaphore有给名字, 也是能跨Process

下面範例是最多3个Thread进入

    using System;    using System.Threading;        class SemaphoreClub    {        static SemaphoreSlim _sem = new SemaphoreSlim(3);        static void Main(string[] args){            for(int i = 0 ; i < 5; ++i){                new Thread(Enter).Start(i);            }            Console.Read();        }            static void Enter(object id)        {            Console.WriteLine(id + " wants to enter");            _sem.Wait();            Console.WriteLine(id + " is in!");            Thread.Sleep(500 * (int) id);            Console.WriteLine(id + " is leaving");            _sem.Release();        }    }

Thread Safety

一般的type很少是Thread-safe, 原因有如下:

开发时要维护该type所有Thread-safe栏位

Thread-safety有效能上的花费, 即使没有多执行绪也必要花费

使用Thread-safe的type不一定能让执行程式thread-safe

基本的用法是用exclusive lock去锁定特定的程式码而达到thread-safe

另外是减少共享资料, 达到无状态的功能, 比如ASP.NET Web的Request, 大都是独自处理

最后是用automatic locking regime的方式, 对class或property加上ContextBoundObject和Synchronization属性, 就能自动有锁的功能. 但缺点是会产生另一种方式的死结、併发性差、意外重入等问题. 尽量用exlusive lock.

Thread Safety and .NET Framework Types

Enumeration是thread-unsafe的行为, 所以共同资料要enumeration时, 先宣告一个local变数, 再用lock的方式copy (ToList, ToArray等)到local变数

Enumeration的另一种解法是reader/writer lock

    class ThreadSafe    {    static List<string> _list = new List<string>();    static void Main()    {    new Thread(AddItem).Start();    new Thread(AddItem).Start();    }        static void AddItem()    {    lock (_list) _list.Add("Item " + _list.Count());    string[] items;    lock (_list) items = _list.ToArray();    foreach(string s in items) Console.WriteLine(s);    }    }

Locking around thread-safe objects

如果物件本身是thread-safe, 但是对它的有些操作仍是要lock, 比如if的叙述, 没有lock的状况下, 多执行绪的情况会存取到同样的值而做后续if block的操作(且可能会改值).

Static members

.net framework设计static member是thread-safe, 而实例的member不是. 比如取DateTime.Now, 就不需要去用lock来取

static function不是thread-safe, 要确认功能对资料的共享性

Read-only thread safety

能在文件注明该collection是只读访问的thread-safe, 并要求使用者在只读的方法做写入

实作ToArray等, 本身会有thread-unsafe的issue

如果文件缺少说明, 要注意是否某些方法是read-only. 比如Random.Next(), 内部实作有更新private seed, 因此要用lock取值或者分开的Random物件

Thread Safety in Application Servers

通常像ASP.NET,WCF都是独立thread处理request, 但有时需要共享资料, 像catch, 更新和取资料都要lock, 会减少效能

Rich Client Applications and Thread Affinity

在WPF或Winform, UI的元件有Affinity特性, 代表哪个thread建立元件, 那元件只能被那thread存取.

所以别的thread需要marshal原本thread来控制元件, 比如Winform的Invoke或BeginInvoke, WPF的Invoke或BeginInvoke

Invoke是同步方法, 会block目前thread; BeginInvoke是非同步方法, 立即回传caller而marshal的request会进到queue(和keyboard, mouse的事件使用同样的message queue)

Worker threads versus UI threads

Rich client有两大thread: UI Thread和Worker Thread

UI Thread专门建立UI元件, Worker thread一般用来执行long-running job

Rich client都会有一个UI Thread且是Main thread, 再由它生成work thread, 可直接生成或者用BackgroundWorker

Single Document Interface (SDI), 像是Word, 会有多个UI Thread

Immutable Objects

物件能封装成里面的状态不能被内部与外部改变, 称为immutable object. 决定它内部值是在Constructor且值是Read-only. 可减少lock的执行时间.

下面範例是建立一个immutable object, 只有assign新物件才会需要lock, 取值不需要

    class ProgressStatus    {    public readonly int PercentComplete;    public readonly string StatusMessage;    public ProgressStatus(int percentComplete, string statusMessage)    {    PercentComplete = percentComplete;    StatusMessage = statusMessage;    }    }        class Program    {    readonly object _statusLocker = new object();    ProgressStatus _status;    void SomeFunction()    {    _status = new ProgressStatus(50, "Working on it");    ProgressStatus statusCopy;    lock(_statusLocker) statusCopy = _status;    int pc = statusCopy.PercentComplete;    string msg = statusCopy.StatusMessage;    }    }

在int pc = ... 的最后2行, 有隐含用Memory barrier包装

后续不使用lock, 还会有显示Memory barrier, Interlocked.CompareExchange, spin-waits等功能可用

Signaling with Event Wait Handles

Signaling是指thread会一直等待, 直到收到从别的Thread发的通知

和一般C#的event不相关

3种类型: AutoResetEvent, ManualResetEvent, CountdownEvent

A Comparison of Signaling Constructs

下表的overhead是指对一个signal和wait的时间ConstructPurposeCross-Process ?OverheadAutoResetEvent允许一个thread当收到singal时,执行一次unblockYes1000 nsManualResetEvent允许一个thread当收到singal时,执行无限期的unblock (直到它重置)Yes1000 nsManualResetEventSlim (Net Framework 4)同ManualResetEvent40 nsCountdownEvent (Net Framework 4)允许一个thread当收到预定数量的singal时,执行unblock40 nsBarrier (Net Framework 4)实作Thread执行屏障80 nsWait and Pulse允许一个thread block直到某条件达成120 ns for a Pulse

AutoResetEvent

它像是一个票闸, 插入一张票只让一个人过

Thread 在门闸时呼叫WaitOne来wait/block, 而呼叫Set插入票

如果有多个thread在门闸呼叫WaitOne, 变成queue排队

Ticket可以来自任何thread, 代表任何unblock的thread可存取该AutoResetEvent物件并呼叫Set

在constructor代入true的话, 代表直接呼叫Set

用EventWaitHandle可达到相同的功能 (EventWaitHandle是AutoResetEvent的父类别)

    var auto = new AutoResetEvent (false);        // 等同写法        var auto2 = new EventWaitHandle(false, EventResetMode.AutoReset);
使用的範例如下:
    using System;    using System.Threading;        class TestAutoResetEvent    {        static EventWaitHandle _waitHandle = new AutoResetEvent(false);        static void Main(string[] args){                        new Thread(() => {                Console.WriteLine("Wait...");                _waitHandle.WaitOne();                Console.WriteLine("awake");            }).Start();            Thread.Sleep(1000);            _waitHandle.Set();            Console.Read();        }    }
範例对应的时序表

Producer/consumer queue

一个queue用来放进要执行的任务, 而其他thread在背景从这queue挑任务来做

用这种queue能有效管理要执行的thread数量, 比如IO密集型任务可只安排一个thread, 其他需要10个

CLR的Thread pool也是一种Producer/consumer queue

queue插入的资料会有对应的任务, 比如填入档案名称, 而对应的任务是加密该档案

以下用AutoResetEvent实作範例

    using System;    using System.Collections.Generic;    using System.Threading;        namespace ProducerConsumerTest    {        class Program        {            static void Main(string[] args)            {                using (ProducerConsumerQueue q = new ProducerConsumerQueue())                {                    q.EnqueueTask("Hello");                    for(int i = 0; i < 20; ++i)                    {                        q.EnqueueTask("Say " + i);                    }                    q.EnqueueTask("Good bye");                }            }        }            public class ProducerConsumerQueue : IDisposable        {            EventWaitHandle _wh = new AutoResetEvent(false);            Thread _worker;            readonly object _locker = new object();            Queue<string> _tasks = new Queue<string>();            public ProducerConsumerQueue()            {                _worker = new Thread(Work);                _worker.Start();            }                public void EnqueueTask(string task)            {                lock (_locker)                {                    _tasks.Enqueue(task);                }                _wh.Set();            }                public void Dispose()            {                EnqueueTask(null); // signal the consumer to exit                _worker.Join(); // wait for the consumer's thread to finish                _wh.Close(); // release any OS Resources            }                private void Work()            {                while (true)                {                    string task = null;                    lock(_locker)                    {                        if(_tasks.Count > 0)                        {                            task = _tasks.Dequeue();                            if(task == null)                            {                                return;                            }                        }                    }                    if(task != null)                    {                        Console.WriteLine("Performing task : " + task);                        Thread.Sleep(1000); // simulate work...                    }                    else                    {                        _wh.WaitOne(); // no more tasks , wait for a signal                    }                }            }        }    }

用lock去锁定queue, 达到thread-safe

在enqueue之后, 呼叫Set, 通知在while(true)有wait的thread可以往下做

如果caller插入null的资料, 直接结束

queue如果是空的, 会呼叫WaitOne等待signal

在Dispose的实作, 呼叫Enqueue(null), 让Work方法读到null而return结束, 否则Thread的Join永远不结束; 对EventWaitHandle呼叫Close, 可以释放内部有用到的资源

.Net Framework 4 有BlockingCollection, 实作Producer/Consumer queue

上述用AutoResetEvent的Producer/Consumer queue是个好的範例, 未来加上cancellation或bounded queue, 都可以此为起点

ManualResetEvent

和AutoResetEvent相比, ManualResetEvent是一般的闸门, 呼叫Set时, 让所有等待(有呼叫过WaitOne)的Thread全都能进入

呼叫Reset能把闸门关上

呼叫WaitOne就会Block

等同的写法

var manual1 = new ManualResetEvent(false);
var manual2 = new EventWaitHandle(false, EventResetModel.ManualReset);

另一个是ManualResetEventSlim能执行更快且支援CancellationToken, 但不能跨Process

ManualResetEvent是让一个Thread允许多个Thread unblock, CountdownEvent则相反

CountdownEvent

用CountdownEvent可以等多个Thread执行后再往后执行

在.NET Framework 4之前, 可以用Wait and Pulse来实作CountdownEvent

建构CountdownEvent指定要的数量, 呼叫Wait则block该thread, 而呼叫Signal会降低count, 直到count为0, 该thread将unblock

以下範例是等待3个Thread执行后, 才继续执行

    using System;    using System.Threading;        public class Program    {    static CountdownEvent _countDown = new CountdownEvent(3);    public static void Main()    {    new Thread(SaySomething).Start("Thread 1");    new Thread(SaySomething).Start("Thread 2");    new Thread(SaySomething).Start("Thread 3");    _countDown.Wait();    Console.WriteLine("All threads have finished");    }    static void SaySomething(object msg)    {    Thread.Sleep(3000);    Console.WriteLine(msg);    _countDown.Signal();    }    }

Count可以用AddCount来加更多需等待的数量, 但如果已经达到count = 0而又呼叫AddCount, 将抛出exception

建议可用TryAddCount, 回传false代表count已经是0

呼叫Reset将Count回到初始值

Creating a Cross-Process EventWaitHandle

EventWaitHandle可以指定名字, 让多个Process根据同一个名字而共同参考

基本用法:

EventWaitHandle wh = new EventWaitHandle(false, EventResetMode.AutoReset, "MyCompany.MyApp.Name");

Wait Handles and the Thread Pool

使用ThreadPool.RegisterWaitForSingleObject, 可以不绑特定的Thread来执行, 将要委託的任务交给Thread pool执行
    using System;    using System.Threading;        namespace TestWaitHandleThreadPool    {        class Program        {            static ManualResetEvent _starter = new ManualResetEvent(false);            static void Main(string[] args)            {                RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject(_starter, Go, "Some Data", -1, true);                Thread.Sleep(5000);                _starter.Set();                Console.ReadLine();                reg.Unregister(_starter);            }                static void Go(object data, bool timeOut)            {                Console.WriteLine("Start work : " + data);            }        }    }

参数-1代表不用timeout, 如果有timeout的话, 会检测传送的物件(範例是Some Data字串)的状态; 参数true代表该Thread pool收到signal后, 不再重设要Wait.

假如原本用WaiOne的方式处理, Server收到100个任务就得new 100个Thread, 变成绑定太多且大量Block. 改写的方法如下, 让后续的委託工作都给‘hread Pool处理

    void AppServerMethod()    {    _wh.WaitOne();    // ... continue execution    }        // 变成        void AppServerMethod()    {     RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject(_starter, Resume, null, -1, true);     // ...    }        static void Resume(object data, bool timeOut)    {    // ... continue execution    }

WaitAny, WaitAll, and SignalAndWait

WaitHandle提供static method, 包含WaitNay, WaitAll, SignalAndWait, 可以对有继承WaitHandle的物件使用较複杂的Signal/Wait的功能

WaitAny: 等待任一个Thread收到Signal

WaitAll: 等待所有Thread都收到Signal

SignalAndWait: 对第一个参数的thread发出signal, 对第二个参数的thread做等待

Alternatives to WaitAll and SignalAndWait

WaitAll和SignalAndWait不能在单一执行绪的环境执行.

SignalAndWait的替代方案是Barrier类别, 而WaitAll的替代方案是Parallel class的Invoke方法

Synchronization Contexts (.NET Core已不存在)

继承ContextBoundObject且加上Synchronization属性, CLR在这物件会自动使用lock

如下範例, 每个Demo函式会排队执行

    using System;    using System.Runtime.Remoting.Contexts;    using System.Threading;        namespace TestAutoLock    {        class Program        {            static void Main(string[] args)            {                AutoLock safeInstance = new AutoLock();                new Thread(safeInstance.Demo).Start();                new Thread(safeInstance.Demo).Start();                safeInstance.Demo();            }        }            [Synchronization]        public class AutoLock : ContextBoundObject        {            public void Demo()            {                Console.Write("Thread id : " + Thread.CurrentThread.ManagedThreadId);                Console.Write(" Start.....");                Thread.Sleep(1000);                Console.WriteLine("End");            }        }    }

自动lock不包含static的成员和没有继承ContextBoundObject的物件(比如Form)

想像是CLR将原始Class套上一层ContextBoundObject Proxy, 能呼叫原始Class的成员, 再为它的方法都加上同步化的功能

如果前面的AutoLock是个Collection, 则使用它的物件也必须是ContextBoundObject, 否则存取它的item需要手动加上lock

Synchronization Context预设会延伸从同一层Scope的Context, 也就是lock包含的深度一直向下

在Synchronization的attribute可以改变预设的行为, 有这些选项:

NOT_SUPPORTED: 就跟没加上Synchronization的属性一样

SUPPORTED: 如果来自别的synchronized 物件做初始化, 则延伸它的context, 否则保持unsynchronized

REQUIRED (预设): 如果来自别的synchronized 物件做初始化, 则延伸它的context, 否则建立新的Context

REQUIRES_NEW: 总是建立新增Synchronization context

以下範例是会产生Deadlock的Synchronization:
    using System;    using System.Runtime.Remoting.Contexts;    using System.Threading;        namespace TestAutoLockDeadlock    {        [Synchronization]        public class Deadlock : ContextBoundObject        {            public Deadlock Other;                public void Demo()            {                Console.WriteLine(Thread.CurrentThread.ManagedThreadId);                Thread.Sleep(1000);                Console.WriteLine("Call other");                Other.Hello();            }                void Hello()            {                Console.WriteLine("hello");            }        }        class Program        {            private static void Main(string[] args)            {                Deadlock dead1 = new Deadlock();                Deadlock dead2 = new Deadlock();                dead1.Other = dead2;                dead2.Other = dead1;                new Thread(dead1.Demo).Start();                dead2.Demo();                Console.Read();            }        }    }

两个Deadlock物件都是在Program建立, Program本身是unsynchronized, 所以Deadlock物件建立各自的Synchronization Context, 也有各自的lock

呼叫对方的Hello方法后, 即发生Deadlock

Reentrancy

Reentrant的定义是, 如果有段程式码被中断, 执行绪去执行别的程式, 之后再回来执行这段程式而没造成影响

通常Thread-safe和reentrant视为同等

如果[Synchronization(true)]这样使用, 代表需要reentry, 当执行离开此程式码时, 会把lock释放, 可避免deadlock. 副作用是在这释放期间, 任何thread可以进入该物件的context(比如呼叫它的方法)

[Synchronization(true)]是类别层级, 所以在非该context的呼叫都会当class层面的木马(?)

如果没有reentrancy, 则在一些场合比较难工作, 比如在一个synchronized class实作多执行绪, 将逻辑委託给其他worker thread, 则worker thread彼此间要沟通没有reentrancy的话, 将会受阻.

同步自动锁造成deadlock, reentrancy, 删除併发等问题, 在一些应用场合没有手动lock来的好用

参考资料

Threading in C#, PART 2: BASIC SYNCHRONIZATION, Joseph Albahari.C# 8.0 in a Nutshell: The Definitive Reference, Joseph Albahari (Amazon)

关于作者: 网站小编

码农网专注IT技术教程资源分享平台,学习资源下载网站,58码农网包含计算机技术、网站程序源码下载、编程技术论坛、互联网资源下载等产品服务,提供原创、优质、完整内容的专业码农交流分享平台。

热门文章