What’s lock? Why lock?

What’s lock?

多个进程可能对同一片地址空间操作并共享其中的数据结构,所以需要一种机制防止他们互相干扰。所以提出了锁,锁提供互斥功能。如果只能在持有特定锁时才能操作某个数据结构,就能保证在同一时间只有一个进程操作这个数据结构。

Why lock?

竞争条件

比如在 xv6-book 中提到的例子:

1   struct list{
2       int data;
3       struct list *next;
4   };
5
6   struct list *list = 0;
7
8   void
9   insert(int data)
10  {
11      struct list *l;
12
13      l = malloc(sizeof *l);
14      l->data = data;
15      l->next = list;
16      list = l;
17  }

如果两个不同的 CPU 同时执行 insert,可能会两者都运行到15行,而都未开始运行16行。这样的话,就会出现两个链表节点,并且 next 都被设置为 list。当两者都运行了16行的赋值后,后运行的一个会覆盖前运行的一个;于是先赋值的一个进程中添加的节点就丢失了。这种问题就被称为竞争条件。
f4-1-1
通过锁可以避免竞争条件:

6   struct list *list = 0;
    struct lock listlock;
7   
8   void
9   insert(int data)
10  {
11      struct list *l;
12
        acquire(&listlock);
13      l = malloc(sizeof *l);
14      l->data = data;
15      l->next = list;
16      list = l;
        release(&listlock);
17  }

spinlock 自旋锁

自旋锁是计算机科学用于多线程同步的一种锁,线程反复检查锁变量是否可用。由于线程在这一过程中保持执行,因此是一种忙等待。一旦获取了自旋锁,线程会一直保持该锁,直至显式释放自旋锁。
自旋锁避免了进程上下文的调度开销,因此对于线程只会阻塞很短时间的场合是有效的。因此操作系统的实现在很多地方往往用自旋锁。Windows操作系统提供的轻型读写锁(SRW Lock)内部就用了自旋锁。显然,单核CPU不适于使用自旋锁,这里的单核CPU指的是单核单线程的CPU,因为,在同一时间只有一个线程是处在运行状态,假设运行线程A发现无法获取锁,只能等待解锁,但因为A自身不挂起,所以那个持有锁的线程B没有办法进入运行状态,只能等到操作系统分给A的时间片用完,才能有机会被调度。这种情况下使用自旋锁的代价很高。

xv6 spinlock 源码分析

spinlock.h

spinlock.h 中定义了 spinlock 的结构体:

struct spinlock {
	uint locked;

	char *name;
	struct cpu *cpu;
	uint pcs[10];
};

其中 locked 表示该锁是否被持有,其余的属性都是用来 debug 的。在锁可以被获得时值为 0,而当锁已经被获得时值为非零。

spinlock.c 中定义了 spinlock 的一些方法。

initlock()

spinlock 的初始化吧

void initlock(struct spinlock *lk, char *name) {
  lk->name = name;
  lk->locked = 0;
  lk->cpu = 0;
}

pushcli() & popcli()

虽然接下来定义的是持有锁的方法,但是在 acquire() 中使用到了 pushcli 这个方法,所以先来看一下。

// Pushcli/popcli are like cli/sti except that they are matched:
// it takes two popcli to undo two pushcli.  Also, if interrupts
// are off, then pushcli, popcli leaves them off.

void pushcli(void) {
  int eflags;

  eflags = readeflags();
  cli();
  if(mycpu()->ncli == 0)
    mycpu()->intena = eflags & FL_IF;
  mycpu()->ncli += 1;
}

void popcli(void) {
  if(readeflags()&FL_IF)
    panic("popcli - interruptible");
  if(--mycpu()->ncli < 0)
    panic("popcli");
  if(mycpu()->ncli == 0 && mycpu()->intena)
    sti();
}

clisti 就是汇编指令,用于禁止中断发生和允许中断发生,可以看到这边 pushclipopcli 中分别用到了 cli()sti() 查看对应实现就是调用汇编来实现的:

static inline void cli(void) {
  asm volatile("cli");
}

static inline void sti(void) {
  asm volatile("sti");
}

nclicpu 这个结构体中的属性,表示的是 pushcli 的嵌套深度。intena 表示在这个 pushcli 之前是否允许中断。

acquire()

直接上源码

void acquire(struct spinlock *lk) {
  pushcli(); // disable interrupts to avoid deadlock.
  if(holding(lk))
    panic("acquire");

  // The xchg is atomic.
  while(xchg(&lk->locked, 1) != 0)
    ;

  // Tell the C compiler and the processor to not move loads or stores
  // past this point, to ensure that the critical section's memory
  // references happen after the lock is acquired.
  __sync_synchronize();

  // Record info about lock acquisition for debugging.
  lk->cpu = mycpu();
  getcallerpcs(&lk, lk->pcs);
}

调用 pushcli() 禁用中断防止死锁。
逻辑上说,acquire() 应该这样写:

void acquire(struct spinlock *lk) {
	for (;;) {
		if (!lk->locked) {
			lk->locked = 1;
			break;
		}
	}
}

但是这段代码的问题在于 !lk->lockedlk->locked = 1 分开成两条语句,会造成竞争条件。所以使用 xchg 这样的原子操作。

release()

放锁类似,上源码

void release(struct spinlock *lk) {
  if(!holding(lk))
    panic("release");

  lk->pcs[0] = 0;
  lk->cpu = 0;

  // Tell the C compiler and the processor to not move loads or stores
  // past this point, to ensure that all the stores in the critical
  // section are visible to other cores before the lock is released.
  // Both the C compiler and the hardware may re-order loads and
  // stores; __sync_synchronize() tells them both not to.
  __sync_synchronize();

  // Release the lock, equivalent to lk->locked = 0.
  // This code can't use a C assignment, since it might
  // not be atomic. A real OS would use C atomics here.
  asm volatile("movl $0, %0" : "+m" (lk->locked) : );

  popcli();
}

这里在给 locked 赋值的时候没有直接赋值,而是通过汇编语言,注释也写上了直接赋值是非原子性的。

holding()

判断是否持有锁的方法。

int holding(struct spinlock *lock) {
  return lock->locked && lock->cpu == mycpu();
}

非常的简单,就查看 locked 是否为真且 lock 存的 cpu 是否是当前的 cpu

getcallerpcs()

spinlock.c 中还有一个方法是 getcallerpcs() ,不过暂时还没有看懂是干嘛用的。

先贴代码

// Record the current call stack in pcs[] by following the %ebp chain.
void getcallerpcs(void *v, uint pcs[]) {
  uint *ebp;
  int i;

  ebp = (uint*)v - 2;
  for(i = 0; i < 10; i++){
    if(ebp == 0 || ebp < (uint*)KERNBASE || ebp == (uint*)0xffffffff)
      break;
    pcs[i] = ebp[1];     // saved %eip
    ebp = (uint*)ebp[0]; // saved %ebp
  }
  for(; i < 10; i++)
    pcs[i] = 0;
}

内存屏障

spinlockxv6 源码大概就是以上,算是一个还算比较容易的源码部分。不过注意到有一条语句 __sync_synchronize(),可以再深入了解一些。

内存屏障(Memory barrier),大多数现代计算机为了提高性能而采取乱序执行,所以需要内存屏障使之前的所有读写操作都执行后才可以开始之后的操作,更多的可以看最后资料中的 Why Memory Barriers。

上面提到的 __sync_synchronize() 就是这样一条 gcc 内置的函数。

资料

  1. xv6-book 第四章
  2. 6.172 LECTURE16 Synchronizing without locks
  3.  Why Memory Barriers?中文翻译(上) 
  4. 内存屏障——维基百科