实例解析C++/CLI线程之线程状态持久性
来源:岁月联盟
时间:2006-09-23
我们可使用类Monitor与类Thread中的某些函数,直接控制线程的同步,请看例1。
例1:
using namespace System;
using namespace System::Threading;
int main()
{
/*1*/ MessageBuffer^ m = gcnew MessageBuffer;
/*2a*/ ProcessMessages^ pm = gcnew ProcessMessages(m);
/*2b*/ Thread^ pmt = gcnew Thread(gcnew ThreadStart(pm,&ProcessMessages::ProcessMessagesEntryPoint));
/*2c*/ pmt->Start();
/*3a*/ CreateMessages^ cm = gcnew CreateMessages(m);
/*3b*/ Thread^ cmt = gcnew Thread(gcnew ThreadStart(cm, &CreateMessages::CreateMessagesEntryPoint));
/*3c*/ cmt->Start();
/*4*/ cmt->Join();
/*5*/ pmt->Interrupt();
/*6*/ pmt->Join();
Console::WriteLine("Primary thread terminating");
}
public ref class MessageBuffer
{
String^ messageText;
public:
void SetMessage(String^ s)
{
/*7*/ Monitor::Enter(this);
messageText = s;
/*8*/ Monitor::Pulse(this);
Console::WriteLine("Set new message {0}", messageText);
Monitor::Exit(this);
}
void ProcessMessages()
{
/*9*/ Monitor::Enter(this);
while (true)
{
try
{
/*10*/ Monitor::Wait(this);
}
catch (ThreadInterruptedException^ e)
{
Console::WriteLine("ProcessMessage interrupted");
return;
}
Console::WriteLine("Processed new message {0}", messageText);
}
Monitor::Exit(this);
}
};
public ref class CreateMessages
{
MessageBuffer^ msg;
public:
CreateMessages(MessageBuffer^ m)
{
msg = m;
}
void CreateMessagesEntryPoint()
{
for (int i = 1; i <= 5; ++i)
{
msg->SetMessage(String::Concat("M-", i.ToString()));
Thread::Sleep(2000);
}
Console::WriteLine("CreateMessages thread terminating");
}
};
public ref class ProcessMessages
{
MessageBuffer^ msg;
public:
ProcessMessages(MessageBuffer^ m)
{
msg = m;
}
void ProcessMessagesEntryPoint()
{
msg->ProcessMessages();
Console::WriteLine("ProcessMessages thread terminating");
}
};
在标记1中,创建一个MessageBuffer类型的共享缓冲区;接着在标记2a、2b、2c中,创建了一个线程用于处理放置于缓冲区中的每条信息;标记3a、3b和3c,也创建了一个线程,并在共享缓冲区中放置了连续的5条信息以便处理。这两个线程已被同步,因此处理者线程必须等到有"东西"放入到缓冲区中,才可以进行处理,且在前一条信息被处理完之前,不能放入第二条信息。在标记4中,将一直等待,直到创建者线程完成它的工作。
当标记5执行时,处理者线程必须处理所有创建者线程放入的信息,因为使用了Thread::Interrupt让其停止工作,并继续等待标记6中调用的Thread::Join,这个函数允许调用线程阻塞它自己,直到其他线程结束。(一个线程可指定一个等待的最大时间,而不用无限等待下去。)
线程CreateMessages非常清晰明了,它向共享缓冲区中写入了5条信息,并在每条信息之间等待2秒。为把一个线程挂起一个给定的时间(以毫秒计),我们调用了Thread::Sleep,在此,一个睡眠的线程可再继续执行,原因在于运行时环境,而不是另一个线程。
线程ProcessMessages甚至更加简单,因为它利用了类MessageBuffer来做它的所有工作。类MessageBuffer中的函数是被同步的,因此在同一时间,只有一个函数能访问共享缓冲区。
主程序首先启动处理者线程,这个线程会执行ProcessMessages,其将获得父对象的同步锁;然而,它立即调用了标记10中的Wait函数,这个函数将让它一直等待,直到再次被告之运行,期间,它也交出了同步锁,这样,允许创建者线程得到同步锁并执行SetMessage。一旦函数把新的信息放入到共享缓冲区中,就会调用标记8中的Pulse,其允许等待同步锁的任意线程被唤醒,并继续执行下去。但是,在SetMessage执行完成之前,这些都不可能发生,因为它在函数返回前都不可能交出同步锁。如果情况一旦发生,处理者线程将重新得到同步锁,并从标记10之后开始继续执行。此处要说明的是,一个线程即可无限等待,也可等到一个指定的时间到达。插1是程序的输出。
插1:
Set new message M-1
Processed new message M-1
Set new message M-2
Processed new message M-2
Set new message M-3
Processed new message M-3
Set new message M-4
Processed new message M-4
Set new message M-5
Processed new message M-5
CreateMessages thread terminating
ProcessMessage interrupted
ProcessMessages thread terminating
Primary thread terminating
请仔细留意,处理者线程启动于创建者线程之前。如果以相反的顺序启动,将会在没有处理者线程等待的情况下,添加第一条信息,此时,没有可供唤醒处理者线程,当处理者线程运行到它的第一个函数调用Wait时,将会错过第一条信息,且只会在第二条信息存储时被唤醒。
管理线程
默认情况下,如果一个线程是前台线程,它将会一直执行下去,直到进入点函数结束,而不管它父类的生命期是多久;而在另一方面,后台线程则会在父类线程结束时自动结束。可通过设置Thread的IsBackground属性,把一个线程配置为后台线程,用同样的方法,也可把一个后台线程配置为前台线程。
一旦线程被启动,它即为活跃线程,可通过检查Thread的IsAlive属性来判断一个线程是否为活跃线程;通过调用Wait函数,并传递给它一个零毫秒,可使一个线程放弃剩余的CPU时间片;另外,线程还可通过CurrentThread::Thread::CurrentThread属性得到其自己的Thread对象。
每个线程都有与之相关的优先级,运行时环境(即操作)通过它来调度线程的执行,可通过Thread::Priority属性来设置或检测线程的优先级,它的范围从ThreadPriority::Lowest 到ThreadPriority::Highest,默认情况下,线程的优先级为ThreadPriority::Normal。另外,因为实现环境的不同,线程调度会有所不同,所以在控制线程方面,不应该过分依赖线程的优先级。
易变字段(域)
volatile这个限定类型告诉编译器,可能会有多个线程控制或访问它所指定的对象,尤其是,一个或多个线程可能将异步读写此变量。基本上,这个限定词是强制编译器在进行优化时不要那么"激进"。
请看例2中的代码段,在缺少volatile时,标记1中的代码完全可以忽略,因为在标记2中立即就改写了i的值;然而,指定了volatile后,编译器则必须执行这两行代码。
例2:
volatile int i = 0;
/*1*/ i = 10;
/*2*/ i = 20;
/*3*/ if (i < 5 || i > 10) {
// ...
}
int copy = i;
/*4*/ if (copy < 5 || copy > 10) {
// ...
}
在标记3中,编译器必须生成取回值i的代码两次,但是,在两次取值过程中,数值都有可能改变。为确保我们测试的是同一个值,在此不得不以类似标记4的代码来代替。通过把值i的一个快照存储在一个非易变的变量中,我们就可以安全地多次使用这个值了--因为它的值不可能在"后台"改变。在此,使用volatile,可避免对特定类型变量的显式异步访问。本篇文章发表于
线程局部存储
当编写多线程应用程序时,只在特定的线程中使用特定的变量,这是一个非常好的习惯,请看例3的程序:
例3:
using namespace System;
using namespace System::Threading;
public ref class ThreadX
{
/*1*/ int m1;
/*2*/ static int m2 = 20;
/*3*/ [ThreadStatic] static int m3 = 30;
public:
ThreadX()
{
m1 = 10;
}
void TMain()
{
String^ threadName = Thread::CurrentThread->Name;
/*4*/ Monitor::Enter(ThreadX::typeid);
for (int i = 1; i <= 5; ++i)
{
++m1;
++m2;
++m3;
}
Console::WriteLine("Thread {0}: m1 = {1}, m2 = {2}, m3 = {3}",
threadName, m1, m2, m3);
Monitor::Exit(ThreadX::typeid);
}
};
int main()
{
/*5*/ Thread::CurrentThread->Name = "t0";
ThreadX^ o1 = gcnew ThreadX;
Thread^ t1 = gcnew Thread(gcnew ThreadStart(o1, &ThreadX::TMain));
t1->Name = "t1";
ThreadX^ o2 = gcnew ThreadX;
Thread^ t2 = gcnew Thread(gcnew ThreadStart(o2, &ThreadX::TMain));
t2->Name = "t2";
t1->Start();
/*6*/ (gcnew ThreadX)->TMain();
t2->Start();
t1->Join();
t2->Join();
}
m1是一个实例字段,所以每个ThreadX的实例都有一份各自的拷贝,且在父类对象的生命期中都会存在;而另一方面,m2是一个类字段,所以对类来说,不管有几个类的实例,它只有单独的一个,从理论上来说,它将会一直存在,直到程序结束。但这两个字段都不是特定于某个线程的,如果以适当的构造,这两种类型的字段都能被多个线程访问。
简单来说,线程局部存储就是特定线程拥有的某段内存,这段内存在新线程创建时被分配,而在线程结束时被释放,它结合了局部变量的私有性和静态变量的持久性。通过指定ThreadStatic属性,可把一个字段标记为线程局部类型,如例中的标记3所示,在成为静态字段之后,m3甚至还能有一个初始化函数。
函数TMain为新线程的入口点,这个函数只是简单地递增这三个变量:m1、m2和m3,每回5次,并打印出它们当前的值。标记4中的同步锁保证了在这些字段递增或打印时,另一个线程不会同时访问它们。
在标记5中,主线程把它的名字设置为t0,接着创建并启动了两个线程,另外,它也把TMain当作了一个普通函数直接调用,而不是作为创建的新线程的一部分来调用。程序的输出请见插2。
插2:
Thread t0: m1 = 15, m2 = 25, m3 = 35
Thread t1: m1 = 15, m2 = 30, m3 = 5
Thread t2: m1 = 15, m2 = 35, m3 = 5
每个线程都有其自己的m1实例,它被初始化为10,所以在递增5次之后,每个线程中的值都为15。而m2则有所不同,所有的三个线程都共享同一变量,所以这一变量被递增了15次。
线程t1与t2在经过线程创建过程之后,每个都有其自己的m3,然而,这些线程局部变量会被赋予默认的零值,而不是在源代码中初始化的30,注意了,在经过5次递增之后,各个值均为5,而线程t0则有所不同,正如我们所看到的,这个线程不是由创建其他两个线程同样的机制创建的,所以,它的m3会接受显式初始化的值30。同时也请注意标记6,TMain作为一个普通函数被调用,而不是作为创建的新线程的一部分。
原子性与互锁操作
如果存在这样一种情况:一个应用程序有多个线程并行运行,每个线程对某些共享的整形变量,都有写操作--只是简单地使用++把变量递增1。这看起来似乎没什么问题,毕竟,还算像是一个原子性操作,但在多数中--至少从机器指令的角度来看,C++/CLI执行环境对所有整形类型,并不能普遍地保证无误。
作为示例,例4中的程序有三个线程,每个线程都同时递增一个共享的64位整形变量一千万次,最后显示出这个变量的最终值,从理论上说,应该共递增了三千万次。这个程序目前可以两种方式运行:默认方式使用++操作符以非同步方式运行;而另一种方式,通过带有命令行参数Y或y,这回使用了一个同步的库递增函数。
例4:
using namespace System;
using namespace System::Threading;
static bool interlocked = false;
const int maxCount = 10000000;
/*1*/ static long long value = 0;
void TMain()
{
if (interlocked)
{
for (int i = 1; i <= maxCount; ++i)
{
/*2*/ Interlocked::Increment(value);
}
}
else
{
for (int i = 1; i <= maxCount; ++i)
{
/*3*/ ++value;
}
}
}
int main(array<String^>^ argv)
{
if (argv->Length == 1)
{
if (argv[0]->Equals("Y") || argv[0]->Equals("y"))
{
interlocked = true;
}
}
/*4*/ Thread^ t1 = gcnew Thread(gcnew ThreadStart(&TMain));
Thread^ t2 = gcnew Thread(gcnew ThreadStart(&TMain));
Thread^ t3 = gcnew Thread(gcnew ThreadStart(&TMain));
t1->Start();
t2->Start();
t3->Start();
t1->Join();
t2->Join();
t3->Join();
Console::WriteLine("After {0} operations, value = {1}", 3 * maxCount, value);
}
当使用标准++操作符时,程序5次连续执行之后,输出如插3所示,可看出,结果与正确答案相距甚远,简单估算,大概有17%至50%的递增操作未正确完成;当程序运行于同步方式时--即使用Interlocked::Increment,所有的三千万次递增操作都正常完成,结果计算正确。本篇文章发表于
插3:
使用++操作符的输出
After 30000000 operations, value = 14323443
After 30000000 operations, value = 24521969
After 30000000 operations, value = 20000000
After 30000000 operations, value = 24245882
After 30000000 operations, value = 25404963
使用Interlocked递增函数的输出
After 30000000 operations, value = 30000000
另外,补充一点,Interlocked类还有另一个decrement函数。