patch-2.3.22 linux/net/sunrpc/xprt.c

Next file: linux/net/unix/af_unix.c
Previous file: linux/net/sunrpc/sched.c
Back to the patch index
Back to the overall index

diff -u --recursive --new-file v2.3.21/linux/net/sunrpc/xprt.c linux/net/sunrpc/xprt.c
@@ -31,12 +31,16 @@
  *  primitives that `transparently' work for processes as well as async
  *  tasks that rely on callbacks.
  *
- *  Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de>
+ *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
  *
  *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
  *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
  *  TCP NFS related read + write fixes
  *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
+ *
+ *  Rewrite of larges part of the code in order to stabilize TCP stuff.
+ *  Fix behaviour when socket buffer is full.
+ *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
  */
 
 #define __KERNEL_SYSCALLS__
@@ -62,6 +66,8 @@
 #include <asm/uaccess.h>
 
 #define SOCK_HAS_USER_DATA
+/* Following value should be > 32k + RPC overhead */
+#define XPRT_MIN_WRITE_SPACE 35000
 
 /*
  * Local variables
@@ -70,6 +76,9 @@
 static struct rpc_xprt *	sock_list = NULL;
 #endif
 
+/* Spinlock for critical sections in the code. */
+spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED;
+
 #ifdef RPC_DEBUG
 # undef  RPC_DEBUG_DATA
 # define RPCDBG_FACILITY	RPCDBG_XPRT
@@ -84,11 +93,13 @@
  * Local functions
  */
 static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
+static void	do_xprt_transmit(struct rpc_task *);
 static void	xprt_transmit_status(struct rpc_task *task);
+static void	xprt_transmit_timeout(struct rpc_task *task);
 static void	xprt_receive_status(struct rpc_task *task);
 static void	xprt_reserve_status(struct rpc_task *task);
+static void	xprt_disconnect(struct rpc_xprt *);
 static void	xprt_reconn_timeout(struct rpc_task *task);
-static void	xprt_reconn_status(struct rpc_task *task);
 static struct socket *xprt_create_socket(int, struct sockaddr_in *,
 					struct rpc_timeout *);
 
@@ -144,39 +155,35 @@
  *	Adjust the iovec to move on 'n' bytes
  */
  
-extern inline void xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount)
+extern inline void
+xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount)
 {
 	struct iovec *iv=msg->msg_iov;
+	int i;
 	
 	/*
 	 *	Eat any sent iovecs
 	 */
-
-	while(iv->iov_len < amount)
-	{
-		amount-=iv->iov_len;
+	while(iv->iov_len <= amount) {
+		amount -= iv->iov_len;
 		iv++;
 		msg->msg_iovlen--;
 	}
-	
-	msg->msg_iov=niv;
-	
+
 	/*
 	 *	And chew down the partial one
 	 */
-
 	niv[0].iov_len = iv->iov_len-amount;
 	niv[0].iov_base =((unsigned char *)iv->iov_base)+amount;
 	iv++;
-	
+
 	/*
 	 *	And copy any others
 	 */
-	 
-	for(amount=1;amount<msg->msg_iovlen; amount++)
-	{
-		niv[amount]=*iv++;
-	}
+	for(i = 1; i < msg->msg_iovlen; i++)
+		niv[i]=*iv++;
+
+	msg->msg_iov=niv;
 }
  
 /*
@@ -184,43 +191,42 @@
  */
 
 static inline int
-xprt_sendmsg(struct rpc_xprt *xprt)
+xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
 {
 	struct socket	*sock = xprt->sock;
 	struct msghdr	msg;
 	mm_segment_t	oldfs;
 	int		result;
+	int		slen = req->rq_slen - req->rq_bytes_sent;
 	struct iovec	niv[MAX_IOVEC];
 
+	if (slen == 0)
+		return 0;
+
 	xprt_pktdump("packet data:",
-				xprt->snd_buf.io_vec->iov_base,
-				xprt->snd_buf.io_vec->iov_len);
+				req->rq_svec->iov_base,
+				req->rq_svec->iov_len);
 
 	msg.msg_flags   = MSG_DONTWAIT;
-	msg.msg_iov	= xprt->snd_buf.io_vec;
-	msg.msg_iovlen	= xprt->snd_buf.io_nr;
+	msg.msg_iov	= req->rq_svec;
+	msg.msg_iovlen	= req->rq_snr;
 	msg.msg_name	= (struct sockaddr *) &xprt->addr;
 	msg.msg_namelen = sizeof(xprt->addr);
 	msg.msg_control = NULL;
 	msg.msg_controllen = 0;
 
 	/* Dont repeat bytes */
-	
-	if(xprt->snd_sent)
-		xprt_move_iov(&msg, niv, xprt->snd_sent);
-		
+	if (req->rq_bytes_sent)
+		xprt_move_iov(&msg, niv, req->rq_bytes_sent);
+
 	oldfs = get_fs(); set_fs(get_ds());
-	result = sock_sendmsg(sock, &msg, xprt->snd_buf.io_len);
+	result = sock_sendmsg(sock, &msg, slen);
 	set_fs(oldfs);
 
-	dprintk("RPC:      xprt_sendmsg(%d) = %d\n",
-				xprt->snd_buf.io_len, result);
+	dprintk("RPC:      xprt_sendmsg(%d) = %d\n", slen, result);
 
-	if (result >= 0) {
-		xprt->snd_buf.io_len -= result;
-		xprt->snd_sent += result;
+	if (result >= 0)
 		return result;
-	}
 
 	switch (result) {
 	case -ECONNREFUSED:
@@ -229,9 +235,14 @@
 		 */
 		break;
 	case -EAGAIN:
-		return 0;
-	case -ENOTCONN: case -EPIPE:
+		if (sock->flags & SO_NOSPACE)
+			result = -ENOMEM;
+		break;
+	case -ENOTCONN:
+	case -EPIPE:
 		/* connection broken */
+		if (xprt->stream)
+			result = -ENOTCONN;
 		break;
 	default:
 		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
@@ -252,7 +263,6 @@
 	mm_segment_t	oldfs;
 	int		result;
 
-#if LINUX_VERSION_CODE >= 0x020100
 	msg.msg_flags   = MSG_DONTWAIT;
 	msg.msg_iov	= iov;
 	msg.msg_iovlen	= nr;
@@ -264,20 +274,6 @@
 	oldfs = get_fs(); set_fs(get_ds());
 	result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
 	set_fs(oldfs);
-#else
-	int		alen = sizeof(sin);
-	msg.msg_flags   = 0;
-	msg.msg_iov	= iov;
-	msg.msg_iovlen	= nr;
-	msg.msg_name	= &sin;
-	msg.msg_namelen = sizeof(sin);
-	msg.msg_control = NULL;
-	msg.msg_controllen = 0;
-
-	oldfs = get_fs(); set_fs(get_ds());
-	result = sock->ops->recvmsg(sock, &msg, len, 1, 0, &alen);
-	set_fs(oldfs);
-#endif
 
 	dprintk("RPC:      xprt_recvmsg(iov %p, len %d) = %d\n",
 						iov, len, result);
@@ -354,13 +350,15 @@
 {
 	struct sock	*sk = xprt->inet;
 
+	xprt_disconnect(xprt);
+
 #ifdef SOCK_HAS_USER_DATA
 	sk->user_data    = NULL;
 #endif
 	sk->data_ready   = xprt->old_data_ready;
-	sk->no_check 	 = 0;
 	sk->state_change = xprt->old_state_change;
 	sk->write_space  = xprt->old_write_space;
+	sk->no_check	 = 0;
 
 	sock_release(xprt->sock);
 	/*
@@ -378,9 +376,16 @@
 xprt_disconnect(struct rpc_xprt *xprt)
 {
 	dprintk("RPC:      disconnected transport %p\n", xprt);
+	xprt->connected = 0;
+	xprt->tcp_offset = 0;
+	xprt->tcp_more = 0;
+	xprt->tcp_total = 0;
+	xprt->tcp_reclen = 0;
+	xprt->tcp_copied = 0;
+	xprt->tcp_rqstp  = NULL;
+	xprt->rx_pending_flag = 0;
 	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
 	rpc_wake_up_status(&xprt->sending, -ENOTCONN);
-	xprt->connected = 0;
 }
 
 /*
@@ -398,22 +403,33 @@
 				task->tk_pid, xprt, xprt->connected);
 	task->tk_status = 0;
 
+	if (xprt->shutdown)
+		return;
+
+	if (!xprt->stream)
+		return;
+
+	start_bh_atomic();
+	if (xprt->connected) {
+		end_bh_atomic();
+		return;
+	}
 	if (xprt->connecting) {
 		task->tk_timeout = xprt->timeout.to_maxval;
 		rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
+		end_bh_atomic();
 		return;
 	}
 	xprt->connecting = 1;
+	end_bh_atomic();
 
 	/* Create an unconnected socket */
-	if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout)))
+	if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout))) {
+		xprt->connecting = 0;
 		goto defer;
+	}
 
-#if LINUX_VERSION_CODE >= 0x020100
 	inet = sock->sk;
-#else
-	inet = (struct sock *) sock->data;
-#endif
 	inet->data_ready   = xprt->inet->data_ready;
 	inet->state_change = xprt->inet->state_change;
 	inet->write_space  = xprt->inet->write_space;
@@ -422,18 +438,18 @@
 #endif
 
 	dprintk("RPC: %4d closing old socket\n", task->tk_pid);
-	xprt_disconnect(xprt);
 	xprt_close(xprt);
 
-	/* Reset to new socket and default congestion */
+	/* Reset to new socket */
 	xprt->sock = sock;
 	xprt->inet = inet;
-	xprt->cwnd = RPC_INITCWND;
 
 	/* Now connect it asynchronously. */
 	dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
 	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
 				sizeof(xprt->addr), O_NONBLOCK);
+
+	xprt->connecting = 0;
 	if (status < 0) {
 		if (status != -EINPROGRESS && status != -EALREADY) {
 			printk("RPC: TCP connect error %d!\n", -status);
@@ -447,37 +463,19 @@
 		start_bh_atomic();
 		if (!xprt->connected) {
 			rpc_sleep_on(&xprt->reconn, task,
-				xprt_reconn_status, xprt_reconn_timeout);
+				NULL, xprt_reconn_timeout);
 			end_bh_atomic();
 			return;
 		}
 		end_bh_atomic();
 	}
 
-	xprt->connecting = 0;
-	rpc_wake_up(&xprt->reconn);
-	return;
 
 defer:
-	task->tk_timeout = 30 * HZ;
-	rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
-	xprt->connecting = 0;
-}
-
-/*
- * Reconnect status
- */
-static void
-xprt_reconn_status(struct rpc_task *task)
-{
-	struct rpc_xprt	*xprt = task->tk_xprt;
-
-	dprintk("RPC: %4d xprt_reconn_status %d\n",
-				task->tk_pid, task->tk_status);
-	if (!xprt->connected && task->tk_status != -ETIMEDOUT) {
-		task->tk_timeout = 30 * HZ;
-		rpc_sleep_on(&xprt->reconn, task, NULL, xprt_reconn_timeout);
-	}
+	start_bh_atomic();
+	if (!xprt->connected)
+		rpc_wake_up_next(&xprt->reconn);
+	end_bh_atomic();
 }
 
 /*
@@ -490,11 +488,19 @@
 	dprintk("RPC: %4d xprt_reconn_timeout %d\n",
 				task->tk_pid, task->tk_status);
 	task->tk_status = -ENOTCONN;
-	task->tk_xprt->connecting = 0;
+	start_bh_atomic();
+	if (task->tk_xprt->connecting)
+		task->tk_xprt->connecting = 0;
+	if (!task->tk_xprt->connected)
+		task->tk_status = -ENOTCONN;
+	else
+		task->tk_status = -ETIMEDOUT;
+	end_bh_atomic();
 	task->tk_timeout = 0;
 	rpc_wake_up_task(task);
 }
 
+extern spinlock_t rpc_queue_lock;
 /*
  * Look up the RPC request corresponding to a reply.
  */
@@ -503,22 +509,28 @@
 {
 	struct rpc_task	*head, *task;
 	struct rpc_rqst	*req;
+	unsigned long	oldflags;
 	int		safe = 0;
 
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
 	if ((head = xprt->pending.task) != NULL) {
 		task = head;
 		do {
 			if ((req = task->tk_rqstp) && req->rq_xid == xid)
-				return req;
+				goto out;
 			task = task->tk_next;
 			if (++safe > 100) {
 				printk("xprt_lookup_rqst: loop in Q!\n");
-				return NULL;
+				goto out_bad;
 			}
 		} while (task != head);
 	}
 	dprintk("RPC:      unknown XID %08x in reply.\n", xid);
-	return NULL;
+ out_bad:
+	req = NULL;
+ out:
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+	return req;
 }
 
 /*
@@ -559,11 +571,13 @@
 	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
 	task->tk_status = copied;
 
-	rpc_wake_up_task(task);
+	if (!RPC_IS_RUNNING(task))
+		rpc_wake_up_task(task);
 	return;
 }
 
-/* We have set things up such that we perform the checksum of the UDP
+/*
+ * We have set things up such that we perform the checksum of the UDP
  * packet in parallel with the copies into the RPC client iovec.  -DaveM
  */
 static int csum_partial_copy_to_page_cache(struct iovec *iov,
@@ -609,7 +623,8 @@
 	return 0;
 }
 
-/* Input handler for RPC replies. Called from a bottom half and hence
+/*
+ * Input handler for RPC replies. Called from a bottom half and hence
  * atomic.
  */
 static inline void
@@ -621,12 +636,15 @@
 	int		err, repsize, copied;
 
 	dprintk("RPC:      udp_data_ready...\n");
-	if (!(xprt = xprt_from_sock(sk)))
+	if (!(xprt = xprt_from_sock(sk))) {
+		printk("RPC:      udp_data_ready request not found!\n");
 		return;
+	}
+
 	dprintk("RPC:      udp_data_ready client %p\n", xprt);
 
 	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
-		return;
+		goto out_err;
 
 	repsize = skb->len - sizeof(struct udphdr);
 	if (repsize < 4) {
@@ -646,6 +664,7 @@
 	if ((copied = rovr->rq_rlen) > repsize)
 		copied = repsize;
 
+	rovr->rq_damaged  = 1;
 	/* Suck it into the iovec, verify checksum if not done by hw. */
 	if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied))
 		goto dropit;
@@ -658,6 +677,8 @@
 dropit:
 	skb_free_datagram(sk, skb);
 	return;
+out_err:
+	return;
 }
 
 /*
@@ -679,6 +700,7 @@
 	int		result, maxcpy, reclen, avail, want;
 
 	dprintk("RPC:      tcp_input_record\n");
+
 	offset = xprt->tcp_offset;
 	result = -EAGAIN;
 	if (offset < 4 || (!xprt->tcp_more && offset < 8)) {
@@ -687,11 +709,6 @@
 		riov.iov_base = xprt->tcp_recm.data + offset;
 		riov.iov_len  = want;
 		result = xprt_recvmsg(xprt, &riov, 1, want);
-		if (!result)
-		{
-			dprintk("RPC: empty TCP record.\n");
-			return -ENOTCONN;
-		}
 		if (result < 0)
 			goto done;
 		offset += result;
@@ -733,9 +750,9 @@
 
 		dprintk("RPC: %4d TCP receiving %d bytes\n",
 					req->rq_task->tk_pid, want);
+		/* Request must be re-encoded before retransmit */
+		req->rq_damaged = 1;
 		result = xprt_recvmsg(xprt, xprt->tcp_iovec, req->rq_rnr, want);
-		if (!result && want)
-			result = -EAGAIN;
 		if (result < 0)
 			goto done;
 		xprt->tcp_copied += result;
@@ -754,12 +771,10 @@
 			xprt->tcp_copied = 0;
 			xprt->tcp_rqstp  = NULL;
 		}
-		/* Request must be re-encoded before retransmit */
-		req->rq_damaged = 1;
 	}
 
 	/* Skip over any trailing bytes on short reads */
-	while (avail) {
+	while (avail > 0) {
 		static u8	dummy[64];
 
 		want = MIN(avail, sizeof(dummy));
@@ -767,8 +782,6 @@
 		riov.iov_len  = want;
 		dprintk("RPC:      TCP skipping %d bytes\n", want);
 		result = xprt_recvmsg(xprt, &riov, 1, want);
-		if (!result && want)
-			result=-EAGAIN;
 		if (result < 0)
 			goto done;
 		offset += result;
@@ -789,55 +802,40 @@
 	return result;
 }
 
-static __inline__ void tcp_output_record(struct rpc_xprt *xprt)
-{
-	if(xprt->snd_sent && xprt->snd_task)
-		dprintk("RPC: write space\n");
-	if(xprt->write_space == 0)
-	{
-		xprt->write_space = 1;
-		if (xprt->snd_task && !RPC_IS_RUNNING(xprt->snd_task))
-		{
-			if(xprt->snd_sent)
-				dprintk("RPC: Write wakeup snd_sent =%d\n",
-					xprt->snd_sent);
-			rpc_wake_up_task(xprt->snd_task);			
-		}
-	}
-}
-
 /*
  *	TCP task queue stuff
  */
  
-static struct rpc_xprt *rpc_rx_xprt_pending = NULL;	/* Chain by rx_pending of rpc_xprt's */
-static struct rpc_xprt *rpc_tx_xprt_pending = NULL;	/* Chain by tx_pending of rpc_xprt's */
+static struct rpc_xprt *rpc_xprt_pending = NULL;	/* Chain by rx_pending of rpc_xprt's */
 
 /*
  *	This is protected from tcp_data_ready and the stack as its run
  *	inside of the RPC I/O daemon
  */
-
-void rpciod_tcp_dispatcher(void)
+static void
+do_rpciod_tcp_dispatcher(void)
 {
 	struct rpc_xprt *xprt;
 	int result;
 
 	dprintk("rpciod_tcp_dispatcher: Queue Running\n");
-	
+
 	/*
 	 *	Empty each pending socket
 	 */
-	 
-	while((xprt=rpc_rx_xprt_pending)!=NULL)
-	{
+ 
+	while(1) {
 		int safe_retry=0;
-		
-		rpc_rx_xprt_pending=xprt->rx_pending;
-		xprt->rx_pending_flag=0;
-		
+
+		if ((xprt = rpc_xprt_pending) == NULL) {
+			break;
+		}
+		xprt->rx_pending_flag = 0;
+		rpc_xprt_pending=xprt->rx_pending;
+		xprt->rx_pending = NULL;
+
 		dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);
-		
+
 		do 
 		{
 			if (safe_retry++ > 50)
@@ -845,28 +843,30 @@
 			result = tcp_input_record(xprt);
 		}
 		while (result >= 0);
-	
+
 		switch (result) {
 			case -EAGAIN:
-				continue;
 			case -ENOTCONN:
 			case -EPIPE:
-				xprt_disconnect(xprt);
 				continue;
 			default:
 				printk(KERN_WARNING "RPC: unexpected error %d from tcp_input_record\n",
 					result);
 		}
 	}
+}
 
-	while((xprt=rpc_tx_xprt_pending)!=NULL)
-	{
-		rpc_tx_xprt_pending = xprt->tx_pending;
-		xprt->tx_pending_flag = 0;
-		tcp_output_record(xprt);
-	}
+void rpciod_tcp_dispatcher(void)
+{
+	start_bh_atomic();
+	do_rpciod_tcp_dispatcher();
+	end_bh_atomic();
 }
 
+int xprt_tcp_pending(void)
+{
+	return rpc_xprt_pending != NULL;
+}
 
 extern inline void tcp_rpciod_queue(void)
 {
@@ -890,6 +890,7 @@
 		printk("Not a socket with xprt %p\n", sk);
 		return;
 	}
+
 	dprintk("RPC:      tcp_data_ready client %p\n", xprt);
 	dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
 				sk->state, xprt->connected,
@@ -898,24 +899,16 @@
 	 *	If we are not waiting for the RPC bh run then
 	 *	we are now
 	 */
-	if (!xprt->rx_pending_flag)
-	{
-		int start_queue=0;
+	if (!xprt->rx_pending_flag) {
+		dprintk("RPC:     xprt queue %p\n", rpc_xprt_pending);
 
-		dprintk("RPC:     xprt queue %p\n", rpc_rx_xprt_pending);
-		if(rpc_rx_xprt_pending==NULL)
-			start_queue=1;
+		xprt->rx_pending=rpc_xprt_pending;
+		rpc_xprt_pending=xprt;
 		xprt->rx_pending_flag=1;
-		xprt->rx_pending=rpc_rx_xprt_pending;
-		rpc_rx_xprt_pending=xprt;
-		if (start_queue)
-		  {
-		    tcp_rpciod_queue();
-		    start_queue=0;
-		  }
-	}
-	else
+	} else
 		dprintk("RPC:     xprt queued already %p\n", xprt);
+	tcp_rpciod_queue();
+
 }
 
 
@@ -931,17 +924,32 @@
 				sk->state, xprt->connected,
 				sk->dead, sk->zapped);
 
-	if (sk->state == TCP_ESTABLISHED && !xprt->connected) {
+	switch(sk->state) {
+	case TCP_ESTABLISHED:
+		if (xprt->connected)
+			break;
 		xprt->connected = 1;
 		xprt->connecting = 0;
 		rpc_wake_up(&xprt->reconn);
-	} else if (sk->zapped) {
-		rpc_wake_up_status(&xprt->pending, -ENOTCONN);
-		rpc_wake_up_status(&xprt->sending, -ENOTCONN);
+		rpc_wake_up_next(&xprt->sending);
+		tcp_rpciod_queue();
+		break;
+	case TCP_CLOSE:
+		if (xprt->connecting)
+			break;
+		xprt_disconnect(xprt);
 		rpc_wake_up_status(&xprt->reconn,  -ENOTCONN);
+		break;
+	default:
+		break;
 	}
+
 }
 
+/*
+ * The following 2 routines allow a task to sleep while socket memory is
+ * low.
+ */
 static void
 tcp_write_space(struct sock *sk)
 {
@@ -949,17 +957,43 @@
 
 	if (!(xprt = xprt_from_sock(sk)))
 		return;
-	if (!xprt->tx_pending_flag) {
-		int start_queue = 0;
 
-		if (rpc_tx_xprt_pending == NULL)
-			start_queue = 1;
-		xprt->tx_pending_flag = 1;
-		xprt->tx_pending = rpc_tx_xprt_pending;
-		rpc_tx_xprt_pending = xprt;
-		if (start_queue)
-			tcp_rpciod_queue();
-	}
+	/* Wait until we have enough socket memory */
+	if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE))
+		return;
+
+	if (xprt->write_space)
+		return;
+
+	xprt->write_space = 1;
+
+	if (!xprt->snd_task)
+		rpc_wake_up_next(&xprt->sending);
+	else if (!RPC_IS_RUNNING(xprt->snd_task))
+		rpc_wake_up_task(xprt->snd_task);
+}
+
+static void
+udp_write_space(struct sock *sk)
+{
+	struct rpc_xprt *xprt;
+
+	if (!(xprt = xprt_from_sock(sk)))
+		return;
+
+
+	/* Wait until we have enough socket memory */
+	if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE))
+		return;
+
+	if (xprt->write_space)
+		return;
+
+	xprt->write_space = 1;
+	if (!xprt->snd_task)
+		rpc_wake_up_next(&xprt->sending);
+	else if (!RPC_IS_RUNNING(xprt->snd_task))
+		rpc_wake_up_task(xprt->snd_task);
 }
 
 /*
@@ -982,32 +1016,50 @@
 	rpc_wake_up_task(task);
 }
 
+
 /*
- * (Partly) transmit the RPC packet
- * Note that task->tk_status is either 0 or negative on return.
- * Only when the reply is received will the status be set to a
- * positive value.
+ * Serialize access to sockets, in order to prevent different
+ * requests from interfering with each other.
  */
-static inline int
-xprt_transmit_some(struct rpc_xprt *xprt, struct rpc_task *task)
+static int
+xprt_down_transmit(struct rpc_task *task)
 {
+	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
 	struct rpc_rqst	*req = task->tk_rqstp;
-	int		result;
 
-	task->tk_status = 0;
-	if ((result = xprt_sendmsg(xprt)) >= 0) {
-		if (!xprt->snd_buf.io_len || !xprt->stream) {
-			rpc_wake_up_next(&xprt->sending);
-			return req->rq_slen;
-		}
-		result = -EAGAIN;
-	} else if (xprt->stream) {
-		if (result == -ENOTCONN || result == -EPIPE) {
-			xprt_disconnect(xprt);
-			result = -ENOTCONN;
-		}
+	start_bh_atomic();
+	spin_lock(&xprt_lock);
+	if (xprt->snd_task && xprt->snd_task != task) {
+		dprintk("RPC: %4d TCP write queue full (task %d)\n",
+			task->tk_pid, xprt->snd_task->tk_pid);
+		task->tk_timeout = req->rq_timeout.to_current;
+		rpc_sleep_on(&xprt->sending, task, xprt_transmit, NULL);
+	} else if (!xprt->snd_task) {
+		xprt->snd_task = task;
+#ifdef RPC_PROFILE
+		req->rq_xtime = jiffies;
+#endif
+		req->rq_bytes_sent = 0;
+	}
+	spin_unlock(&xprt_lock);
+	end_bh_atomic();
+	return xprt->snd_task == task;
+}
+
+/*
+ * Releases the socket for use by other requests.
+ */
+static void
+xprt_up_transmit(struct rpc_task *task)
+{
+	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
+
+	if (xprt->snd_task && xprt->snd_task == task) {
+		start_bh_atomic();
+		xprt->snd_task = NULL;
+		rpc_wake_up_next(&xprt->sending);
+		end_bh_atomic();
 	}
-	return task->tk_status = result;
 }
 
 /*
@@ -1020,71 +1072,65 @@
 	struct rpc_timeout *timeo;
 	struct rpc_rqst	*req = task->tk_rqstp;
 	struct rpc_xprt	*xprt = req->rq_xprt;
-	int status;
 
 	dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid, 
 				*(u32 *)(req->rq_svec[0].iov_base));
 
-	if (xprt->shutdown) {
+	if (xprt->shutdown)
 		task->tk_status = -EIO;
+
+	if (task->tk_status < 0)
 		return;
+
+	/* Reset timeout parameters */
+	timeo = &req->rq_timeout;
+	if (timeo->to_retries < 0) {
+		dprintk("RPC: %4d xprt_transmit reset timeo\n",
+					task->tk_pid);
+		timeo->to_retries = xprt->timeout.to_retries;
+		timeo->to_current = timeo->to_initval;
 	}
 
-	/* If we're not already in the process of transmitting our call,
-	 * set up everything as needed. */
-	if (xprt->snd_task != task) {
-		/* Write the record marker */
-		if (xprt->stream) {
-			u32	marker;
+	/* set up everything as needed. */
+	/* Write the record marker */
+	if (xprt->stream) {
+		u32	marker;
 
-			if (!xprt->connected) {
-				task->tk_status = -ENOTCONN;
-				return;
-			}
-			marker = htonl(0x80000000|(req->rq_slen-4));
-			*((u32 *) req->rq_svec[0].iov_base) = marker;
-		}
+		marker = htonl(0x80000000|(req->rq_slen-4));
+		*((u32 *) req->rq_svec[0].iov_base) = marker;
 
-		/* Reset timeout parameters */
-		timeo = &req->rq_timeout;
-		if (timeo->to_retries < 0) {
-			dprintk("RPC: %4d xprt_transmit reset timeo\n",
-						task->tk_pid);
-			timeo->to_retries = xprt->timeout.to_retries;
-			timeo->to_current = timeo->to_initval;
-		}
+	}
 
-#ifdef RPC_PROFILE
-		req->rq_xtime = jiffies;
-#endif
-		req->rq_gotit = 0;
+	if (!xprt_down_transmit(task))
+		return;
 
-		if (xprt->snd_task) {
-			dprintk("RPC: %4d TCP write queue full (task %d)\n",
-					task->tk_pid, xprt->snd_task->tk_pid);
-			rpc_sleep_on(&xprt->sending, task,
-					xprt_transmit_status, NULL);
-			return;
-		}
-		xprt->snd_buf  = req->rq_snd_buf;
-		xprt->snd_task = task;
-		xprt->snd_sent = 0;
+	do_xprt_transmit(task);
+}
+
+static void
+do_xprt_transmit(struct rpc_task *task)
+{
+	struct rpc_rqst	*req = task->tk_rqstp;
+	struct rpc_xprt	*xprt = req->rq_xprt;
+	int status, retry = 0;
+
+	if (xprt->shutdown) {
+		task->tk_status = -EIO;
+		goto out_release;
 	}
 
 	/* For fast networks/servers we have to put the request on
 	 * the pending list now:
 	 */
-	start_bh_atomic();
+	req->rq_gotit = 0;
 	status = rpc_add_wait_queue(&xprt->pending, task);
 	if (!status)
 		task->tk_callback = NULL;
-	end_bh_atomic();
 
-	if (status)
-	{
+	if (status) {
 		printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
 		task->tk_status = status;
-		return;
+		goto out_release;
 	}
 
 	/* Continue transmitting the packet/record. We must be careful
@@ -1093,27 +1139,55 @@
 	 */
 	while (1) {
 		xprt->write_space = 0;
-		if (xprt_transmit_some(xprt, task) != -EAGAIN) {
+		status = xprt_sendmsg(xprt, req);
+
+		if (status < 0)
+			break;
+
+		if (xprt->stream) {
+			req->rq_bytes_sent += status;
+
+			if (req->rq_bytes_sent >= req->rq_slen)
+				goto out_release;
+		}
+
+		if (status < req->rq_slen)
+			status = -EAGAIN;
+
+		if (status >= 0 || !xprt->stream) {
 			dprintk("RPC: %4d xmit complete\n", task->tk_pid);
-			xprt->snd_task = NULL;
-			return;
+			goto out_release;
 		}
 
-		/*d*/dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
-				task->tk_pid, xprt->snd_buf.io_len,
+		dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
+				task->tk_pid, req->rq_slen - req->rq_bytes_sent,
 				req->rq_slen);
-		task->tk_status = 0;
-		start_bh_atomic();
-		if (!xprt->write_space) {
-			/* Remove from pending */
-			rpc_remove_wait_queue(task);
-			rpc_sleep_on(&xprt->sending, task,
-					xprt_transmit_status, NULL);
-			end_bh_atomic();
-			return;
-		}
+
+		if (retry++ > 50)
+			break;
+	}
+
+	task->tk_status = (status == -ENOMEM) ? -EAGAIN : status;
+
+	/* We don't care if we got a reply, so don't protect
+	 * against bh. */
+	if (task->tk_rpcwait == &xprt->pending)
+		rpc_remove_wait_queue(task);
+
+	/* Protect against (udp|tcp)_write_space */
+	start_bh_atomic();
+	if (status == -ENOMEM || status == -EAGAIN) {
+		task->tk_timeout = req->rq_timeout.to_current;
+		if (!xprt->write_space)
+			rpc_sleep_on(&xprt->sending, task, xprt_transmit_status,
+				     xprt_transmit_timeout);
 		end_bh_atomic();
+		return;
 	}
+	end_bh_atomic();
+
+out_release:
+	xprt_up_transmit(task);
 }
 
 /*
@@ -1126,19 +1200,27 @@
 	struct rpc_xprt	*xprt = task->tk_client->cl_xprt;
 
 	dprintk("RPC: %4d transmit_status %d\n", task->tk_pid, task->tk_status);
-	if (xprt->snd_task == task) 
-	{
-		if (task->tk_status < 0)
-		{
-			xprt->snd_task = NULL;
-			xprt_disconnect(xprt);
-		}
-		else
-			xprt_transmit(task);
+	if (xprt->snd_task == task) {
+		task->tk_status = 0;
+		do_xprt_transmit(task);
+		return;
 	}
 }
 
 /*
+ * RPC transmit timeout handler.
+ */
+static void
+xprt_transmit_timeout(struct rpc_task *task)
+{
+	dprintk("RPC: %4d transmit_timeout %d\n", task->tk_pid, task->tk_status);
+	task->tk_status  = -ETIMEDOUT;
+	task->tk_timeout = 0;
+	rpc_wake_up_task(task);
+	xprt_up_transmit(task);
+}
+
+/*
  * Wait for the reply to our call.
  * When the callback is invoked, the congestion window should have
  * been updated already.
@@ -1150,25 +1232,33 @@
 	struct rpc_xprt	*xprt = req->rq_xprt;
 
 	dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
-	if (xprt->connected == 0) {
-		task->tk_status = -ENOTCONN;
-		return;
-	}
 
 	/*
-	 * Wait until rq_gotit goes non-null, or timeout elapsed.
+         * Wait until rq_gotit goes non-null, or timeout elapsed.
 	 */
 	task->tk_timeout = req->rq_timeout.to_current;
 
 	start_bh_atomic();
+	if (task->tk_rpcwait)
+		rpc_remove_wait_queue(task);
+
+	if (task->tk_status < 0 || xprt->shutdown) {
+		end_bh_atomic();
+		goto out;
+	}
+
 	if (!req->rq_gotit) {
 		rpc_sleep_on(&xprt->pending, task,
 				xprt_receive_status, xprt_timer);
+		end_bh_atomic();
+		return;
 	}
 	end_bh_atomic();
 
 	dprintk("RPC: %4d xprt_receive returns %d\n",
 				task->tk_pid, task->tk_status);
+ out:
+	xprt_receive_status(task);
 }
 
 static void
@@ -1176,8 +1266,9 @@
 {
 	struct rpc_xprt	*xprt = task->tk_xprt;
 
-	if (xprt->stream && xprt->tcp_rqstp == task->tk_rqstp)
+	if (xprt->tcp_rqstp == task->tk_rqstp)
 		xprt->tcp_rqstp = NULL;
+
 }
 
 /*
@@ -1194,7 +1285,7 @@
 
 	dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
 				task->tk_pid, xprt->cong, xprt->cwnd);
-	if ((!RPCXPRT_CONGESTED(xprt) && xprt->free)) {
+	if (!RPCXPRT_CONGESTED(xprt) && xprt->free) {
 		xprt_reserve_status(task);
 		task->tk_timeout = 0;
 	} else if (!task->tk_timeout) {
@@ -1223,40 +1314,30 @@
 		/* NOP */
 	} else if (task->tk_rqstp) {
 		/* We've already been given a request slot: NOP */
-	} else if (!RPCXPRT_CONGESTED(xprt)) {
+	} else if (!RPCXPRT_CONGESTED(xprt) && xprt->free) {
 		/* OK: There's room for us. Grab a free slot and bump
 		 * congestion value */
-		req = xprt->free;
-		if (!req)
-			goto bad_list;
-		if (req->rq_xid)
-			goto bad_used;
+		spin_lock(&xprt_lock);
+		if (!(req = xprt->free)) {
+			spin_unlock(&xprt_lock);
+			goto out_nofree;
+		}
 		xprt->free     = req->rq_next;
+		req->rq_next   = NULL;
+		spin_unlock(&xprt_lock);
 		xprt->cong    += RPC_CWNDSCALE;
 		task->tk_rqstp = req;
-		req->rq_next   = NULL;
 		xprt_request_init(task, xprt);
-	} else {
-		task->tk_status = -EAGAIN;
-	}
 
-	if (xprt->free && !RPCXPRT_CONGESTED(xprt))
-		rpc_wake_up_next(&xprt->backlog);
+		if (xprt->free)
+			xprt_clear_backlog(xprt);
+	} else
+		goto out_nofree;
 
 	return;
 
-bad_list:
-	printk(KERN_ERR 
-		"RPC: %4d inconsistent free list (cong %ld cwnd %ld)\n",
-		task->tk_pid, xprt->cong, xprt->cwnd);
-	rpc_debug = ~0;
-	goto bummer;
-bad_used:
-	printk(KERN_ERR "RPC: used rqst slot %p on free list!\n", req);
-bummer:
-	task->tk_status = -EIO;
-	xprt->free = NULL;
-	return;
+out_nofree:
+	task->tk_status = -EAGAIN;
 }
 
 /*
@@ -1298,13 +1379,15 @@
 
 	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
 
+	spin_lock(&xprt_lock);
+	req->rq_next = xprt->free;
+	xprt->free   = req;
+	spin_unlock(&xprt_lock);
+
 	/* remove slot from queue of pending */
 	start_bh_atomic();
 	if (task->tk_rpcwait) {
 		printk("RPC: task of released request still queued!\n");
-#ifdef RPC_DEBUG
-		printk("RPC: (task is on %s)\n", rpc_qname(task->tk_rpcwait));
-#endif
 		rpc_del_timer(task);
 		rpc_remove_wait_queue(task);
 	}
@@ -1313,31 +1396,7 @@
 	/* Decrease congestion value. */
 	xprt->cong -= RPC_CWNDSCALE;
 
-#if 0
-	/* If congestion threshold is not yet reached, pass on the request slot.
-	 * This looks kind of kludgy, but it guarantees backlogged requests
-	 * are served in order.
-	 * N.B. This doesn't look completely safe, as the task is still
-	 * on the backlog list after wake-up.
-	 */
-	if (!RPCXPRT_CONGESTED(xprt)) {
-		struct rpc_task	*next = rpc_wake_up_next(&xprt->backlog);
-
-		if (next && next->tk_rqstp == 0) {
-			xprt->cong += RPC_CWNDSCALE;
-			next->tk_rqstp = req;
-			xprt_request_init(next, xprt);
-			return;
-		}
-	}
-#endif
-
-	req->rq_next = xprt->free;
-	xprt->free   = req;
-
-	/* If not congested, wake up the next backlogged process */
-	if (!RPCXPRT_CONGESTED(xprt))
-		rpc_wake_up_next(&xprt->backlog);
+	xprt_clear_backlog(xprt);
 }
 
 /*
@@ -1382,11 +1441,7 @@
 	dprintk("RPC:      setting up %s transport...\n",
 				proto == IPPROTO_UDP? "UDP" : "TCP");
 
-#if LINUX_VERSION_CODE >= 0x020100
 	inet = sock->sk;
-#else
-	inet = (struct sock *) sock->data;
-#endif
 
 	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
 		return NULL;
@@ -1398,7 +1453,8 @@
 	xprt->addr = *ap;
 	xprt->prot = proto;
 	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
-	xprt->cwnd = RPC_INITCWND;
+	xprt->congtime = jiffies;
+	init_waitqueue_head(&xprt->cong_wait);
 #ifdef SOCK_HAS_USER_DATA
 	inet->user_data = xprt;
 #else
@@ -1410,11 +1466,14 @@
 	xprt->old_write_space = inet->write_space;
 	if (proto == IPPROTO_UDP) {
 		inet->data_ready = udp_data_ready;
+		inet->write_space = udp_write_space;
 		inet->no_check = UDP_CSUM_NORCV;
+		xprt->cwnd = RPC_INITCWND;
 	} else {
 		inet->data_ready = tcp_data_ready;
 		inet->state_change = tcp_state_change;
 		inet->write_space = tcp_write_space;
+		xprt->cwnd = RPC_MAXCWND;
 		xprt->nocong = 1;
 	}
 	xprt->connected = 1;
@@ -1487,6 +1546,7 @@
 			   (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
 
 	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
+
 	if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
 		printk("RPC: can't create socket (%d).\n", -err);
 		goto failed;
@@ -1543,6 +1603,21 @@
 	rpc_wake_up(&xprt->pending);
 	rpc_wake_up(&xprt->backlog);
 	rpc_wake_up(&xprt->reconn);
+	wake_up(&xprt->cong_wait);
+}
+
+/*
+ * Clear the xprt backlog queue
+ */
+int
+xprt_clear_backlog(struct rpc_xprt *xprt) {
+	if (!xprt)
+		return 0;
+	if (RPCXPRT_CONGESTED(xprt))
+		return 0;
+	rpc_wake_up_next(&xprt->backlog);
+	wake_up(&xprt->cong_wait);
+	return 1;
 }
 
 /*

FUNET's LINUX-ADM group, linux-adm@nic.funet.fi
TCL-scripts by Sam Shen (who was at: slshen@lbl.gov)