patch-2.3.99-pre3 linux/net/sunrpc/xprt.c

diff -u --recursive --new-file v2.3.99-pre2/linux/net/sunrpc/xprt.c linux/net/sunrpc/xprt.c
@@ -68,7 +68,14 @@
 /* Following value should be > 32k + RPC overhead */
 #define XPRT_MIN_WRITE_SPACE 35000
 
+extern spinlock_t rpc_queue_lock;
+
+/*
+ * Local variables
+ */
+
 /* Spinlock for critical sections in the code. */
+spinlock_t xprt_sock_lock = SPIN_LOCK_UNLOCKED;
 spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED;
 
 #ifdef RPC_DEBUG
@@ -86,14 +93,12 @@
  */
 static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
 static void	do_xprt_transmit(struct rpc_task *);
-static void	xprt_transmit_status(struct rpc_task *task);
-static void	xprt_transmit_timeout(struct rpc_task *task);
-static void	xprt_receive_status(struct rpc_task *task);
 static void	xprt_reserve_status(struct rpc_task *task);
 static void	xprt_disconnect(struct rpc_xprt *);
-static void	xprt_reconn_timeout(struct rpc_task *task);
-static struct socket *xprt_create_socket(int, struct sockaddr_in *,
-					struct rpc_timeout *);
+static void	xprt_reconn_status(struct rpc_task *task);
+static struct socket *xprt_create_socket(int, struct rpc_timeout *);
+static int	xprt_bind_socket(struct rpc_xprt *, struct socket *);
+static void	xprt_remove_pending(struct rpc_xprt *);
 
 #ifdef RPC_DEBUG_DATA
 /*
@@ -140,7 +145,7 @@
  */
  
 extern inline void
-xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount)
+xprt_move_iov(struct msghdr *msg, struct iovec *niv, unsigned amount)
 {
 	struct iovec *iv=msg->msg_iov;
 	int i;
@@ -148,7 +153,7 @@
 	/*
 	 *	Eat any sent iovecs
 	 */
-	while(iv->iov_len <= amount) {
+	while (iv->iov_len <= amount) {
 		amount -= iv->iov_len;
 		iv++;
 		msg->msg_iovlen--;
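
The loop above is the standard recovery step after a short scatter/gather send: iovecs that went out completely are skipped, and the first partially sent one is trimmed. A minimal user-space sketch of the same idea (an illustrative helper, not the kernel's xprt_move_iov, which also copies the remainder into a scratch array):

	#include <stddef.h>
	#include <sys/uio.h>

	/* Advance iov[] by 'sent' bytes after a short writev().
	 * Returns the index of the first iovec still left to send;
	 * that iovec has been trimmed in place. */
	static int iov_advance(struct iovec *iov, int iovcnt, size_t sent)
	{
		int i = 0;

		while (i < iovcnt && sent >= iov[i].iov_len) {
			sent -= iov[i].iov_len;		/* fully sent */
			i++;
		}
		if (i < iovcnt && sent) {		/* partially sent */
			iov[i].iov_base = (char *) iov[i].iov_base + sent;
			iov[i].iov_len -= sent;
		}
		return i;
	}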
@@ -184,14 +189,17 @@
 	int		slen = req->rq_slen - req->rq_bytes_sent;
 	struct iovec	niv[MAX_IOVEC];
 
-	if (slen == 0)
+	if (slen <= 0)
 		return 0;
 
+	if (!sock)
+		return -ENOTCONN;
+
 	xprt_pktdump("packet data:",
 				req->rq_svec->iov_base,
 				req->rq_svec->iov_len);
 
-	msg.msg_flags   = MSG_DONTWAIT;
+	msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
 	msg.msg_iov	= req->rq_svec;
 	msg.msg_iovlen	= req->rq_snr;
 	msg.msg_name	= (struct sockaddr *) &xprt->addr;
@@ -238,23 +246,30 @@
 /*
  * Read data from socket
  */
-static inline int
-xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, int len)
+static int
+xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, unsigned len, unsigned shift)
 {
 	struct socket	*sock = xprt->sock;
-	struct sockaddr_in sin;
 	struct msghdr	msg;
 	mm_segment_t	oldfs;
+	struct iovec	niv[MAX_IOVEC];
 	int		result;
 
-	msg.msg_flags   = MSG_DONTWAIT;
+	if (!sock)
+		return -ENOTCONN;
+
+	msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
 	msg.msg_iov	= iov;
 	msg.msg_iovlen	= nr;
-	msg.msg_name	= &sin;
-	msg.msg_namelen = sizeof(sin);
+	msg.msg_name	= NULL;
+	msg.msg_namelen = 0;
 	msg.msg_control = NULL;
 	msg.msg_controllen = 0;
 
+	/* Adjust the iovec if we've already filled it */
+	if (shift)
+		xprt_move_iov(&msg, niv, shift);
+
 	oldfs = get_fs(); set_fs(get_ds());
 	result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
 	set_fs(oldfs);
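
MSG_DONTWAIT makes the receive fail with -EAGAIN rather than sleep, and MSG_NOSIGNAL suppresses SIGPIPE on stream sockets; the get_fs()/set_fs() pair is only there because the kernel passes kernel-space buffers into the socket layer. A rough user-space equivalent of this receive path (illustrative names, hedged sketch):

	#include <errno.h>
	#include <sys/socket.h>
	#include <sys/uio.h>

	/* Non-blocking scatter read: returns bytes read, 0 if the
	 * call would block, or -1 on a real error. */
	static ssize_t recv_nowait(int fd, struct iovec *iov, int iovcnt)
	{
		struct msghdr msg = {0};
		ssize_t n;

		msg.msg_iov = iov;
		msg.msg_iovlen = iovcnt;

		n = recvmsg(fd, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
		if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
			return 0;	/* nothing ready; retry later */
		return n;
	}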
@@ -309,21 +324,30 @@
 int
 xprt_adjust_timeout(struct rpc_timeout *to)
 {
-	if (to->to_exponential)
-		to->to_current <<= 1;
-	else
-		to->to_current += to->to_increment;
-	if (to->to_maxval && to->to_current >= to->to_maxval) {
-		to->to_current = to->to_maxval;
-		to->to_retries = 0;
+	if (to->to_retries > 0) {
+		if (to->to_exponential)
+			to->to_current <<= 1;
+		else
+			to->to_current += to->to_increment;
+		if (to->to_maxval && to->to_current >= to->to_maxval)
+			to->to_current = to->to_maxval;
+	} else {
+		if (to->to_exponential)
+			to->to_initval <<= 1;
+		else
+			to->to_initval += to->to_increment;
+		if (to->to_maxval && to->to_initval >= to->to_maxval)
+			to->to_initval = to->to_maxval;
+		to->to_current = to->to_initval;
 	}
+
 	if (!to->to_current) {
 		printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
 		to->to_current = 5 * HZ;
 	}
 	pprintk("RPC: %lu %s\n", jiffies,
 			to->to_retries? "retrans" : "timeout");
-	return (to->to_retries)--;
+	return to->to_retries-- > 0;
 }
 
 /*
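
The reworked xprt_adjust_timeout keeps growing to_current while retries remain; once they run out it grows to_initval instead, so the next retransmit series starts from a longer base timeout. The same arithmetic in a stand-alone sketch (user-space types, assumed field names):

	struct backoff {
		unsigned long initval, cur, increment, maxval;
		int retries, exponential;
	};

	/* Mirror of the patched logic: bump the active timeout,
	 * clamp it to maxval, and report whether retries remain. */
	static int backoff_adjust(struct backoff *to)
	{
		unsigned long *t = to->retries > 0 ? &to->cur : &to->initval;

		*t = to->exponential ? *t << 1 : *t + to->increment;
		if (to->maxval && *t >= to->maxval)
			*t = to->maxval;
		if (to->retries <= 0)
			to->cur = to->initval;	/* restart series */
		return to->retries-- > 0;
	}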
@@ -332,22 +356,29 @@
 static void
 xprt_close(struct rpc_xprt *xprt)
 {
+	struct socket	*sock = xprt->sock;
 	struct sock	*sk = xprt->inet;
 
-	xprt_disconnect(xprt);
+	if (!sk)
+		return;
+
+	xprt->inet = NULL;
+	xprt->sock = NULL;
 
 	sk->user_data    = NULL;
 	sk->data_ready   = xprt->old_data_ready;
 	sk->state_change = xprt->old_state_change;
 	sk->write_space  = xprt->old_write_space;
+
+	xprt_disconnect(xprt);
 	sk->no_check	 = 0;
 
-	sock_release(xprt->sock);
+	sock_release(sock);
 	/*
 	 *	TCP doesnt require the rpciod now - other things may
 	 *	but rpciod handles that not us.
 	 */
-	if(xprt->stream && !xprt->connecting)
+	if(xprt->stream)
 		rpciod_down();
 }
 
@@ -360,14 +391,10 @@
 	dprintk("RPC:      disconnected transport %p\n", xprt);
 	xprt->connected = 0;
 	xprt->tcp_offset = 0;
-	xprt->tcp_more = 0;
-	xprt->tcp_total = 0;
-	xprt->tcp_reclen = 0;
 	xprt->tcp_copied = 0;
-	xprt->tcp_rqstp  = NULL;
-	xprt->rx_pending_flag = 0;
+	xprt->tcp_more = 0;
+	xprt_remove_pending(xprt);
 	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
-	rpc_wake_up_status(&xprt->sending, -ENOTCONN);
 }
 
 /*
@@ -377,85 +404,87 @@
 xprt_reconnect(struct rpc_task *task)
 {
 	struct rpc_xprt	*xprt = task->tk_xprt;
-	struct socket	*sock;
-	struct sock	*inet;
+	struct socket	*sock = xprt->sock;
+	struct sock	*inet = xprt->inet;
 	int		status;
 
 	dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
 				task->tk_pid, xprt, xprt->connected);
-	task->tk_status = 0;
-
 	if (xprt->shutdown)
 		return;
 
 	if (!xprt->stream)
 		return;
 
-	spin_lock_bh(&xprt_lock);
-	if (xprt->connected) {
-		spin_unlock_bh(&xprt_lock);
+	if (!xprt->addr.sin_port) {
+		task->tk_status = -EIO;
 		return;
 	}
+
+	spin_lock(&xprt_lock);
 	if (xprt->connecting) {
-		task->tk_timeout = xprt->timeout.to_maxval;
+		task->tk_timeout = 0;
 		rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
-		spin_unlock_bh(&xprt_lock);
+		spin_unlock(&xprt_lock);
 		return;
 	}
 	xprt->connecting = 1;
-	spin_unlock_bh(&xprt_lock);
-
-	/* Create an unconnected socket */
-	if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout))) {
-		xprt->connecting = 0;
-		goto defer;
-	}
-
-	inet = sock->sk;
-	inet->data_ready   = xprt->inet->data_ready;
-	inet->state_change = xprt->inet->state_change;
-	inet->write_space  = xprt->inet->write_space;
-	inet->user_data    = xprt;
+	spin_unlock(&xprt_lock);
 
-	dprintk("RPC: %4d closing old socket\n", task->tk_pid);
-	xprt_close(xprt);
+	status = -ENOTCONN;
+	if (!inet) {
+		/* Create an unconnected socket */
+		if (!(sock = xprt_create_socket(xprt->prot, &xprt->timeout)))
+			goto defer;
+		xprt_bind_socket(xprt, sock);
+		inet = sock->sk;
+	}
 
-	/* Reset to new socket */
-	xprt->sock = sock;
-	xprt->inet = inet;
+	xprt_disconnect(xprt);
 
 	/* Now connect it asynchronously. */
 	dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
 	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
 				sizeof(xprt->addr), O_NONBLOCK);
 
-	xprt->connecting = 0;
 	if (status < 0) {
-		if (status != -EINPROGRESS && status != -EALREADY) {
+		switch (status) {
+		case -EALREADY:
+		case -EINPROGRESS:
+			status = 0;
+			break;
+		case -EISCONN:
+		case -EPIPE:
+			status = 0;
+			xprt_close(xprt);
+			goto defer;
+		default:
 			printk("RPC: TCP connect error %d!\n", -status);
+			xprt_close(xprt);
 			goto defer;
 		}
 
 		dprintk("RPC: %4d connect status %d connected %d\n",
 				task->tk_pid, status, xprt->connected);
-		task->tk_timeout = 60 * HZ;
 
-		spin_lock_bh(&xprt_lock);
+		spin_lock_bh(&xprt_sock_lock);
 		if (!xprt->connected) {
-			rpc_sleep_on(&xprt->reconn, task,
-				NULL, xprt_reconn_timeout);
-			spin_unlock_bh(&xprt_lock);
+			task->tk_timeout = xprt->timeout.to_maxval;
+			rpc_sleep_on(&xprt->reconn, task, xprt_reconn_status, NULL);
+			spin_unlock_bh(&xprt_sock_lock);
 			return;
 		}
-		spin_unlock_bh(&xprt_lock);
+		spin_unlock_bh(&xprt_sock_lock);
 	}
-
-
 defer:
-	spin_lock_bh(&xprt_lock);
-	if (!xprt->connected)
-		rpc_wake_up_next(&xprt->reconn);
-	spin_unlock_bh(&xprt_lock);
+	spin_lock(&xprt_lock);
+	xprt->connecting = 0;
+	if (status < 0) {
+		rpc_delay(task, 5*HZ);
+		task->tk_status = -ENOTCONN;
+	}
+	rpc_wake_up(&xprt->reconn);
+	spin_unlock(&xprt_lock);
 }
 
 /*
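
The connect() above runs with O_NONBLOCK, so -EINPROGRESS (or -EALREADY on a repeated attempt) only means the TCP handshake is still in flight; completion is reported later through the socket's state_change callback. The same pattern in user space, where polling for POLLOUT plays the role of the wait queue (hedged sketch):

	#include <errno.h>
	#include <fcntl.h>
	#include <sys/socket.h>

	/* Kick off a non-blocking connect: 1 = connected, 0 = in
	 * progress (wait for POLLOUT), -1 = immediate failure. */
	static int connect_nb(int fd, const struct sockaddr *sa, socklen_t len)
	{
		fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
		if (connect(fd, sa, len) == 0)
			return 1;
		if (errno == EINPROGRESS || errno == EALREADY)
			return 0;
		return -1;
	}

	/* Once writability is signalled, the outcome is read back
	 * with SO_ERROR: 0 on success, otherwise the errno value. */
	static int connect_result(int fd)
	{
		int err = 0;
		socklen_t elen = sizeof(err);

		getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &elen);
		return err;
	}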
@@ -463,29 +492,21 @@
  * process of reconnecting, and leave the rest to the upper layers.
  */
 static void
-xprt_reconn_timeout(struct rpc_task *task)
+xprt_reconn_status(struct rpc_task *task)
 {
-	spin_lock_bh(&xprt_lock);
+	struct rpc_xprt	*xprt = task->tk_xprt;
+
 	dprintk("RPC: %4d xprt_reconn_timeout %d\n",
 				task->tk_pid, task->tk_status);
-	task->tk_status = -ENOTCONN;
-	if (task->tk_xprt->connecting)
-		task->tk_xprt->connecting = 0;
-	if (!task->tk_xprt->connected)
-		task->tk_status = -ENOTCONN;
-	else
-		task->tk_status = -ETIMEDOUT;
-	task->tk_timeout = 0;
-	rpc_wake_up_task(task);
-	spin_unlock_bh(&xprt_lock);
+
+	spin_lock(&xprt_lock);
+	xprt->connecting = 0;
+	rpc_wake_up(&xprt->reconn);
+	spin_unlock(&xprt_lock);
 }
 
-extern spinlock_t rpc_queue_lock;
 /*
- * Look up the RPC request corresponding to a reply.
- *
- * RED-PEN: Niiice... Guys, when will we learn finally that locking
- * in this manner is NOOP? --ANK
+ * Look up the RPC request corresponding to a reply, and then lock it.
  */
 static inline struct rpc_rqst *
 xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
@@ -511,6 +532,8 @@
  out_bad:
 	req = NULL;
  out:
+	if (req && !rpc_lock_task(req->rq_task))
+		req = NULL;
 	spin_unlock_bh(&rpc_queue_lock);
 	return req;
 }
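
The lookup now succeeds only if the owning task can also be locked, so the request stays stable after rpc_queue_lock is dropped. The general lookup-and-pin pattern, reduced to a sketch (hypothetical structures, pthread flavour):

	#include <stdint.h>
	#include <pthread.h>

	struct req { uint32_t xid; int busy; struct req *next; };
	struct table { pthread_mutex_t lock; struct req *head; };

	/* Find 'xid' and mark it busy before dropping the table
	 * lock, so it cannot be completed or freed under us. */
	static struct req *lookup_and_pin(struct table *t, uint32_t xid)
	{
		struct req *r;

		pthread_mutex_lock(&t->lock);
		for (r = t->head; r; r = r->next)
			if (r->xid == xid)
				break;
		if (r && r->busy)
			r = NULL;	/* owned elsewhere: treat as a miss */
		else if (r)
			r->busy = 1;
		pthread_mutex_unlock(&t->lock);
		return r;
	}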
@@ -524,9 +547,6 @@
 {
 	struct rpc_task	*task = req->rq_task;
 
-	req->rq_rlen   = copied;
-	req->rq_gotit  = 1;
-
 	/* Adjust congestion window */
 	xprt_adjust_cwnd(xprt, copied);
 
@@ -549,12 +569,11 @@
 	}
 #endif
 
-	/* ... and wake up the process. */
 	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
 	task->tk_status = copied;
 
-	if (!RPC_IS_RUNNING(task))
-		rpc_wake_up_task(task);
+	/* ... and wake up the process. */
+	rpc_wake_up_task(task);
 	return;
 }
 
@@ -612,6 +631,7 @@
 static inline void
 udp_data_ready(struct sock *sk, int len)
 {
+	struct rpc_task	*task;
 	struct rpc_xprt	*xprt;
 	struct rpc_rqst *rovr;
 	struct sk_buff	*skb;
@@ -626,7 +646,10 @@
 	dprintk("RPC:      udp_data_ready client %p\n", xprt);
 
 	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
-		goto out_err;
+		return;
+
+	if (xprt->shutdown)
+		goto dropit;
 
 	repsize = skb->len - sizeof(struct udphdr);
 	if (repsize < 4) {
@@ -634,14 +657,15 @@
 		goto dropit;
 	}
 
-	/* Look up the request corresponding to the given XID */
-	if (!(rovr = xprt_lookup_rqst(xprt,
-				      *(u32 *) (skb->h.raw + sizeof(struct udphdr)))))
+	/* Look up and lock the request corresponding to the given XID */
+	rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
+	if (!rovr)
 		goto dropit;
+	task = rovr->rq_task;
 
-	dprintk("RPC: %4d received reply\n", rovr->rq_task->tk_pid);
+	dprintk("RPC: %4d received reply\n", task->tk_pid);
 	xprt_pktdump("packet data:",
-		     (u32 *) (skb->h.raw + sizeof(struct udphdr)), repsize);
+		     (u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize);
 
 	if ((copied = rovr->rq_rlen) > repsize)
 		copied = repsize;
@@ -649,213 +673,287 @@
 	rovr->rq_damaged  = 1;
 	/* Suck it into the iovec, verify checksum if not done by hw. */
 	if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied))
-		goto dropit;
+		goto out_unlock;
 
 	/* Something worked... */
 	dst_confirm(skb->dst);
 
 	xprt_complete_rqst(xprt, rovr, copied);
 
-dropit:
+ out_unlock:
+	rpc_unlock_task(task);
+
+ dropit:
 	skb_free_datagram(sk, skb);
-	return;
-out_err:
-	return;
 }
 
 /*
- * TCP record receive routine
- * This is not the most efficient code since we call recvfrom twice--
- * first receiving the record marker and XID, then the data.
- * 
- * The optimal solution would be a RPC support in the TCP layer, which
- * would gather all data up to the next record marker and then pass us
- * the list of all TCP segments ready to be copied.
+ * TCP read fragment marker
  */
 static inline int
-tcp_input_record(struct rpc_xprt *xprt)
+tcp_read_fraghdr(struct rpc_xprt *xprt)
 {
-	struct rpc_rqst	*req;
-	struct iovec	*iov;
 	struct iovec	riov;
-	u32		offset;
-	int		result, maxcpy, reclen, avail, want;
+	int		want, result;
 
-	dprintk("RPC:      tcp_input_record\n");
+	if (xprt->tcp_offset >= xprt->tcp_reclen + sizeof(xprt->tcp_recm)) {
+		xprt->tcp_offset = 0;
+		xprt->tcp_reclen = 0;
+	}
+	if (xprt->tcp_offset >= sizeof(xprt->tcp_recm))
+		goto done;
 
-	offset = xprt->tcp_offset;
-	result = -EAGAIN;
-	if (offset < 4 || (!xprt->tcp_more && offset < 8)) {
-		want = (xprt->tcp_more? 4 : 8) - offset;
-		dprintk("RPC:      reading header (%d bytes)\n", want);
-		riov.iov_base = xprt->tcp_recm.data + offset;
+	want = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
+	dprintk("RPC:      reading header (%d bytes)\n", want);
+	do {
+		riov.iov_base = ((u8*) &xprt->tcp_recm) + xprt->tcp_offset;
 		riov.iov_len  = want;
-		result = xprt_recvmsg(xprt, &riov, 1, want);
+		result = xprt_recvmsg(xprt, &riov, 1, want, 0);
 		if (result < 0)
-			goto done;
-		offset += result;
-		if (result < want) {
-			result = -EAGAIN;
-			goto done;
-		}
+			return result;
+		xprt->tcp_offset += result;
+		want -= result;
+	} while (want);
 
-		/* Get the record length and mask out the more_fragments bit */
-		reclen = ntohl(xprt->tcp_reclen);
-		dprintk("RPC:      reclen %08x\n", reclen);
-		xprt->tcp_more = (reclen & 0x80000000)? 0 : 1;
-		reclen &= 0x7fffffff;
-		xprt->tcp_total += reclen;
-		xprt->tcp_reclen = reclen;
-
-		dprintk("RPC:      got xid %08x reclen %d morefrags %d\n",
-			xprt->tcp_xid, xprt->tcp_reclen, xprt->tcp_more);
-		if (!xprt->tcp_copied
-		 && (req = xprt_lookup_rqst(xprt, xprt->tcp_xid))) {
-			iov = xprt->tcp_iovec;
-			memcpy(iov, req->rq_rvec, req->rq_rnr * sizeof(iov[0]));
-#if 0
-*(u32 *)iov->iov_base = req->rq_xid;
-#endif
-			iov->iov_base += 4;
-			iov->iov_len  -= 4;
-			xprt->tcp_copied = 4;
-			xprt->tcp_rqstp  = req;
-		}
-	} else {
-		reclen = xprt->tcp_reclen;
-	}
+	/* Is this another fragment of the last message? */
+	if (!xprt->tcp_more)
+		xprt->tcp_copied = 0; /* No, so we're reading a new message */
+
+	/* Get the record length and mask out the last fragment bit */
+	xprt->tcp_reclen = ntohl(xprt->tcp_recm);
+	xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1;
+	xprt->tcp_reclen &= 0x7fffffff;
+
+	dprintk("RPC:      New record reclen %d morefrags %d\n",
+				   xprt->tcp_reclen, xprt->tcp_more);
+ done:
+	return xprt->tcp_reclen + sizeof(xprt->tcp_recm) - xprt->tcp_offset;
+}
+
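
The tcp_recm word read above is the ONC RPC record marker used over TCP: the top bit marks the last fragment of a record and the low 31 bits carry the fragment length. Decoding and re-encoding it is a two-liner (sketch, matching the masks used in the patch):

	#include <stdint.h>
	#include <arpa/inet.h>

	#define RPC_LAST_FRAG	0x80000000UL

	/* Split a marker (as received, big-endian) into its parts. */
	static void frag_decode(uint32_t marker_be, uint32_t *len, int *last)
	{
		uint32_t m = ntohl(marker_be);

		*last = (m & RPC_LAST_FRAG) != 0;
		*len  = m & 0x7fffffff;
	}

	/* Build the marker for a fragment of 'len' bytes. */
	static uint32_t frag_encode(uint32_t len, int last)
	{
		return htonl((last ? RPC_LAST_FRAG : 0) | (len & 0x7fffffff));
	}

Note that xprt_transmit later in this patch writes exactly this encoding in place over the first word of the send buffer.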
+/*
+ * TCP read xid
+ */
+static inline int
+tcp_read_xid(struct rpc_xprt *xprt, int avail)
+{
+	struct iovec	riov;
+	int		want, result;
+
+	if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail)
+		goto done;
+	want = MIN(sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
+	do {
+		dprintk("RPC:      reading xid (%d bytes)\n", want);
+		riov.iov_base = ((u8*) &xprt->tcp_xid) + xprt->tcp_copied;
+		riov.iov_len  = want;
+		result = xprt_recvmsg(xprt, &riov, 1, want, 0);
+		if (result < 0)
+			return result;
+		xprt->tcp_copied += result;
+		xprt->tcp_offset += result;
+		want  -= result;
+		avail -= result;
+	} while (want);
+ done:
+	return avail;
+}
 
-	avail = reclen - (offset - 4);
-	if ((req = xprt->tcp_rqstp) && req->rq_xid == xprt->tcp_xid
-	 && req->rq_task->tk_rpcwait == &xprt->pending) {
-		want = MIN(req->rq_rlen - xprt->tcp_copied, avail);
+/*
+ * TCP read and complete request
+ */
+static inline int
+tcp_read_request(struct rpc_xprt *xprt, struct rpc_rqst *req, int avail)
+{
+	int	want, result;
 
+	if (req->rq_rlen <= xprt->tcp_copied || !avail)
+		goto done;
+	want = MIN(req->rq_rlen - xprt->tcp_copied, avail);
+	do {
 		dprintk("RPC: %4d TCP receiving %d bytes\n",
-					req->rq_task->tk_pid, want);
-		/* Request must be re-encoded before retransmit */
-		req->rq_damaged = 1;
-		result = xprt_recvmsg(xprt, xprt->tcp_iovec, req->rq_rnr, want);
+			req->rq_task->tk_pid, want);
+
+		result = xprt_recvmsg(xprt, req->rq_rvec, req->rq_rnr, want, xprt->tcp_copied);
 		if (result < 0)
-			goto done;
+			return result;
 		xprt->tcp_copied += result;
-		offset += result;
+		xprt->tcp_offset += result;
 		avail  -= result;
-		if (result < want) {
-			result = -EAGAIN;
-			goto done;
-		}
+		want   -= result;
+	} while (want);
 
-		maxcpy = MIN(req->rq_rlen, xprt->tcp_total);
-		if (xprt->tcp_copied == maxcpy && !xprt->tcp_more) {
-			dprintk("RPC: %4d received reply complete\n",
-					req->rq_task->tk_pid);
-			xprt_complete_rqst(xprt, req, xprt->tcp_total);
-			xprt->tcp_copied = 0;
-			xprt->tcp_rqstp  = NULL;
-		}
-	}
+ done:
+	if (req->rq_rlen > xprt->tcp_copied && xprt->tcp_more)
+		return avail;
+	dprintk("RPC: %4d received reply complete\n", req->rq_task->tk_pid);
+	xprt_complete_rqst(xprt, req, xprt->tcp_copied);
 
-	/* Skip over any trailing bytes on short reads */
-	while (avail > 0) {
-		static u8	dummy[64];
+	return avail;
+}
 
+/*
+ * TCP discard extra bytes from a short read
+ */
+static inline int
+tcp_read_discard(struct rpc_xprt *xprt, int avail)
+{
+	struct iovec	riov;
+	static u8	dummy[64];
+	int		want, result = 0;
+
+	while (avail) {
 		want = MIN(avail, sizeof(dummy));
 		riov.iov_base = dummy;
 		riov.iov_len  = want;
 		dprintk("RPC:      TCP skipping %d bytes\n", want);
-		result = xprt_recvmsg(xprt, &riov, 1, want);
+		result = xprt_recvmsg(xprt, &riov, 1, want, 0);
 		if (result < 0)
-			goto done;
-		offset += result;
+			return result;
+		xprt->tcp_offset += result;
 		avail  -= result;
-		if (result < want) {
-			result = -EAGAIN;
-			goto done;
+	}
+	return avail;
+}
+
+/*
+ * TCP record receive routine
+ * This is not the most efficient code since we call recvfrom thrice--
+ * first receiving the record marker, then the XID, then the data.
+ * 
+ * The optimal solution would be RPC support in the TCP layer, which
+ * would gather all data up to the next record marker and then pass us
+ * the list of all TCP segments ready to be copied.
+ */
+static int
+tcp_input_record(struct rpc_xprt *xprt)
+{
+	struct rpc_rqst	*req = NULL;
+	struct rpc_task	*task = NULL;
+	int		avail, result;
+
+	dprintk("RPC:      tcp_input_record\n");
+
+	if (xprt->shutdown)
+		return -EIO;
+	if (!xprt->connected)
+		return -ENOTCONN;
+
+	/* Read in a new fragment marker if necessary */
+	/* Can we ever really expect to get completely empty fragments? */
+	if ((result = tcp_read_fraghdr(xprt)) <= 0)
+		return result;
+	avail = result;
+
+	/* Read in the xid if necessary */
+	if ((result = tcp_read_xid(xprt, avail)) <= 0)
+		return result;
+	avail = result;
+
+	/* Find and lock the request corresponding to this xid */
+	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
+	if (req) {
+		task = req->rq_task;
+		if (xprt->tcp_copied == sizeof(xprt->tcp_xid) || req->rq_damaged) {
+			req->rq_damaged = 1;
+			/* Read in the request data */
+			result = tcp_read_request(xprt, req, avail);
 		}
+		rpc_unlock_task(task);
+		if (result < 0)
+			return result;
+		avail = result;
 	}
-	if (!xprt->tcp_more)
-		xprt->tcp_total = 0;
-	offset = 0;
 
-done:
-	dprintk("RPC:      tcp_input_record done (off %d total %d copied %d)\n",
-			offset, xprt->tcp_total, xprt->tcp_copied);
-	xprt->tcp_offset = offset;
+	/* Skip over any trailing bytes on short reads */
+	if ((result = tcp_read_discard(xprt, avail)) < 0)
+		return result;
+
+	dprintk("RPC:      tcp_input_record done (off %d reclen %d copied %d)\n",
+			xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied);
+	result = xprt->tcp_reclen;
 	return result;
 }
 
 /*
  *	TCP task queue stuff
  */
- 
-static struct rpc_xprt *rpc_xprt_pending = NULL;	/* Chain by rx_pending of rpc_xprt's */
+LIST_HEAD(rpc_xprt_pending);	/* List of xprts having pending tcp requests */
+
+static inline
+void tcp_rpciod_queue(void)
+{
+	rpciod_wake_up();
+}
+
+static inline
+void xprt_append_pending(struct rpc_xprt *xprt)
+{
+	if (!list_empty(&xprt->rx_pending))
+		return;
+	spin_lock_bh(&rpc_queue_lock);
+	if (list_empty(&xprt->rx_pending)) {
+		list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
+		dprintk("RPC:     xprt queue %p\n", xprt);
+		tcp_rpciod_queue();
+	}
+	spin_unlock_bh(&rpc_queue_lock);
+}
+
+static
+void xprt_remove_pending(struct rpc_xprt *xprt)
+{
+	spin_lock_bh(&rpc_queue_lock);
+	if (!list_empty(&xprt->rx_pending)) {
+		list_del(&xprt->rx_pending);
+		INIT_LIST_HEAD(&xprt->rx_pending);
+	}
+	spin_unlock_bh(&rpc_queue_lock);
+}
+
+static inline
+struct rpc_xprt *xprt_remove_pending_next(void)
+{
+	struct rpc_xprt	*xprt = NULL;
+
+	spin_lock_bh(&rpc_queue_lock);
+	if (!list_empty(&rpc_xprt_pending)) {
+		xprt = list_entry(rpc_xprt_pending.next, struct rpc_xprt, rx_pending);
+		list_del(&xprt->rx_pending);
+		INIT_LIST_HEAD(&xprt->rx_pending);
+	}
+	spin_unlock_bh(&rpc_queue_lock);
+	return xprt;
+}
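
xprt_append_pending checks list_empty() once without the lock as a cheap fast path, then rechecks after taking rpc_queue_lock, because another CPU may have queued the transport in the meantime. The idiom in miniature (pthread flavour, illustrative names):

	#include <pthread.h>

	struct node { struct node *next; int queued; };

	static struct node *pending_head;
	static pthread_mutex_t pending_lock = PTHREAD_MUTEX_INITIALIZER;

	/* Enqueue at most once: racy unlocked pre-check, then an
	 * authoritative recheck under the lock before linking in. */
	static void append_pending(struct node *n)
	{
		if (n->queued)		/* fast path; may be stale */
			return;
		pthread_mutex_lock(&pending_lock);
		if (!n->queued) {
			n->next = pending_head;
			pending_head = n;
			n->queued = 1;
		}
		pthread_mutex_unlock(&pending_lock);
	}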
 
 /*
  *	This is protected from tcp_data_ready and the stack as its run
  *	inside of the RPC I/O daemon
  */
-static void
-do_rpciod_tcp_dispatcher(void)
+void
+__rpciod_tcp_dispatcher(void)
 {
 	struct rpc_xprt *xprt;
-	int result = 0;
+	int safe_retry = 0, result;
 
 	dprintk("rpciod_tcp_dispatcher: Queue Running\n");
 
 	/*
 	 *	Empty each pending socket
 	 */
- 
-	while(1) {
-		int safe_retry=0;
-
-		if ((xprt = rpc_xprt_pending) == NULL) {
-			break;
-		}
-		xprt->rx_pending_flag = 0;
-		rpc_xprt_pending=xprt->rx_pending;
-		xprt->rx_pending = NULL;
-
+	while ((xprt = xprt_remove_pending_next()) != NULL) {
 		dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);
 
-		do 
-		{
-			if (safe_retry++ > 50)
-				break;
+		do {
 			result = tcp_input_record(xprt);
-		}
-		while (result >= 0);
+		} while (result >= 0);
 
-		switch (result) {
-			case -EAGAIN:
-			case -ENOTCONN:
-			case -EPIPE:
-				continue;
-			default:
-				printk(KERN_WARNING "RPC: unexpected error %d from tcp_input_record\n",
-					result);
+		if (safe_retry++ > 200) {
+			schedule();
+			safe_retry = 0;
 		}
 	}
 }
 
-void rpciod_tcp_dispatcher(void)
-{
-	/* mama... start_bh_atomic was here...
-	   Calls to sock->ops _are_ _impossible_ with disabled bh. Period. --ANK
-	 */
-	do_rpciod_tcp_dispatcher();
-}
-
-int xprt_tcp_pending(void)
-{
-	return rpc_xprt_pending != NULL;
-}
-
-extern inline void tcp_rpciod_queue(void)
-{
-	rpciod_wake_up();
-}
-
 /*
  *	data_ready callback for TCP. We can't just jump into the
  *	tcp recvmsg functions inside of the network receive bh or
@@ -874,24 +972,15 @@
 		return;
 	}
 
+	if (xprt->shutdown)
+		return;
+
+	xprt_append_pending(xprt);
+
 	dprintk("RPC:      tcp_data_ready client %p\n", xprt);
 	dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
 				sk->state, xprt->connected,
 				sk->dead, sk->zapped);
-	/*
-	 *	If we are not waiting for the RPC bh run then
-	 *	we are now
-	 */
-	if (!xprt->rx_pending_flag) {
-		dprintk("RPC:     xprt queue %p\n", rpc_xprt_pending);
-
-		xprt->rx_pending=rpc_xprt_pending;
-		rpc_xprt_pending=xprt;
-		xprt->rx_pending_flag=1;
-	} else
-		dprintk("RPC:     xprt queued already %p\n", xprt);
-	tcp_rpciod_queue();
-
 }
 
 
@@ -907,26 +996,20 @@
 				sk->state, xprt->connected,
 				sk->dead, sk->zapped);
 
-	switch(sk->state) {
+	spin_lock_bh(&xprt_sock_lock);
+	switch (sk->state) {
 	case TCP_ESTABLISHED:
-		if (xprt->connected)
-			break;
 		xprt->connected = 1;
-		xprt->connecting = 0;
+		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
+			rpc_wake_up_task(xprt->snd_task);
 		rpc_wake_up(&xprt->reconn);
-		rpc_wake_up_next(&xprt->sending);
-		tcp_rpciod_queue();
-		break;
-	case TCP_CLOSE:
-		if (xprt->connecting)
-			break;
-		xprt_disconnect(xprt);
-		rpc_wake_up_status(&xprt->reconn,  -ENOTCONN);
 		break;
 	default:
+		xprt->connected = 0;
+		rpc_wake_up_status(&xprt->pending, -ENOTCONN);
 		break;
 	}
-
+	spin_unlock_bh(&xprt_sock_lock);
 }
 
 /*
@@ -940,20 +1023,23 @@
 
 	if (!(xprt = xprt_from_sock(sk)))
 		return;
+	if (xprt->shutdown)
+		return;
 
 	/* Wait until we have enough socket memory */
 	if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE))
 		return;
 
+	spin_lock_bh(&xprt_sock_lock);
 	if (xprt->write_space)
-		return;
+		goto out_unlock;
 
 	xprt->write_space = 1;
 
-	if (!xprt->snd_task)
-		rpc_wake_up_next(&xprt->sending);
-	else if (!RPC_IS_RUNNING(xprt->snd_task))
+	if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
 		rpc_wake_up_task(xprt->snd_task);
+ out_unlock:
+	spin_unlock_bh(&xprt_sock_lock);
 }
 
 static void
@@ -963,20 +1049,24 @@
 
 	if (!(xprt = xprt_from_sock(sk)))
 		return;
+	if (xprt->shutdown)
+		return;
 
 
 	/* Wait until we have enough socket memory */
 	if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE))
 		return;
 
+	spin_lock_bh(&xprt_sock_lock);
 	if (xprt->write_space)
-		return;
+		goto out_unlock;
 
 	xprt->write_space = 1;
-	if (!xprt->snd_task)
-		rpc_wake_up_next(&xprt->sending);
-	else if (!RPC_IS_RUNNING(xprt->snd_task))
+
+	if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
 		rpc_wake_up_task(xprt->snd_task);
+ out_unlock:
+	spin_unlock_bh(&xprt_sock_lock);
 }
 
 /*
@@ -987,9 +1077,8 @@
 {
 	struct rpc_rqst	*req = task->tk_rqstp;
 
-	if (req) {
+	if (req)
 		xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);
-	}
 
 	dprintk("RPC: %4d xprt_timer (%s request)\n",
 		task->tk_pid, req ? "pending" : "backlogged");
@@ -1010,12 +1099,13 @@
 	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
 	struct rpc_rqst	*req = task->tk_rqstp;
 
-	spin_lock_bh(&xprt_lock);
+	spin_lock(&xprt_lock);
 	if (xprt->snd_task && xprt->snd_task != task) {
 		dprintk("RPC: %4d TCP write queue full (task %d)\n",
 			task->tk_pid, xprt->snd_task->tk_pid);
-		task->tk_timeout = req->rq_timeout.to_current;
-		rpc_sleep_on(&xprt->sending, task, xprt_transmit, NULL);
+		task->tk_timeout = 0;
+		task->tk_status = -EAGAIN;
+		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
 	} else if (!xprt->snd_task) {
 		xprt->snd_task = task;
 #ifdef RPC_PROFILE
@@ -1023,23 +1113,23 @@
 #endif
 		req->rq_bytes_sent = 0;
 	}
-	spin_unlock_bh(&xprt_lock);
+	spin_unlock(&xprt_lock);
 	return xprt->snd_task == task;
 }
 
 /*
  * Releases the socket for use by other requests.
  */
-static void
+static inline void
 xprt_up_transmit(struct rpc_task *task)
 {
 	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
 
 	if (xprt->snd_task && xprt->snd_task == task) {
-		spin_lock_bh(&xprt_lock);
+		spin_lock(&xprt_lock);
 		xprt->snd_task = NULL;
 		rpc_wake_up_next(&xprt->sending);
-		spin_unlock_bh(&xprt_lock);
+		spin_unlock(&xprt_lock);
 	}
 }
 
@@ -1050,7 +1140,6 @@
 void
 xprt_transmit(struct rpc_task *task)
 {
-	struct rpc_timeout *timeo;
 	struct rpc_rqst	*req = task->tk_rqstp;
 	struct rpc_xprt	*xprt = req->rq_xprt;
 
@@ -1060,26 +1149,21 @@
 	if (xprt->shutdown)
 		task->tk_status = -EIO;
 
+	if (!xprt->connected)
+		task->tk_status = -ENOTCONN;
+
 	if (task->tk_status < 0)
 		return;
 
-	/* Reset timeout parameters */
-	timeo = &req->rq_timeout;
-	if (timeo->to_retries < 0) {
-		dprintk("RPC: %4d xprt_transmit reset timeo\n",
-					task->tk_pid);
-		timeo->to_retries = xprt->timeout.to_retries;
-		timeo->to_current = timeo->to_initval;
-	}
+	if (task->tk_rpcwait)
+		rpc_remove_wait_queue(task);
 
 	/* set up everything as needed. */
 	/* Write the record marker */
 	if (xprt->stream) {
-		u32	marker;
-
-		marker = htonl(0x80000000|(req->rq_slen-4));
-		*((u32 *) req->rq_svec[0].iov_base) = marker;
+		u32	*marker = req->rq_svec[0].iov_base;
 
+		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
 	}
 
 	if (!xprt_down_transmit(task))
@@ -1095,24 +1179,14 @@
 	struct rpc_xprt	*xprt = req->rq_xprt;
 	int status, retry = 0;
 
-	if (xprt->shutdown) {
-		task->tk_status = -EIO;
-		goto out_release;
-	}
 
 	/* For fast networks/servers we have to put the request on
 	 * the pending list now:
+	 * Note that we don't want the task timing out during the
+	 * call to xprt_sendmsg(), so we initially disable the timeout,
+	 * and then reset it later...
 	 */
-	req->rq_gotit = 0;
-	status = rpc_add_wait_queue(&xprt->pending, task);
-	if (!status)
-		task->tk_callback = NULL;
-
-	if (status) {
-		printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
-		task->tk_status = status;
-		goto out_release;
-	}
+	xprt_receive(task);
 
 	/* Continue transmitting the packet/record. We must be careful
 	 * to cope with writespace callbacks arriving _after_ we have
@@ -1129,80 +1203,67 @@
 			req->rq_bytes_sent += status;
 
 			if (req->rq_bytes_sent >= req->rq_slen)
-				goto out_release;
-		}
-
-		if (status < req->rq_slen)
-			status = -EAGAIN;
-
-		if (status >= 0 || !xprt->stream) {
-			dprintk("RPC: %4d xmit complete\n", task->tk_pid);
-			goto out_release;
+				goto out_receive;
+		} else {
+			if (status >= req->rq_slen)
+				goto out_receive;
+			status = -ENOMEM;
+			break;
 		}
 
 		dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
 				task->tk_pid, req->rq_slen - req->rq_bytes_sent,
 				req->rq_slen);
 
+		status = -EAGAIN;
 		if (retry++ > 50)
 			break;
 	}
+	rpc_unlock_task(task);
 
-	task->tk_status = (status == -ENOMEM) ? -EAGAIN : status;
+	task->tk_status = status;
 
-	/* We don't care if we got a reply, so don't protect
-	 * against bh. */
-	if (task->tk_rpcwait == &xprt->pending)
-		rpc_remove_wait_queue(task);
+	/* Note: at this point, task->tk_sleeping has not yet been set,
+	 *	 hence there is no danger of the waking up task being put on
+	 *	 schedq, and being picked up by a parallel run of rpciod().
+	 */
+	rpc_wake_up_task(task);
+	if (!RPC_IS_RUNNING(task))
+		goto out_release;
 
-	/* Protect against (udp|tcp)_write_space */
-	spin_lock_bh(&xprt_lock);
-	if (status == -ENOMEM || status == -EAGAIN) {
+	switch (status) {
+	case -ENOMEM:
+		/* Protect against (udp|tcp)_write_space */
 		task->tk_timeout = req->rq_timeout.to_current;
+		spin_lock_bh(&xprt_sock_lock);
 		if (!xprt->write_space)
-			rpc_sleep_on(&xprt->sending, task, xprt_transmit_status,
-				     xprt_transmit_timeout);
-		spin_unlock_bh(&xprt_lock);
+			rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+		spin_unlock_bh(&xprt_sock_lock);
 		return;
-	}
-	spin_unlock_bh(&xprt_lock);
-
-out_release:
-	xprt_up_transmit(task);
-}
-
-/*
- * This callback is invoked when the sending task is forced to sleep
- * because the TCP write buffers are full
- */
-static void
-xprt_transmit_status(struct rpc_task *task)
-{
-	struct rpc_xprt	*xprt = task->tk_client->cl_xprt;
-
-	dprintk("RPC: %4d transmit_status %d\n", task->tk_pid, task->tk_status);
-	if (xprt->snd_task == task) {
-		task->tk_status = 0;
-		do_xprt_transmit(task);
+	case -EAGAIN:
+		/* Keep holding the socket if it is blocked */
+		rpc_delay(task, HZ>>4);
 		return;
+	case -ECONNREFUSED:
+	case -ENOTCONN:
+		if (!xprt->stream)
+			return;
+	default:
+		goto out_release;
 	}
-}
 
-/*
- * RPC transmit timeout handler.
- */
-static void
-xprt_transmit_timeout(struct rpc_task *task)
-{
-	dprintk("RPC: %4d transmit_timeout %d\n", task->tk_pid, task->tk_status);
-	task->tk_status  = -ETIMEDOUT;
-	task->tk_timeout = 0;
-	rpc_wake_up_task(task);
+ out_receive:
+	dprintk("RPC: %4d xmit complete\n", task->tk_pid);
+	/* Set the task's receive timeout value */
+	task->tk_timeout = req->rq_timeout.to_current;
+	rpc_add_timer(task, xprt_timer);
+	rpc_unlock_task(task);
+ out_release:
 	xprt_up_transmit(task);
 }
 
 /*
- * Wait for the reply to our call.
+ * Queue the task for a reply to our call.
  * When the callback is invoked, the congestion window should have
  * been updated already.
  */
@@ -1214,42 +1275,8 @@
 
 	dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
 
-	/*
-         * Wait until rq_gotit goes non-null, or timeout elapsed.
-	 */
-	task->tk_timeout = req->rq_timeout.to_current;
-
-	spin_lock_bh(&xprt_lock);
-	if (task->tk_rpcwait)
-		rpc_remove_wait_queue(task);
-
-	if (task->tk_status < 0 || xprt->shutdown) {
-		spin_unlock_bh(&xprt_lock);
-		goto out;
-	}
-
-	if (!req->rq_gotit) {
-		rpc_sleep_on(&xprt->pending, task,
-				xprt_receive_status, xprt_timer);
-		spin_unlock_bh(&xprt_lock);
-		return;
-	}
-	spin_unlock_bh(&xprt_lock);
-
-	dprintk("RPC: %4d xprt_receive returns %d\n",
-				task->tk_pid, task->tk_status);
- out:
-	xprt_receive_status(task);
-}
-
-static void
-xprt_receive_status(struct rpc_task *task)
-{
-	struct rpc_xprt	*xprt = task->tk_xprt;
-
-	if (xprt->tcp_rqstp == task->tk_rqstp)
-		xprt->tcp_rqstp = NULL;
-
+	task->tk_timeout = 0;
+	rpc_sleep_locked(&xprt->pending, task, NULL, NULL);
 }
 
 /*
@@ -1335,7 +1362,6 @@
 
 	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);
 	task->tk_status = 0;
-	req->rq_gotit	= 0;
 	req->rq_timeout = xprt->timeout;
 	req->rq_task	= task;
 	req->rq_xprt    = xprt;
@@ -1353,6 +1379,7 @@
 	struct rpc_xprt	*xprt = task->tk_xprt;
 	struct rpc_rqst	*req;
 
+	xprt_up_transmit(task);
 	if (!(req = task->tk_rqstp))
 		return;
 	task->tk_rqstp = NULL;
@@ -1363,16 +1390,16 @@
 	spin_lock(&xprt_lock);
 	req->rq_next = xprt->free;
 	xprt->free   = req;
-	spin_unlock(&xprt_lock);
 
 	/* remove slot from queue of pending */
-	spin_lock_bh(&xprt_lock);
 	if (task->tk_rpcwait) {
 		printk("RPC: task of released request still queued!\n");
-		rpc_del_timer(task);
+#ifdef RPC_DEBUG
+		printk("RPC: (task is on %s)\n", rpc_qname(task->tk_rpcwait));
+#endif
 		rpc_remove_wait_queue(task);
 	}
-	spin_unlock_bh(&xprt_lock);
+	spin_unlock(&xprt_lock);
 
 	/* Decrease congestion value. */
 	xprt->cong -= RPC_CWNDSCALE;
@@ -1389,7 +1416,7 @@
 	if (proto == IPPROTO_UDP)
 		xprt_set_timeout(to, 5,  5 * HZ);
 	else
-		xprt_set_timeout(to, 5, 15 * HZ);
+		xprt_set_timeout(to, 5, 60 * HZ);
 }
 
 /*
@@ -1416,52 +1443,33 @@
 {
 	struct rpc_xprt	*xprt;
 	struct rpc_rqst	*req;
-	struct sock	*inet;
 	int		i;
 
 	dprintk("RPC:      setting up %s transport...\n",
 				proto == IPPROTO_UDP? "UDP" : "TCP");
 
-	inet = sock->sk;
-
 	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
 		return NULL;
 	memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */
 
-	xprt->file = NULL;
-	xprt->sock = sock;
-	xprt->inet = inet;
 	xprt->addr = *ap;
 	xprt->prot = proto;
 	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
-	xprt->congtime = jiffies;
-	init_waitqueue_head(&xprt->cong_wait);
-	inet->user_data = xprt;
-	xprt->old_data_ready = inet->data_ready;
-	xprt->old_state_change = inet->state_change;
-	xprt->old_write_space = inet->write_space;
-	if (proto == IPPROTO_UDP) {
-		inet->data_ready = udp_data_ready;
-		inet->write_space = udp_write_space;
-		inet->no_check = UDP_CSUM_NORCV;
-		xprt->cwnd = RPC_INITCWND;
-	} else {
-		inet->data_ready = tcp_data_ready;
-		inet->state_change = tcp_state_change;
-		inet->write_space = tcp_write_space;
+	if (xprt->stream) {
 		xprt->cwnd = RPC_MAXCWND;
 		xprt->nocong = 1;
-	}
-	xprt->connected = 1;
+	} else
+		xprt->cwnd = RPC_INITCWND;
+	xprt->congtime = jiffies;
+	init_waitqueue_head(&xprt->cong_wait);
 
 	/* Set timeout parameters */
 	if (to) {
 		xprt->timeout = *to;
 		xprt->timeout.to_current = to->to_initval;
 		xprt->timeout.to_resrvval = to->to_maxval << 1;
-	} else {
+	} else
 		xprt_default_timeout(&xprt->timeout, xprt->prot);
-	}
 
 	xprt->pending = RPC_INIT_WAITQ("xprt_pending");
 	xprt->sending = RPC_INIT_WAITQ("xprt_sending");
@@ -1474,13 +1482,11 @@
 	req->rq_next = NULL;
 	xprt->free = xprt->slot;
 
+	INIT_LIST_HEAD(&xprt->rx_pending);
+
 	dprintk("RPC:      created transport %p\n", xprt);
 	
-	/*
-	 *	TCP requires the rpc I/O daemon is present
-	 */
-	if(proto==IPPROTO_TCP)
-		rpciod_up();
+	xprt_bind_socket(xprt, sock);
 	return xprt;
 }
 
@@ -1508,17 +1514,52 @@
 	return err;
 }
 
+static int 
+xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
+{
+	struct sock	*sk = sock->sk;
+
+	if (xprt->inet)
+		return -EBUSY;
+
+	sk->user_data = xprt;
+	xprt->old_data_ready = sk->data_ready;
+	xprt->old_state_change = sk->state_change;
+	xprt->old_write_space = sk->write_space;
+	if (xprt->prot == IPPROTO_UDP) {
+		sk->data_ready = udp_data_ready;
+		sk->write_space = udp_write_space;
+		sk->no_check = UDP_CSUM_NORCV;
+		xprt->connected = 1;
+	} else {
+		sk->data_ready = tcp_data_ready;
+		sk->state_change = tcp_state_change;
+		sk->write_space = tcp_write_space;
+		xprt->connected = 0;
+	}
+
+	/* Reset to new socket */
+	xprt->sock = sock;
+	xprt->inet = sk;
+	/*
+	 *	TCP requires the rpc I/O daemon is present
+	 */
+	if(xprt->stream)
+		rpciod_up();
+
+	return 0;
+}
+
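
xprt_bind_socket wires the transport into the socket by saving the original callbacks and substituting RPC-aware ones; xprt_close earlier in the patch restores the saved pointers before releasing the socket. The hook/unhook pattern in a generic sketch (invented types, not the kernel's struct sock):

	struct hooks {
		void (*data_ready)(void *ctx);
		void (*write_space)(void *ctx);
	};

	struct sock_like {
		struct hooks ops;
		void *user_data;
	};

	/* Install our callbacks, remembering the originals. */
	static void bind_hooks(struct sock_like *sk, struct hooks *saved,
			       const struct hooks *mine, void *ctx)
	{
		*saved = sk->ops;
		sk->ops = *mine;
		sk->user_data = ctx;
	}

	/* Put the originals back; done before the socket is freed. */
	static void unbind_hooks(struct sock_like *sk, const struct hooks *saved)
	{
		sk->ops = *saved;
		sk->user_data = NULL;
	}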
 /*
  * Create a client socket given the protocol and peer address.
  */
 static struct socket *
-xprt_create_socket(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
+xprt_create_socket(int proto, struct rpc_timeout *to)
 {
 	struct socket	*sock;
 	int		type, err;
 
-	dprintk("RPC:      xprt_create_socket(%08x, %s %d)\n",
-			   sap? ntohl(sap->sin_addr.s_addr) : 0,
+	dprintk("RPC:      xprt_create_socket(%s %d)\n",
 			   (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
 
 	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
@@ -1532,15 +1573,6 @@
 	if (!current->fsuid && xprt_bindresvport(sock) < 0)
 		goto failed;
 
-	if (type == SOCK_STREAM && sap) {
-		err = sock->ops->connect(sock, (struct sockaddr *) sap,
-						sizeof(*sap), 0);
-		if (err < 0) {
-			printk("RPC: TCP connect failed (%d).\n", -err);
-			goto failed;
-		}
-	}
-
 	return sock;
 
 failed:
@@ -1559,7 +1591,7 @@
 
 	dprintk("RPC:      xprt_create_proto called\n");
 
-	if (!(sock = xprt_create_socket(proto, sap, to)))
+	if (!(sock = xprt_create_socket(proto, to)))
 		return NULL;
 
 	if (!(xprt = xprt_setup(sock, proto, sap, to)))
@@ -1587,8 +1619,6 @@
  */
 int
 xprt_clear_backlog(struct rpc_xprt *xprt) {
-	if (!xprt)
-		return 0;
 	if (RPCXPRT_CONGESTED(xprt))
 		return 0;
 	rpc_wake_up_next(&xprt->backlog);
@@ -1603,6 +1633,7 @@
 xprt_destroy(struct rpc_xprt *xprt)
 {
 	dprintk("RPC:      destroying transport %p\n", xprt);
+	xprt_shutdown(xprt);
 	xprt_close(xprt);
 	kfree(xprt);
 
