patch-2.4.10 linux/net/sunrpc/xprt.c

diff -u --recursive --new-file v2.4.9/linux/net/sunrpc/xprt.c linux/net/sunrpc/xprt.c
@@ -75,10 +75,6 @@
  * Local variables
  */
 
-/* Spinlock for critical sections in the code. */
-spinlock_t xprt_sock_lock = SPIN_LOCK_UNLOCKED;
-spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED;
-
 #ifdef RPC_DEBUG
 # undef  RPC_DEBUG_DATA
 # define RPCDBG_FACILITY	RPCDBG_XPRT
@@ -172,6 +168,44 @@
 }
 
 /*
+ * Serialize write access to sockets, in order to prevent different
+ * requests from interfering with each other.
+ * Also prevents TCP socket reconnections from colliding with writes.
+ */
+static int
+xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+	int retval;
+	spin_lock_bh(&xprt->sock_lock);
+	if (!xprt->snd_task)
+		xprt->snd_task = task;
+	else if (xprt->snd_task != task) {
+		dprintk("RPC: %4d TCP write queue full (task %d)\n",
+			task->tk_pid, xprt->snd_task->tk_pid);
+		task->tk_timeout = 0;
+		task->tk_status = -EAGAIN;
+		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+	}
+	retval = xprt->snd_task == task;
+	spin_unlock_bh(&xprt->sock_lock);
+	return retval;
+}
+
+/*
+ * Releases the socket for use by other requests.
+ */
+static void
+xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+	spin_lock_bh(&xprt->sock_lock);
+	if (xprt->snd_task == task) {
+		xprt->snd_task = NULL;
+		rpc_wake_up_next(&xprt->sending);
+	}
+	spin_unlock_bh(&xprt->sock_lock);
+}
+
+/*
  * Write data to socket.
  */
 static inline int
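
The two helpers above replace the old global xprt_sock_lock/xprt_lock pair
with a single per-transport owner field: whichever task wins xprt->snd_task
may write to the socket, and losers are parked on the xprt->sending wait
queue and redriven when the owner releases. A minimal sketch of the calling
pattern (example_send() is hypothetical; the real callers are
xprt_transmit() and xprt_reconnect() below):

	static void example_send(struct rpc_xprt *xprt, struct rpc_task *task)
	{
		if (!xprt_lock_write(xprt, task))
			return;	/* queued on xprt->sending; retried later */

		/* ... write the request to the socket ... */

		xprt_release_write(xprt, task);	/* wake the next queued sender */
	}
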
@@ -285,7 +319,10 @@
 
 	if (xprt->nocong)
 		return;
-	spin_lock_bh(&xprt_sock_lock);
+	/*
+	 * Note: we're in a BH context
+	 */
+	spin_lock(&xprt->xprt_lock);
 	cwnd = xprt->cwnd;
 	if (result >= 0) {
 		if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
@@ -313,7 +350,7 @@
 
 	xprt->cwnd = cwnd;
  out:
-	spin_unlock_bh(&xprt_sock_lock);
+	spin_unlock(&xprt->xprt_lock);
 }
 
 /*
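
These hunks also switch the bottom-half callers from spin_lock_bh() to
plain spin_lock(): code already running in BH (softirq) context need not
disable bottom halves again, while process-context takers of the same lock
still must, so the two paths cannot deadlock on one CPU. A sketch of the
convention, assuming the per-transport lock introduced by this patch:

	void process_context_taker(struct rpc_xprt *xprt)
	{
		spin_lock_bh(&xprt->xprt_lock);	/* block BHs: the softirq path
						 * below may take this lock */
		/* ... */
		spin_unlock_bh(&xprt->xprt_lock);
	}

	void bh_context_taker(struct rpc_xprt *xprt)
	{
		spin_lock(&xprt->xprt_lock);	/* already in a BH, so the
						 * plain variant is enough */
		/* ... */
		spin_unlock(&xprt->xprt_lock);
	}
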
@@ -394,6 +431,8 @@
 
 /*
  * Reconnect a broken TCP connection.
+ *
+ * Note: This cannot collide with the TCP reads, as both run from rpciod
  */
 void
 xprt_reconnect(struct rpc_task *task)
@@ -416,15 +455,10 @@
 		return;
 	}
 
-	spin_lock(&xprt_lock);
-	if (xprt->connecting) {
-		task->tk_timeout = 0;
-		rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
-		spin_unlock(&xprt_lock);
+	if (!xprt_lock_write(xprt, task))
 		return;
-	}
-	xprt->connecting = 1;
-	spin_unlock(&xprt_lock);
+	if (xprt_connected(xprt))
+		goto out_write;
 
 	status = -ENOTCONN;
 	if (!inet) {
@@ -439,6 +473,7 @@
 
 	/* Reset TCP record info */
 	xprt->tcp_offset = 0;
+	xprt->tcp_reclen = 0;
 	xprt->tcp_copied = 0;
 	xprt->tcp_more = 0;
 
@@ -467,24 +502,22 @@
 		dprintk("RPC: %4d connect status %d connected %d\n",
 				task->tk_pid, status, xprt_connected(xprt));
 
-		spin_lock_bh(&xprt_sock_lock);
+		spin_lock_bh(&xprt->sock_lock);
 		if (!xprt_connected(xprt)) {
 			task->tk_timeout = xprt->timeout.to_maxval;
-			rpc_sleep_on(&xprt->reconn, task, xprt_reconn_status, NULL);
-			spin_unlock_bh(&xprt_sock_lock);
+			rpc_sleep_on(&xprt->sending, task, xprt_reconn_status, NULL);
+			spin_unlock_bh(&xprt->sock_lock);
 			return;
 		}
-		spin_unlock_bh(&xprt_sock_lock);
+		spin_unlock_bh(&xprt->sock_lock);
 	}
 defer:
-	spin_lock(&xprt_lock);
-	xprt->connecting = 0;
 	if (status < 0) {
 		rpc_delay(task, 5*HZ);
 		task->tk_status = -ENOTCONN;
 	}
-	rpc_wake_up(&xprt->reconn);
-	spin_unlock(&xprt_lock);
+ out_write:
+	xprt_release_write(xprt, task);
 }
 
 /*
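
With the hunks above, connection setup rides on the same write lock as
transmission: the old xprt->connecting flag and the dedicated xprt->reconn
wait queue are gone, and a task that finds the transport busy simply sleeps
on xprt->sending like any other blocked writer. The resulting control flow,
paraphrased (not the verbatim function):

	void reconnect_flow(struct rpc_task *task)
	{
		struct rpc_xprt *xprt = task->tk_xprt;

		if (!xprt_lock_write(xprt, task))
			return;			/* another task owns the socket */
		if (xprt_connected(xprt))
			goto out_write;		/* lost the race to a connector */

		/* ... recreate the socket and drive the TCP connect ... */

	out_write:
		xprt_release_write(xprt, task);	/* wakes the next sender */
	}
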
@@ -499,10 +532,7 @@
 	dprintk("RPC: %4d xprt_reconn_timeout %d\n",
 				task->tk_pid, task->tk_status);
 
-	spin_lock(&xprt_lock);
-	xprt->connecting = 0;
-	rpc_wake_up(&xprt->reconn);
-	spin_unlock(&xprt_lock);
+	xprt_release_write(xprt, task);
 }
 
 /*
@@ -699,10 +729,6 @@
 	struct iovec	riov;
 	int		want, result;
 
-	if (xprt->tcp_offset >= xprt->tcp_reclen + sizeof(xprt->tcp_recm)) {
-		xprt->tcp_offset = 0;
-		xprt->tcp_reclen = 0;
-	}
 	if (xprt->tcp_offset >= sizeof(xprt->tcp_recm))
 		goto done;
 
@@ -718,10 +744,6 @@
 		want -= result;
 	} while (want);
 
-	/* Is this another fragment in the last message */
-	if (!xprt->tcp_more)
-		xprt->tcp_copied = 0; /* No, so we're reading a new message */
-
 	/* Get the record length and mask out the last fragment bit */
 	xprt->tcp_reclen = ntohl(xprt->tcp_recm);
 	xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1;
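
The marker logic above implements RPC record marking over TCP (RFC 1831,
section 10): each fragment is preceded by a four-byte big-endian marker
whose top bit flags the last fragment of a record and whose low 31 bits
give the fragment length. A self-contained userspace illustration of the
decode (decode_record_marker() is illustrative, not from the patch):

	#include <arpa/inet.h>	/* ntohl() */
	#include <stdint.h>

	static void decode_record_marker(uint32_t recm, uint32_t *reclen, int *more)
	{
		uint32_t host = ntohl(recm);		/* marker is big-endian */

		*more   = (host & 0x80000000) ? 0 : 1;	/* top bit: last fragment */
		*reclen = host & 0x7fffffff;		/* low 31 bits: length */
	}
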
@@ -744,7 +766,7 @@
 
 	if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail)
 		goto done;
-	want = min(unsigned int, sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
+	want = min_t(unsigned int, sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
 	do {
 		dprintk("RPC:      reading xid (%d bytes)\n", want);
 		riov.iov_base = ((u8*) &xprt->tcp_xid) + xprt->tcp_copied;
@@ -771,7 +793,7 @@
 
 	if (req->rq_rlen <= xprt->tcp_copied || !avail)
 		goto done;
-	want = min(unsigned int, req->rq_rlen - xprt->tcp_copied, avail);
+	want = min_t(unsigned int, req->rq_rlen - xprt->tcp_copied, avail);
 	do {
 		dprintk("RPC: %4d TCP receiving %d bytes\n",
 			req->rq_task->tk_pid, want);
@@ -805,7 +827,7 @@
 	int		want, result = 0;
 
 	while (avail) {
-		want = min(unsigned int, avail, sizeof(dummy));
+		want = min_t(unsigned int, avail, sizeof(dummy));
 		riov.iov_base = dummy;
 		riov.iov_len  = want;
 		dprintk("RPC:      TCP skipping %d bytes\n", want);
@@ -843,14 +865,15 @@
 
 	/* Read in a new fragment marker if necessary */
 	/* Can we ever really expect to get completely empty fragments? */
-	if ((result = tcp_read_fraghdr(xprt)) <= 0)
+	if ((result = tcp_read_fraghdr(xprt)) < 0)
 		return result;
 	avail = result;
 
 	/* Read in the xid if necessary */
-	if ((result = tcp_read_xid(xprt, avail)) <= 0)
+	if ((result = tcp_read_xid(xprt, avail)) < 0)
 		return result;
-	avail = result;
+	if (!(avail = result))
+		goto out_ok;
 
 	/* Find and lock the request corresponding to this xid */
 	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
@@ -868,9 +891,14 @@
 	if ((result = tcp_read_discard(xprt, avail)) < 0)
 		return result;
 
+ out_ok:
 	dprintk("RPC:      tcp_input_record done (off %d reclen %d copied %d)\n",
 			xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied);
 	result = xprt->tcp_reclen;
+	xprt->tcp_reclen = 0;
+	xprt->tcp_offset = 0;
+	if (!xprt->tcp_more)
+		xprt->tcp_copied = 0;
 	return result;
 }
 
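
Together, the last two hunks move the per-record state reset to the moment
a fragment has been fully consumed: tcp_offset and tcp_reclen are cleared
after each complete fragment, while tcp_copied is preserved across
fragments of the same record (tcp_more set) so a reply split over several
fragments accumulates into one buffer. In outline:

	/* At the end of tcp_input_record(), per the hunk above: */
	xprt->tcp_reclen = 0;		/* fragment fully processed */
	xprt->tcp_offset = 0;		/* next read starts at a new marker */
	if (!xprt->tcp_more)
		xprt->tcp_copied = 0;	/* record done: reset reassembly count */
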
@@ -885,11 +913,19 @@
 	rpciod_wake_up();
 }
 
+int xprt_tcp_pending(void)
+{
+	int retval;
+
+	spin_lock_bh(&rpc_queue_lock);
+	retval = !list_empty(&rpc_xprt_pending);
+	spin_unlock_bh(&rpc_queue_lock);
+	return retval;
+}
+
 static inline
 void xprt_append_pending(struct rpc_xprt *xprt)
 {
-	if (!list_empty(&xprt->rx_pending))
-		return;
 	spin_lock_bh(&rpc_queue_lock);
 	if (list_empty(&xprt->rx_pending)) {
 		list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
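
The new xprt_tcp_pending() tests rpc_xprt_pending only under rpc_queue_lock,
and xprt_append_pending() loses its unlocked list_empty() fast path: a
check on a shared list is only meaningful while the lock is held, since
another CPU may remove or insert the entry between an unlocked test and the
return. The safe pattern the hunk settles on, distilled:

	static void append_pending_safely(struct rpc_xprt *xprt)
	{
		spin_lock_bh(&rpc_queue_lock);
		if (list_empty(&xprt->rx_pending))	/* test and insert under
							 * one and the same lock */
			list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
		spin_unlock_bh(&rpc_queue_lock);
	}
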
@@ -1003,11 +1039,10 @@
 	case TCP_ESTABLISHED:
 		if (xprt_test_and_set_connected(xprt))
 			break;
-		spin_lock_bh(&xprt_sock_lock);
+		spin_lock(&xprt->sock_lock);
 		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
 			rpc_wake_up_task(xprt->snd_task);
-		rpc_wake_up(&xprt->reconn);
-		spin_unlock_bh(&xprt_sock_lock);
+		spin_unlock(&xprt->sock_lock);
 		break;
 	case TCP_SYN_SENT:
 	case TCP_SYN_RECV:
@@ -1041,10 +1076,10 @@
 		return;
 
 	if (!xprt_test_and_set_wspace(xprt)) {
-		spin_lock_bh(&xprt_sock_lock);
+		spin_lock(&xprt->sock_lock);
 		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
 			rpc_wake_up_task(xprt->snd_task);
-		spin_unlock_bh(&xprt_sock_lock);
+		spin_unlock(&xprt->sock_lock);
 	}
 
 	if (test_bit(SOCK_NOSPACE, &sock->flags)) {
@@ -1067,14 +1102,14 @@
 
 
 	/* Wait until we have enough socket memory */
-	if (sock_wspace(sk) < min(int, sk->sndbuf,XPRT_MIN_WRITE_SPACE))
+	if (sock_wspace(sk) < min_t(int, sk->sndbuf,XPRT_MIN_WRITE_SPACE))
 		return;
 
 	if (!xprt_test_and_set_wspace(xprt)) {
-		spin_lock_bh(&xprt_sock_lock);
+		spin_lock(&xprt->sock_lock);
 		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
 			rpc_wake_up_task(xprt->snd_task);
-		spin_unlock_bh(&xprt_sock_lock);
+		spin_unlock(&xprt->sock_lock);
 	}
 
 	if (sk->sleep && waitqueue_active(sk->sleep))
@@ -1100,55 +1135,6 @@
 	rpc_wake_up_task(task);
 }
 
-
-/*
- * Serialize access to sockets, in order to prevent different
- * requests from interfering with each other.
- */
-static int
-xprt_down_transmit(struct rpc_task *task)
-{
-	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
-	struct rpc_rqst	*req = task->tk_rqstp;
-
-	spin_lock_bh(&xprt_sock_lock);
-	spin_lock(&xprt_lock);
-	if (xprt->snd_task && xprt->snd_task != task) {
-		dprintk("RPC: %4d TCP write queue full (task %d)\n",
-			task->tk_pid, xprt->snd_task->tk_pid);
-		task->tk_timeout = 0;
-		task->tk_status = -EAGAIN;
-		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
-	} else if (!xprt->snd_task) {
-		xprt->snd_task = task;
-#ifdef RPC_PROFILE
-		req->rq_xtime = jiffies;
-#endif
-		req->rq_bytes_sent = 0;
-	}
-	spin_unlock(&xprt_lock);
-	spin_unlock_bh(&xprt_sock_lock);
-	return xprt->snd_task == task;
-}
-
-/*
- * Releases the socket for use by other requests.
- */
-static inline void
-xprt_up_transmit(struct rpc_task *task)
-{
-	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
-
-	if (xprt->snd_task && xprt->snd_task == task) {
-		spin_lock_bh(&xprt_sock_lock);
-		spin_lock(&xprt_lock);
-		xprt->snd_task = NULL;
-		rpc_wake_up_next(&xprt->sending);
-		spin_unlock(&xprt_lock);
-		spin_unlock_bh(&xprt_sock_lock);
-	}
-}
-
 /*
  * Place the actual RPC call.
  * We have to copy the iovec because sendmsg fiddles with its contents.
@@ -1182,9 +1168,12 @@
 		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
 	}
 
-	if (!xprt_down_transmit(task))
+	if (!xprt_lock_write(xprt, task))
 		return;
 
+#ifdef RPC_PROFILE
+	req->rq_xtime = jiffies;
+#endif
 	do_xprt_transmit(task);
 }
 
@@ -1252,12 +1241,12 @@
 	switch (status) {
 	case -ENOMEM:
 		/* Protect against (udp|tcp)_write_space */
-		spin_lock_bh(&xprt_sock_lock);
+		spin_lock_bh(&xprt->sock_lock);
 		if (!xprt_wspace(xprt)) {
 			task->tk_timeout = req->rq_timeout.to_current;
 			rpc_sleep_on(&xprt->sending, task, NULL, NULL);
 		}
-		spin_unlock_bh(&xprt_sock_lock);
+		spin_unlock_bh(&xprt->sock_lock);
 		return;
 	case -EAGAIN:
 		/* Keep holding the socket if it is blocked */
@@ -1268,6 +1257,9 @@
 		if (!xprt->stream)
 			return;
 	default:
+		if (xprt->stream)
+			xprt_disconnect(xprt);
+		req->rq_bytes_sent = 0;
 		goto out_release;
 	}
 
@@ -1278,7 +1270,7 @@
 	rpc_add_timer(task, xprt_timer);
 	rpc_unlock_task(task);
  out_release:
-	xprt_up_transmit(task);
+	xprt_release_write(xprt, task);
 }
 
 /*
@@ -1313,7 +1305,7 @@
 
 	dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
 				task->tk_pid, xprt->cong, xprt->cwnd);
-	spin_lock_bh(&xprt_sock_lock);
+	spin_lock_bh(&xprt->xprt_lock);
 	xprt_reserve_status(task);
 	if (task->tk_rqstp) {
 		task->tk_timeout = 0;
@@ -1324,7 +1316,7 @@
 		task->tk_status = -EAGAIN;
 		rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
 	}
-	spin_unlock_bh(&xprt_sock_lock);
+	spin_unlock_bh(&xprt->xprt_lock);
 	dprintk("RPC: %4d xprt_reserve returns %d\n",
 				task->tk_pid, task->tk_status);
 	return task->tk_status;
@@ -1397,7 +1389,11 @@
 	struct rpc_xprt	*xprt = task->tk_xprt;
 	struct rpc_rqst	*req;
 
-	xprt_up_transmit(task);
+	if (xprt->snd_task == task) {
+		if (xprt->stream)
+			xprt_disconnect(xprt);
+		xprt_release_write(xprt, task);
+	}
 	if (!(req = task->tk_rqstp))
 		return;
 	task->tk_rqstp = NULL;
@@ -1411,7 +1407,7 @@
 		rpc_remove_wait_queue(task);
 	}
 
-	spin_lock_bh(&xprt_sock_lock);
+	spin_lock_bh(&xprt->xprt_lock);
 	req->rq_next = xprt->free;
 	xprt->free   = req;
 
@@ -1419,7 +1415,7 @@
 	xprt->cong -= RPC_CWNDSCALE;
 
 	xprt_clear_backlog(xprt);
-	spin_unlock_bh(&xprt_sock_lock);
+	spin_unlock_bh(&xprt->xprt_lock);
 }
 
 /*
@@ -1476,6 +1472,8 @@
 	} else
 		xprt->cwnd = RPC_INITCWND;
 	xprt->congtime = jiffies;
+	spin_lock_init(&xprt->sock_lock);
+	spin_lock_init(&xprt->xprt_lock);
 	init_waitqueue_head(&xprt->cong_wait);
 
 	/* Set timeout parameters */
@@ -1489,7 +1487,6 @@
 	xprt->pending = RPC_INIT_WAITQ("xprt_pending");
 	xprt->sending = RPC_INIT_WAITQ("xprt_sending");
 	xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
-	xprt->reconn  = RPC_INIT_WAITQ("xprt_reconn");
 
 	/* initialize free list */
 	for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
@@ -1625,7 +1622,6 @@
 	rpc_wake_up(&xprt->sending);
 	rpc_wake_up(&xprt->pending);
 	rpc_wake_up(&xprt->backlog);
-	rpc_wake_up(&xprt->reconn);
 	if (waitqueue_active(&xprt->cong_wait))
 		wake_up(&xprt->cong_wait);
 }
