patch-2.3.22 linux/net/sunrpc/sched.c

diff -u --recursive --new-file v2.3.21/linux/net/sunrpc/sched.c linux/net/sunrpc/sched.c
@@ -18,6 +18,7 @@
 #include <linux/unistd.h>
 #include <linux/smp.h>
 #include <linux/smp_lock.h>
+#include <linux/spinlock.h>
 
 #include <linux/sunrpc/clnt.h>
 
@@ -74,6 +75,16 @@
 static int			swap_buffer_used = 0;
 
 /*
+ * Spinlock for wait queues. Access to the latter also has to be
+ * interrupt-safe in order to allow timers to wake up sleeping tasks.
+ */
+spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
+/*
+ * Spinlock for other critical sections of code.
+ */
+spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED;
+
+/*
  * Add new request to wait queue.
  *
  * Swapper tasks always get inserted at the head of the queue.
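The two locks introduced above replace the old save_flags()/cli() exclusion and split the locking by context: rpc_queue_lock guards the wait queues and must be interrupt-safe, because timer handlers wake sleeping tasks, while rpc_sched_lock guards process-context state such as the all_tasks list. A minimal sketch of the two disciplines (the demo_* names are illustrative, not part of the patch):

    #include <linux/spinlock.h>

    static spinlock_t demo_queue_lock = SPIN_LOCK_UNLOCKED; /* shared with timer irqs */
    static spinlock_t demo_sched_lock = SPIN_LOCK_UNLOCKED; /* process context only */

    static void demo_queue_op(void)
    {
        unsigned long flags;

        /* A timer may take this lock from interrupt context, so local
         * interrupts must stay off for as long as we hold it. */
        spin_lock_irqsave(&demo_queue_lock, flags);
        /* ... manipulate a wait queue ... */
        spin_unlock_irqrestore(&demo_queue_lock, flags);
    }

    static void demo_list_op(void)
    {
        /* Never taken from interrupt context, so the cheaper plain
         * lock/unlock pair is sufficient. */
        spin_lock(&demo_sched_lock);
        /* ... walk or modify a global list ... */
        spin_unlock(&demo_sched_lock);
    }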
@@ -81,8 +92,8 @@
  * improve overall performance.
  * Everyone else gets appended to the queue to ensure proper FIFO behavior.
  */
-int
-rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
+static int
+__rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
 	if (task->tk_rpcwait) {
 		if (task->tk_rpcwait != queue)
@@ -104,12 +115,24 @@
 	return 0;
 }
 
+int
+rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
+{
+	unsigned long oldflags;
+	int result;
+
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
+	result = __rpc_add_wait_queue(q, task);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+	return result;
+}
+
 /*
  * Remove request from queue.
- * Note: must be called with interrupts disabled.
+ * Note: must be called with spin lock held.
  */
-void
-rpc_remove_wait_queue(struct rpc_task *task)
+static void
+__rpc_remove_wait_queue(struct rpc_task *task)
 {
 	struct rpc_wait_queue *queue;
 
@@ -122,6 +145,16 @@
 				task->tk_pid, queue, rpc_qname(queue));
 }
 
+void
+rpc_remove_wait_queue(struct rpc_task *task)
+{
+	unsigned long oldflags;
+
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
+	__rpc_remove_wait_queue(task);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+}
+
 /*
  * Set up a timer for the current task.
  */
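From here on the patch applies one convention throughout the file: every queue primitive is split into a lock-free __rpc_* helper, which requires the caller to already hold rpc_queue_lock, and a public wrapper that takes and releases the lock around it. Paths that already hold the lock call the __ variant directly; since these spinlocks are not recursive, taking the lock a second time would deadlock. A sketch of the pattern (illustrative names, reusing demo_queue_lock from the sketch above):

    struct demo_task;

    /* Caller must hold demo_queue_lock. */
    static int __demo_enqueue(struct demo_task *task)
    {
        /* ... unguarded wait-queue manipulation ... */
        return 0;
    }

    /* Public entry point; safe to call with no locks held. */
    static int demo_enqueue(struct demo_task *task)
    {
        unsigned long flags;
        int result;

        spin_lock_irqsave(&demo_queue_lock, flags);
        result = __demo_enqueue(task);
        spin_unlock_irqrestore(&demo_queue_lock, flags);
        return result;
    }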
@@ -165,7 +198,7 @@
  * Make an RPC task runnable.
  *
  * Note: If the task is ASYNC, this must be called with 
- * interrupts disabled to protect the wait queue operation.
+ * the spinlock held to protect the wait queue operation.
  */
 static inline void
 rpc_make_runnable(struct rpc_task *task)
@@ -177,7 +210,7 @@
 	task->tk_flags |= RPC_TASK_RUNNING;
 	if (RPC_IS_ASYNC(task)) {
 		int status;
-		status = rpc_add_wait_queue(&schedq, task);
+		status = __rpc_add_wait_queue(&schedq, task);
 		if (status)
 		{
 			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
@@ -214,18 +247,12 @@
 __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 			rpc_action action, rpc_action timer)
 {
-	unsigned long	oldflags;
 	int status;
 
 	dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
 				rpc_qname(q), jiffies);
 
-	/*
-	 * Protect the execution below.
-	 */
-	save_flags(oldflags); cli();
-
-	status = rpc_add_wait_queue(q, task);
+	status = __rpc_add_wait_queue(q, task);
 	if (status)
 	{
 		printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
@@ -240,7 +267,6 @@
 		task->tk_flags &= ~RPC_TASK_RUNNING;
 	}
 
-	restore_flags(oldflags);
 	return;
 }
 
@@ -248,11 +274,17 @@
 rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 				rpc_action action, rpc_action timer)
 {
+	unsigned long	oldflags;
+	/*
+	 * Protect the queue operations.
+	 */
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
 	__rpc_sleep_on(q, task, action, timer);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 }
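Moving the lock out of __rpc_sleep_on and into the rpc_sleep_on wrapper is what allows later hunks to compose several queue operations under a single lock acquisition, so the combination appears atomic to every other CPU; rpc_wake_up_status and rpc_run_child below rely on exactly this. Schematically (illustrative names; __demo_wake_up is assumed to dequeue the task it wakes, as __rpc_wake_up does):

    struct demo_queue {
        struct demo_task *task;     /* head of the queue */
    };

    static void __demo_wake_up(struct demo_task *task);

    static void demo_wake_up_all(struct demo_queue *queue)
    {
        unsigned long flags;

        /* One acquisition covers the whole drain: other CPUs see the
         * queue either untouched or empty, never half-woken. */
        spin_lock_irqsave(&demo_queue_lock, flags);
        while (queue->task != NULL)
            __demo_wake_up(queue->task);
        spin_unlock_irqrestore(&demo_queue_lock, flags);
    }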
 
 /*
- * Wake up a single task -- must be invoked with bottom halves off.
+ * Wake up a single task -- must be invoked with spin lock held.
  *
  * It would probably suffice to cli/sti the del_timer and remove_wait_queue
  * operations individually.
@@ -272,7 +304,7 @@
 #endif
 	rpc_del_timer(task);
 	if (task->tk_rpcwait != &schedq)
-		rpc_remove_wait_queue(task);
+		__rpc_remove_wait_queue(task);
 	if (!RPC_IS_RUNNING(task)) {
 		task->tk_flags |= RPC_TASK_CALLBACK;
 		rpc_make_runnable(task);
@@ -289,7 +321,7 @@
 	dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
 	task->tk_status = -ETIMEDOUT;
 	task->tk_timeout = 0;
-	__rpc_wake_up(task);
+	rpc_wake_up_task(task);
 }
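rpc_default_timer runs from the kernel timer bottom half, that is, in interrupt context, so it cannot assume rpc_queue_lock is held and now goes through the locking wrapper rpc_wake_up_task instead of __rpc_wake_up. This is also exactly why the lock has to be taken with spin_lock_irqsave everywhere else. The same discipline in a sketched timer handler (illustrative names; the unsigned long argument is the 2.3 timer-callback signature):

    #include <linux/timer.h>

    static void demo_timeout(unsigned long data)
    {
        struct demo_task *task = (struct demo_task *) data;
        unsigned long flags;

        /* Interrupt context: take the irq-safe lock, then use the
         * lock-held helper. */
        spin_lock_irqsave(&demo_queue_lock, flags);
        __demo_wake_up(task);
        spin_unlock_irqrestore(&demo_queue_lock, flags);
    }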
 
 /*
@@ -300,9 +332,9 @@
 {
 	unsigned long	oldflags;
 
-	save_flags(oldflags); cli();
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
 	__rpc_wake_up(task);
-	restore_flags(oldflags);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 }
 
 /*
@@ -315,10 +347,10 @@
 	struct rpc_task	*task;
 
 	dprintk("RPC:      wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
-	save_flags(oldflags); cli();
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
 	if ((task = queue->task) != 0)
 		__rpc_wake_up(task);
-	restore_flags(oldflags);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 
 	return task;
 }
@@ -331,10 +363,10 @@
 {
 	unsigned long	oldflags;
 
-	save_flags(oldflags); cli();
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
 	while (queue->task)
 		__rpc_wake_up(queue->task);
-	restore_flags(oldflags);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 }
 
 /*
@@ -346,12 +378,12 @@
 	struct rpc_task	*task;
 	unsigned long	oldflags;
 
-	save_flags(oldflags); cli();
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
 	while ((task = queue->task) != NULL) {
 		task->tk_status = status;
 		__rpc_wake_up(task);
 	}
-	restore_flags(oldflags);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 }
 
 /*
@@ -369,7 +401,7 @@
 __rpc_atrun(struct rpc_task *task)
 {
 	task->tk_status = 0;
-	__rpc_wake_up(task);
+	rpc_wake_up_task(task);
 }
 
 /*
@@ -432,13 +464,13 @@
 		 * and the RPC reply arrives before we get here, it will
 		 * have state RUNNING, but will still be on schedq.
 		 */
-		save_flags(oldflags); cli();
+		spin_lock_irqsave(&rpc_queue_lock, oldflags);
 		if (RPC_IS_RUNNING(task)) {
 			if (task->tk_rpcwait == &schedq)
-				rpc_remove_wait_queue(task);
+				__rpc_remove_wait_queue(task);
 		} else while (!RPC_IS_RUNNING(task)) {
 			if (RPC_IS_ASYNC(task)) {
-				restore_flags(oldflags);
+				spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 				return 0;
 			}
 
@@ -448,9 +480,9 @@
 			if (current->pid == rpciod_pid)
 				printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");
 
-			sti();
+			spin_unlock_irq(&rpc_queue_lock);
 			__wait_event(task->tk_wait, RPC_IS_RUNNING(task));
-			cli();
+			spin_lock_irq(&rpc_queue_lock);
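A task must not sleep while holding a spinlock, just as the old code could not sleep with interrupts disabled, so the lock is dropped with spin_unlock_irq around the blocking __wait_event and retaken with spin_lock_irq afterwards; the enclosing while loop then re-checks the task state, which may have changed while the lock was released. The unconditional _irq variants are safe here because this path is only entered from process context with interrupts enabled. The pattern in isolation (illustrative names):

    #include <linux/wait.h>

    struct demo_waiter {
        wait_queue_head_t wait;
        volatile int running;       /* set by the waker */
    };

    static void demo_wait_until_running(struct demo_waiter *w)
    {
        spin_lock_irq(&demo_queue_lock);
        while (!w->running) {
            /* Never sleep with a spinlock held: drop it around the
             * blocking wait, retake it afterwards. */
            spin_unlock_irq(&demo_queue_lock);
            __wait_event(w->wait, w->running);
            spin_lock_irq(&demo_queue_lock);
            /* Loop and re-check: the state may have changed while
             * the lock was not held. */
        }
        spin_unlock_irq(&demo_queue_lock);
    }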
 
 			/*
 			 * When the task received a signal, remove from
@@ -462,7 +494,7 @@
 			dprintk("RPC: %4d sync task resuming\n",
 							task->tk_pid);
 		}
-		restore_flags(oldflags);
+		spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 
 		/*
 		 * When a sync task receives a signal, it exits with
@@ -522,15 +554,16 @@
 	int need_resched = current->need_resched;
 
 	dprintk("RPC:      rpc_schedule enter\n");
-	save_flags(oldflags);
 	while (1) {
-		cli();
-		if (!(task = schedq.task))
+		spin_lock_irqsave(&rpc_queue_lock, oldflags);
+		if (!(task = schedq.task)) {
+			spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 			break;
+		}
 		rpc_del_timer(task);
-		rpc_remove_wait_queue(task);
+		__rpc_remove_wait_queue(task);
 		task->tk_flags |= RPC_TASK_RUNNING;
-		restore_flags(oldflags);
+		spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 
 		__rpc_execute(task);
 
@@ -541,7 +574,6 @@
 		if (need_resched)
 			schedule();
 	}
-	restore_flags(oldflags);
 	dprintk("RPC:      rpc_schedule leave\n");
 }
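rpc_schedule now takes and releases the lock once per iteration, holding it only across the dequeue: it must not be held over __rpc_execute, which can sleep and will itself take rpc_queue_lock when the task is requeued. The drain loop, schematically (illustrative names):

    static struct demo_task *__demo_dequeue(void);      /* caller holds the lock */
    static void demo_execute(struct demo_task *task);   /* may sleep */

    static void demo_schedule(void)
    {
        struct demo_task *task;
        unsigned long flags;

        for (;;) {
            /* Hold the lock only across the dequeue itself. */
            spin_lock_irqsave(&demo_queue_lock, flags);
            task = __demo_dequeue();    /* NULL when the queue is empty */
            spin_unlock_irqrestore(&demo_queue_lock, flags);
            if (task == NULL)
                break;
            /* No locks held: the task may sleep or requeue itself. */
            demo_execute(task);
        }
    }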
 
@@ -626,11 +658,13 @@
 	task->tk_suid_retry = 1;
 
 	/* Add to global list of all tasks */
+	spin_lock(&rpc_sched_lock);
 	task->tk_next_task = all_tasks;
 	task->tk_prev_task = NULL;
 	if (all_tasks)
 		all_tasks->tk_prev_task = task;
 	all_tasks = task;
+	spin_unlock(&rpc_sched_lock);
 
 	if (clnt)
 		clnt->cl_users++;
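The global all_tasks list is a hand-rolled doubly linked list, now guarded by rpc_sched_lock; the plain spin_lock/spin_unlock pair is enough because the list is only ever touched from process context. The insert-at-head step in isolation (illustrative names):

    struct demo_listed_task {
        struct demo_listed_task *next_task, *prev_task;
    };

    static struct demo_listed_task *all_demo_tasks;

    static void demo_task_register(struct demo_listed_task *task)
    {
        spin_lock(&demo_sched_lock);
        task->next_task = all_demo_tasks;
        task->prev_task = NULL;
        if (all_demo_tasks)
            all_demo_tasks->prev_task = task;
        all_demo_tasks = task;
        spin_unlock(&demo_sched_lock);
    }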
@@ -679,10 +713,12 @@
 rpc_release_task(struct rpc_task *task)
 {
 	struct rpc_task	*next, *prev;
+	unsigned long	oldflags;
 
 	dprintk("RPC: %4d release task\n", task->tk_pid);
 
 	/* Remove from global task list */
+	spin_lock(&rpc_sched_lock);
 	prev = task->tk_prev_task;
 	next = task->tk_next_task;
 	if (next)
@@ -691,6 +727,19 @@
 		prev->tk_next_task = next;
 	else
 		all_tasks = next;
+	task->tk_next_task = task->tk_prev_task = NULL;
+	spin_unlock(&rpc_sched_lock);
+
+	/* Protect the execution below. */
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
+
+	/* Delete any running timer */
+	rpc_del_timer(task);
+
+	/* Remove from any wait queue we're still on */
+	__rpc_remove_wait_queue(task);
+
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
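Teardown runs in two stages under the two locks: the task is first unlinked from all_tasks under rpc_sched_lock, with its link pointers cleared afterwards, and only then are the pending timer and any wait-queue entry removed under rpc_queue_lock. The unlink stage, as a counterpart to the register sketch above (illustrative names):

    static void demo_task_unregister(struct demo_listed_task *task)
    {
        spin_lock(&demo_sched_lock);
        if (task->next_task)
            task->next_task->prev_task = task->prev_task;
        if (task->prev_task)
            task->prev_task->next_task = task->next_task;
        else
            all_demo_tasks = task->next_task;
        /* Clear the stale links, as the patch now does. */
        task->next_task = task->prev_task = NULL;
        spin_unlock(&demo_sched_lock);
    }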
 
 	/* Release resources */
 	if (task->tk_rqstp)
@@ -738,12 +787,15 @@
 static void
 rpc_child_exit(struct rpc_task *child)
 {
+	unsigned long	oldflags;
 	struct rpc_task	*parent;
 
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
 	if ((parent = rpc_find_parent(child)) != NULL) {
 		parent->tk_status = child->tk_status;
-		rpc_wake_up_task(parent);
+		__rpc_wake_up(parent);
 	}
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 	rpc_release_task(child);
 }
 
@@ -772,11 +824,11 @@
 {
 	unsigned long oldflags;
 
-	save_flags(oldflags); cli();
-	rpc_make_runnable(child);
-	restore_flags(oldflags);
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
 	/* N.B. Is it possible for the child to have already finished? */
-	rpc_sleep_on(&childq, task, func, NULL);
+	__rpc_sleep_on(&childq, task, func, NULL);
+	rpc_make_runnable(child);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 }
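The reordering in rpc_run_child closes a real SMP window: the old code made the child runnable before the parent slept on childq, so a child that completed quickly on another CPU could reach rpc_child_exit while the parent was not yet queued, and the wakeup would be lost. Now the parent is parked first and the child released second, under a single acquisition of rpc_queue_lock; rpc_child_exit correspondingly wakes the parent with the lock-held __rpc_wake_up inside that same lock. Schematically (illustrative names, reusing the lock-held helpers sketched above):

    static void __demo_sleep_on(struct demo_queue *q, struct demo_task *task);
    static void __demo_make_runnable(struct demo_task *task);

    static void demo_run_child(struct demo_queue *childq,
                               struct demo_task *parent,
                               struct demo_task *child)
    {
        unsigned long flags;

        spin_lock_irqsave(&demo_queue_lock, flags);
        __demo_sleep_on(childq, parent);    /* park the parent first...  */
        __demo_make_runnable(child);        /* ...then release the child */
        spin_unlock_irqrestore(&demo_queue_lock, flags);
    }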
 
 /*
@@ -789,8 +841,11 @@
 	struct rpc_task	**q, *rovr;
 
 	dprintk("RPC:      killing all tasks for client %p\n", clnt);
-	/* N.B. Why bother to inhibit? Nothing blocks here ... */
-	rpc_inhibit++;
+
+	/*
+	 * Spin lock all_tasks to prevent changes...
+	 */
+	spin_lock(&rpc_sched_lock);
 	for (q = &all_tasks; (rovr = *q); q = &rovr->tk_next_task) {
 		if (!clnt || rovr->tk_client == clnt) {
 			rovr->tk_flags |= RPC_TASK_KILLED;
@@ -798,11 +853,18 @@
 			rpc_wake_up_task(rovr);
 		}
 	}
-	rpc_inhibit--;
+	spin_unlock(&rpc_sched_lock);
 }
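rpc_killall_tasks walks all_tasks under rpc_sched_lock and wakes each victim through rpc_wake_up_task, which takes rpc_queue_lock, so within this patch the two locks nest in a fixed order: sched lock outside, queue lock inside. Keeping that order consistent is what rules out an AB-BA deadlock between them. The nesting in isolation (illustrative names):

    static void demo_kill_all(void)
    {
        struct demo_listed_task *task;
        unsigned long flags;

        spin_lock(&demo_sched_lock);                    /* outer lock */
        for (task = all_demo_tasks; task; task = task->next_task) {
            spin_lock_irqsave(&demo_queue_lock, flags); /* inner lock */
            /* ... mark the task killed and wake it ... */
            spin_unlock_irqrestore(&demo_queue_lock, flags);
        }
        spin_unlock(&demo_sched_lock);
    }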
 
 static DECLARE_MUTEX_LOCKED(rpciod_running);
 
+static inline int
+rpciod_task_pending(void)
+{
+	return schedq.task != NULL || xprt_tcp_pending();
+}
+
+
 /*
  * This is the rpciod kernel thread
  */
@@ -810,7 +872,6 @@
 rpciod(void *ptr)
 {
 	wait_queue_head_t *assassin = (wait_queue_head_t*) ptr;
-	unsigned long	oldflags;
 	int		rounds = 0;
 
 	MOD_INC_USE_COUNT;
@@ -845,18 +906,15 @@
 			schedule();
 			rounds = 0;
 		}
-		save_flags(oldflags); cli();
 		dprintk("RPC: rpciod running checking dispatch\n");
 		rpciod_tcp_dispatcher();
 
-		if (!schedq.task) {
+		if (!rpciod_task_pending()) {
 			dprintk("RPC: rpciod back to sleep\n");
-			interruptible_sleep_on(&rpciod_idle);
+			wait_event_interruptible(rpciod_idle, rpciod_task_pending());
 			dprintk("RPC: switch to rpciod\n");
-			rpciod_tcp_dispatcher();
 			rounds = 0;
 		}
-		restore_flags(oldflags);
 	}
 
 	dprintk("RPC: rpciod shutdown commences\n");
@@ -983,8 +1041,12 @@
 	struct rpc_task *t = all_tasks, *next;
 	struct nfs_wreq *wreq;
 
-	if (!t)
+	spin_lock(&rpc_sched_lock);
+	t = all_tasks;
+	if (!t) {
+		spin_unlock(&rpc_sched_lock);
 		return;
+	}
 	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
 		"-rpcwait -action- --exit--\n");
 	for (; t; t = next) {
@@ -1007,5 +1069,6 @@
 			wreq->wb_file->f_dentry->d_parent->d_name.name,
 			wreq->wb_file->f_dentry->d_name.name);
 	}
+	spin_unlock(&rpc_sched_lock);
 }
 #endif
