UNIX操作系统的加锁解锁:等待事件及唤醒
加锁和解锁的基本思想是,当某个进程进入临界区,它将持有一个某种类型的锁(UNIX里一般来说是semaphore,Linux里一般是信号量和原子量或者spinlock)。当其他进程在该进程没有释放该锁时试图进入临界区(加锁),它将会被设置成睡眠状态,然后被置入等待该锁的进程队列(某个优先级的)。当该锁被释放时,也就是解锁事件发生时,内核将从等待该锁的进程优先级队列中寻找一个进程并将其置为就绪态,等待调度(schedule)。
在system v中,等待某一事件被称为sleep(sleep on an event),因此下文将统一使用睡眠(sleep)。等待某事件也可以成为等待某个锁。(注:本文中的sleep与sleep()系统调用不同)
系统的实现将一组事件映射到一组内核虚拟地址(锁);而且事件不区别对待到底有多少进程在等待。这就意味着两个不规则的事情:
一、当某个事件发生时,等待该事件的一组进程均被唤醒(而不是仅仅唤醒一个进程),并且状态均被设置成就绪(ready-to-run)。这时候由内核选择(schedule)一个进程来执行,由于system v内核不是可抢占的(Linux内核可抢占),因此其他的进程将一直在就绪状态等待调度,或者再次进入睡眠(因为该锁有可能被执行进程持有,而执行进程因为等待其他事件的发生而睡眠),或者等其他进程在用户态被抢占。
二、多个事件映射到同一个地址(锁)。假设事件e1和e2都映射到同一个地址(锁)addr,有一组进程在等待e1,一组进程在等待e2,它们等待的事件不同,但是对应的锁相同。假如e2发生了,所有等待e2的进程都被唤醒进入就绪状态,而由于e1没有发生,锁addr没有被释放,所有被唤醒的进程又回到睡眠状态。貌似一个事件对应一个地址会提高效率,但实际上由于system v是非抢占式内核,而且这种多对一映射非常少,再加上运行态进程很快就会释放资源(在其他进程被调度之前),因此这种映射不会导致性能的显著降低。
下面简单阐述一下sleep和wakeup的算法。
//伪代码
sleep(地址(事件),优先级)
返回值:进程能捕获的信号发生导致的返回则返回1,当进程不能捕获的信号发生时返回longjmp算法,否则返回0。
{ 提高处理器执行等级以禁用所有中断;//避免竞态条件 将进程的状态设置为睡眠; 根据事件将进程放入睡眠哈希队列;//一般来说每个事件都有一个等待队列 将睡眠地址(事件)及输入的优先级保存到进程表中; if (该等待是不可中断的等待) //一般有两种睡眠状态:可中断的和不可中断的。不可中断的睡眠是指进程除了等待的事件外, //不会被其他任何事件(如信号)中断睡眠状态,该情况不太常用。 { 上下文切换;//此处该进程执行上下文被保存起来,内核转而执行其他进程 //在别处进行了上下文切换,内核选择该上下文进行执行,此时该进程被唤醒 恢复处理器等级来允许中断; 返回0; } // 被信号中断的睡眠 if (没有未递送的信号) { 上下文切换; if (没有未递送的信号) { 恢复处理器等级来允许中断; 返回0; } } //有未递送的信号 若进程还在等待哈希队列中,将其从该队列移出; 恢复处理器等级来允许中断; if(进程捕获该信号) 返回1; 执行longjmp算法;//这一段我也不明白}
void wakeup(地址(事件))
{ 禁用所有的中断; 根据地址(事件)查找睡眠进程队列; for(每个在该事件上睡眠的进程) { 将该进程从哈希队列中移出; 设置状态为就绪; 将该进程放入调度链表中; 清除进程表中的睡眠地址(事件); if(进程不在内存中) { 唤醒swapper进程; } else if(唤醒的进程更适合运行) { 设置调度标志; } } 恢复中断;}
在wakeup调用之后,被唤醒的进程并不是马上投入运行,而是让其适合运行,等到下次调度该进程才有机会运行(也可能不会运行)。
代码示例:
由于没能找到UNIX的源码,因此用Linux的源代码替代。上述伪代码已经是比较简单的处理。Linux 0.01则更简单。
//Linux 0.01的实现://不可中断睡眠void sleep_on(struct task_struct **p){ struct task_struct *tmp; if (!p) return; if (current == &(init_task.task)) //current宏用来获取当前运行进程的task_struct panic("task[0] trying to sleep"); tmp = *p; //将已经在睡眠的进程p2保存到tmp *p = current; //将当前进程p1保存到睡眠进程队列中(实际上只有一个) current->state = TASK_UNINTERRUPTIBLE; //p1状态设置为不可中断的睡眠 schedule(); //上下文切换,执行其他进程 //p1被唤醒后回到此处 if (tmp) tmp->state=0; //将p2的状态设置为运行,等待调度}//可中断睡眠void interruptible_sleep_on(struct task_struct **p){ struct task_struct *tmp; if (!p) return; if (current == &(init_task.task)) panic("task[0] trying to sleep"); tmp=*p; //正在等待的进程p2保存到tmp *p=current; //将当前进程p1保存到睡眠进程队列中repeat: current->state = TASK_INTERRUPTIBLE; //将p1状态设置为可中断睡眠 schedule(); //上下文切换 if (*p && *p != current) { //p2睡眠被中断 (**p).state=0;//p1设置为运行态 goto repeat; //回到repeat,继续让p2睡眠 } *p=NULL; if (tmp) tmp->state=0; //将p2的状态设置为运行态,等待调度}
这两个函数比较难以理解,主要是在最后两条语句。在schedule()之前,切换的是当前进程的上下文,但是,切换回来之后,却是将原先正在睡眠的进程置为就绪态。在执行schedule()之前,各指针如下图所示(不好意思,不会粘贴图片):
---| p | --- || // ---- Step 3 ---------| *p |--------->| current | ---- --------- | X Step 1 | // ---------------- Step 2 -----| Wait Process |<--------| tmp | ---------------- -----
而在schedule()返回到这段代码之后,事情就不一样了。因为在step 3之后,current进程已经进入睡眠,tmp指向的睡眠进程的描述符也被保存下来。从schedule()返回之后,执行的代码仍然是current,而tmp指向的仍然是wait process,此时将其状态置为就绪,等待下一次调度。
与前两个函数相比,wake_up相当简单:
//被唤醒的进程并不是马上投入运行,而是让其适合运行void wake_up(struct task_struct **p){ if (p && *p) { (**p).state=0; //将要唤醒的进程状态置为就绪 *p=NULL; //将进程移出等待的进程 }}
有了sleep_on()和wake_up()之后,就可以对资源加锁了,如(硬盘缓冲加锁、等待缓冲可用、唤醒等待进程):
//锁住bhstatic inline void lock_buffer(struct buffer_head * bh){ if (bh->b_lock) printk("hd.c: buffer multiply locked/n"); bh->b_lock=1;} static inline void unlock_buffer(struct buffer_head * bh){ if (!bh->b_lock) printk("hd.c: free buffer being unlocked/n"); bh->b_lock=0; wake_up(&bh->b_wait);} static inline void wait_on_buffer(struct buffer_head * bh){ cli(); //禁止中断 while (bh->b_lock) sleep_on(&bh->b_wait); sti(); //恢复中断}//Linux 0.99.15的sleep和wake_up的实现(支持等待队列):static inline void __sleep_on(struct wait_queue **p, int state){ unsigned long flags; struct wait_queue wait = { current, NULL }; if (!p) return; if (current == task[0]) panic("task[0] trying to sleep"); current->state = state; add_wait_queue(p, &wait); //将当前进程加入等待队列 save_flags(flags); //保存中断掩码 sti(); //屏蔽中断 schedule(); //上下文切换 remove_wait_queue(p, &wait); //从等待队列中移除当前进程 restore_flags(flags); //恢复中断掩码}void wake_up(struct wait_queue **q){ struct wait_queue *tmp; struct task_struct * p; if (!q || !(tmp = *q)) return; do {//将等待队列中唤醒队首进程 if ((p = tmp->task) != NULL) { if ((p->state == TASK_UNINTERRUPTIBLE) || (p->state == TASK_INTERRUPTIBLE)) { p->state = TASK_RUNNING; if (p->counter > current->counter) need_resched = 1; } } if (!tmp->next) { printk("wait_queue is bad (eip = %08lx)/n",((unsigned long *) q)[-1]); printk(" q = %p/n",q); printk(" *q = %p/n",*q); printk(" tmp = %p/n",tmp); break; } tmp = tmp->next; } while (tmp != *q);}
(责任编辑:云子)